From 370a7ace4b3c8ffb6187900f37499990f1b976a2 Mon Sep 17 00:00:00 2001 From: Toshio Kuratomi Date: Mon, 18 Dec 2017 10:17:13 -0800 Subject: [PATCH] Split basic units (#33510) Split the one monolithic test for basic.py into several files * Split test_basic.py along categories. This is preliminary to get a handle on things. Eventually we may want to further split it so each file is only testing a single function. * Cleanup unused imports from splitting test_basic.py * Port atomic_move test to pytest. Working on getting rid of need to maintain procenv * Split a test of symbolic_mode_to_octal to follow unittest best practices Each test should only invoke the function under test once * Port test_argument_spec to pytest. * Fix suboptions failure --- lib/ansible/module_utils/basic.py | 2 +- .../basic/test__symbolic_mode_to_octal.py | 29 +- .../module_utils/basic/test_argument_spec.py | 430 +++++- .../module_utils/basic/test_atomic_move.py | 217 +++ .../module_utils/basic/test_deprecate_warn.py | 7 - .../basic/test_dict_converters.py | 31 + .../module_utils/basic/test_exit_json.py | 6 +- .../module_utils/basic/test_filesystem.py | 136 ++ .../basic/test_get_module_path.py | 22 + .../basic/test_heuristic_log_sanitize.py | 4 - test/units/module_utils/basic/test_imports.py | 132 ++ test/units/module_utils/basic/test_log.py | 11 - test/units/module_utils/basic/test_no_log.py | 27 +- .../basic/test_platform_distribution.py | 108 ++ test/units/module_utils/basic/test_selinux.py | 252 ++++ test/units/module_utils/test_basic.py | 1275 ----------------- 16 files changed, 1330 insertions(+), 1359 deletions(-) create mode 100644 test/units/module_utils/basic/test_atomic_move.py create mode 100644 test/units/module_utils/basic/test_dict_converters.py create mode 100644 test/units/module_utils/basic/test_filesystem.py create mode 100644 test/units/module_utils/basic/test_get_module_path.py create mode 100644 test/units/module_utils/basic/test_imports.py create mode 100644 test/units/module_utils/basic/test_platform_distribution.py create mode 100644 test/units/module_utils/basic/test_selinux.py delete mode 100644 test/units/module_utils/test_basic.py diff --git a/lib/ansible/module_utils/basic.py b/lib/ansible/module_utils/basic.py index 74fdc3c467e..4c26cb91d85 100644 --- a/lib/ansible/module_utils/basic.py +++ b/lib/ansible/module_utils/basic.py @@ -1923,7 +1923,7 @@ class AnsibleModule(object): wanted = v.get('type', None) if wanted == 'dict' or (wanted == 'list' and v.get('elements', '') == 'dict'): spec = v.get('options', None) - if spec is None or not params[k]: + if spec is None or k not in params or params[k] is None: continue self._options_context.append(k) diff --git a/test/units/module_utils/basic/test__symbolic_mode_to_octal.py b/test/units/module_utils/basic/test__symbolic_mode_to_octal.py index b83169e8530..cdfc717474c 100644 --- a/test/units/module_utils/basic/test__symbolic_mode_to_octal.py +++ b/test/units/module_utils/basic/test__symbolic_mode_to_octal.py @@ -10,9 +10,6 @@ __metaclass__ = type import pytest -from ansible.compat.tests import unittest -from ansible.compat.tests.mock import MagicMock - from ansible.module_utils.basic import AnsibleModule @@ -68,6 +65,10 @@ DATA = ( # Going from no permissions to setting all for user, group, and/or oth (0o100000, u'u=rw-x+X,g=r-x+X,o=r-x+X', 0o0644), ) +UMASK_DATA = ( + (0o100000, '+rwx', 0o770), + (0o100777, '-rwx', 0o007), +) INVALID_DATA = ( (0o040000, u'a=foo', "bad symbolic permission for mode: a=foo"), @@ -76,30 +77,26 @@ INVALID_DATA = ( @pytest.mark.parametrize('stat_info, mode_string, expected', DATA) -def test_good_symbolic_modes(stat_info, mode_string, expected): - mock_stat = MagicMock() +def test_good_symbolic_modes(mocker, stat_info, mode_string, expected): + mock_stat = mocker.MagicMock() mock_stat.st_mode = stat_info assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == expected -def test_umask_with_symbolic_modes(mocker): - mock_stat = MagicMock() - mock_stat.st_mode = 0o100000 - +@pytest.mark.parametrize('stat_info, mode_string, expected', UMASK_DATA) +def test_umask_with_symbolic_modes(mocker, stat_info, mode_string, expected): mock_umask = mocker.patch('os.umask') mock_umask.return_value = 0o7 - assert AnsibleModule._symbolic_mode_to_octal(mock_stat, '+rwx') == 0o770 + mock_stat = mocker.MagicMock() + mock_stat.st_mode = stat_info - mock_stat = MagicMock() - mock_stat.st_mode = 0o100777 - - assert AnsibleModule._symbolic_mode_to_octal(mock_stat, '-rwx') == 0o007 + assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == expected @pytest.mark.parametrize('stat_info, mode_string, expected', INVALID_DATA) -def test_invalid_symbolic_modes(stat_info, mode_string, expected): - mock_stat = MagicMock() +def test_invalid_symbolic_modes(mocker, stat_info, mode_string, expected): + mock_stat = mocker.MagicMock() mock_stat.st_mode = stat_info with pytest.raises(ValueError) as exc: assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == 'blah' diff --git a/test/units/module_utils/basic/test_argument_spec.py b/test/units/module_utils/basic/test_argument_spec.py index 7e167454f20..babca5a0744 100644 --- a/test/units/module_utils/basic/test_argument_spec.py +++ b/test/units/module_utils/basic/test_argument_spec.py @@ -1,51 +1,447 @@ -# Copyright (c) 2017 Ansible Project +# -*- coding: utf-8 -*- +# (c) 2012-2014, Michael DeHaan +# (c) 2016 Toshio Kuratomi +# Copyright: Ansible Project # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -from __future__ import (absolute_import, division) +from __future__ import absolute_import, division, print_function __metaclass__ = type import json +import os import pytest -from ansible.compat.tests.mock import MagicMock +from ansible.compat.tests.mock import MagicMock, patch from ansible.module_utils import basic +from ansible.module_utils.six import string_types +from ansible.module_utils.six.moves import builtins + +from units.mock.procenv import ModuleTestCase, swap_stdin_and_argv -MOCK_VALIDATOR_SUCCESS = MagicMock(return_value=42) MOCK_VALIDATOR_FAIL = MagicMock(side_effect=TypeError("bad conversion")) # Data is argspec, argument, expected VALID_SPECS = ( - ({'arg': {'type': int}}, {'arg': 42}, 42), - ({'arg': {'type': int}}, {'arg': '42'}, 42), - ({'arg': {'type': MOCK_VALIDATOR_SUCCESS}}, {'arg': 42}, 42), + # Simple type=int + ({'arg': {'type': 'int'}}, {'arg': 42}, 42), + # Type=int with conversion from string + ({'arg': {'type': 'int'}}, {'arg': '42'}, 42), + # Simple type=float + ({'arg': {'type': 'float'}}, {'arg': 42.0}, 42.0), + # Type=float conversion from int + ({'arg': {'type': 'float'}}, {'arg': 42}, 42.0), + # Type=float conversion from string + ({'arg': {'type': 'float'}}, {'arg': '42.0'}, 42.0), + # Type=float conversion from string without decimal point + ({'arg': {'type': 'float'}}, {'arg': '42'}, 42.0), + # Simple type=bool + ({'arg': {'type': 'bool'}}, {'arg': True}, True), + # Type=int with conversion from string + ({'arg': {'type': 'bool'}}, {'arg': 'yes'}, True), + # Type=str converts to string + ({'arg': {'type': 'str'}}, {'arg': 42}, '42'), + # Type is implicit, converts to string + ({'arg': {'type': 'str'}}, {'arg': 42}, '42'), + # parameter is required + ({'arg': {'required': True}}, {'arg': 42}, '42'), ) INVALID_SPECS = ( - ({'arg': {'type': int}}, {'arg': "bad"}, "invalid literal for int() with base 10: 'bad'"), + # Type is int; unable to convert this string + ({'arg': {'type': 'int'}}, {'arg': "bad"}, "invalid literal for int() with base 10: 'bad'"), + # Type is int; unable to convert float + ({'arg': {'type': 'int'}}, {'arg': 42.1}, "'float'> cannot be converted to an int"), + # type is a callable that fails to convert ({'arg': {'type': MOCK_VALIDATOR_FAIL}}, {'arg': "bad"}, "bad conversion"), + # unknown parameter + ({'arg': {'type': 'int'}}, {'other': 'bad', '_ansible_module_name': 'ansible_unittest'}, + 'Unsupported parameters for (ansible_unittest) module: other Supported parameters include: arg'), + # parameter is required + ({'arg': {'required': True}}, {}, 'missing required arguments: arg'), ) -@pytest.mark.parametrize('argspec, expected, am, stdin', [(s[0], s[2], s[0], s[1]) for s in VALID_SPECS], - indirect=['am', 'stdin']) -def test_validator_success(am, mocker, argspec, expected): +@pytest.fixture +def complex_argspec(): + arg_spec = dict( + foo=dict(required=True, aliases=['dup']), + bar=dict(), + bam=dict(), + baz=dict(fallback=(basic.env_fallback, ['BAZ'])), + bar1=dict(type='bool'), + zardoz=dict(choices=['one', 'two']), + ) + mut_ex = (('bar', 'bam'),) + req_to = (('bam', 'baz'),) - type_ = argspec['arg']['type'] - if isinstance(type_, MagicMock): - assert type_.called + kwargs = dict( + argument_spec=arg_spec, + mutually_exclusive=mut_ex, + required_together=req_to, + no_log=True, + add_file_common_args=True, + supports_check_mode=True, + ) + return kwargs + + +@pytest.fixture +def options_argspec_list(): + options_spec = dict( + foo=dict(required=True, aliases=['dup']), + bar=dict(), + bam=dict(), + baz=dict(fallback=(basic.env_fallback, ['BAZ'])), + bam1=dict(), + bam2=dict(default='test'), + bam3=dict(type='bool'), + ) + + arg_spec = dict( + foobar=dict( + type='list', + elements='dict', + options=options_spec, + mutually_exclusive=[ + ['bam', 'bam1'] + ], + required_if=[ + ['foo', 'hello', ['bam']], + ['foo', 'bam2', ['bam2']] + ], + required_one_of=[ + ['bar', 'bam'] + ], + required_together=[ + ['bam1', 'baz'] + ] + ) + ) + + kwargs = dict( + argument_spec=arg_spec, + no_log=True, + add_file_common_args=True, + supports_check_mode=True + ) + return kwargs + + +@pytest.fixture +def options_argspec_dict(): + # should test ok, for options in dict format. + kwargs = options_argspec_list() + kwargs['argument_spec']['foobar']['type'] = 'dict' + + return kwargs + + +# +# Tests for one aspect of arg_spec +# + +@pytest.mark.parametrize('argspec, expected, stdin', [(s[0], s[2], s[1]) for s in VALID_SPECS], + indirect=['stdin']) +def test_validator_basic_types(argspec, expected, stdin): + + am = basic.AnsibleModule(argspec) + + if 'type' in argspec['arg']: + type_ = getattr(builtins, argspec['arg']['type']) else: - assert isinstance(am.params['arg'], type_) + type_ = str + + assert isinstance(am.params['arg'], type_) assert am.params['arg'] == expected +@pytest.mark.parametrize('stdin', [{'arg': 42}], indirect=['stdin']) +def test_validator_function(mocker, stdin): + # Type is a callable + MOCK_VALIDATOR_SUCCESS = mocker.MagicMock(return_value=27) + argspec = {'arg': {'type': MOCK_VALIDATOR_SUCCESS}} + am = basic.AnsibleModule(argspec) + + assert isinstance(am.params['arg'], int) + assert am.params['arg'] == 27 + + @pytest.mark.parametrize('argspec, expected, stdin', [(s[0], s[2], s[1]) for s in INVALID_SPECS], indirect=['stdin']) def test_validator_fail(stdin, capfd, argspec, expected): - with pytest.raises(SystemExit) as ecm: - m = basic.AnsibleModule(argument_spec=argspec) + with pytest.raises(SystemExit): + basic.AnsibleModule(argument_spec=argspec) out, err = capfd.readouterr() assert not err assert expected in json.loads(out)['msg'] assert json.loads(out)['failed'] + + +class TestComplexArgSpecs: + """Test with a more complex arg_spec""" + + @pytest.mark.parametrize('stdin', [{'foo': 'hello'}, {'dup': 'hello'}], indirect=['stdin']) + def test_complex_required(self, stdin, complex_argspec): + """Test that the complex argspec works if we give it its required param as either the canonical or aliased name""" + am = basic.AnsibleModule(**complex_argspec) + assert isinstance(am.params['foo'], str) + assert am.params['foo'] == 'hello' + + @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'bam': 'test'}], indirect=['stdin']) + def test_complex_type_fallback(self, mocker, stdin, complex_argspec): + """Test that the complex argspec works if we get a required parameter via fallback""" + environ = os.environ.copy() + environ['BAZ'] = 'test data' + mocker.patch('ansible.module_utils.basic.os.environ', environ) + + am = basic.AnsibleModule(**complex_argspec) + + assert isinstance(am.params['baz'], str) + assert am.params['baz'] == 'test data' + + @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'bar': 'bad', 'bam': 'bad2'}], indirect=['stdin']) + def test_fail_mutually_exclusive(self, capfd, stdin, complex_argspec): + """Fail because of mutually exclusive parameters""" + with pytest.raises(SystemExit): + am = basic.AnsibleModule(**complex_argspec) + + out, err = capfd.readouterr() + results = json.loads(out) + + assert results['failed'] + assert results['msg'] == "parameters are mutually exclusive: bar, bam" + + @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'bam': 'bad2'}], indirect=['stdin']) + def test_fail_required_together(self, capfd, stdin, complex_argspec): + """Fail because only one of a required_together pair of parameters was specified""" + with pytest.raises(SystemExit): + am = basic.AnsibleModule(**complex_argspec) + + out, err = capfd.readouterr() + results = json.loads(out) + + assert results['failed'] + assert results['msg'] == "parameters are required together: bam, baz" + + @pytest.mark.parametrize('stdin', [{'foo': 'hello', 'bar': 'hi'}], indirect=['stdin']) + def test_fail_required_together_and_default(self, capfd, stdin, complex_argspec): + """Fail because one of a required_together pair of parameters has a default and the other was not specified""" + complex_argspec['argument_spec']['baz'] = {'default': 42} + with pytest.raises(SystemExit): + am = basic.AnsibleModule(**complex_argspec) + + out, err = capfd.readouterr() + results = json.loads(out) + + assert results['failed'] + assert results['msg'] == "parameters are required together: bam, baz" + + @pytest.mark.parametrize('stdin', [{'foo': 'hello'}], indirect=['stdin']) + def test_fail_required_together_and_fallback(self, capfd, mocker, stdin, complex_argspec): + """Fail because one of a required_together pair of parameters has a fallback and the other was not specified""" + environ = os.environ.copy() + environ['BAZ'] = 'test data' + mocker.patch('ansible.module_utils.basic.os.environ', environ) + + with pytest.raises(SystemExit): + am = basic.AnsibleModule(**complex_argspec) + + out, err = capfd.readouterr() + results = json.loads(out) + + assert results['failed'] + assert results['msg'] == "parameters are required together: bam, baz" + + +class TestComplexOptions: + """Test arg spec options""" + + # (Paramaters, expected value of module.params['foobar']) + OPTIONS_PARAMS_LIST = ( + ({'foobar': [{"foo": "hello", "bam": "good"}, {"foo": "test", "bar": "good"}]}, + [{'foo': 'hello', 'bam': 'good', 'bam2': 'test', 'bar': None, 'baz': None, 'bam1': None, 'bam3': None}, + {'foo': 'test', 'bam': None, 'bam2': 'test', 'bar': 'good', 'baz': None, 'bam1': None, 'bam3': None}, + ]), + # Alias for required param + ({'foobar': [{"dup": "test", "bar": "good"}]}, + [{'foo': 'test', 'dup': 'test', 'bam': None, 'bam2': 'test', 'bar': 'good', 'baz': None, 'bam1': None, 'bam3': None}] + ), + # Required_if utilizing default value of the requirement + ({'foobar': [{"foo": "bam2", "bar": "required_one_of"}]}, + [{'bam': None, 'bam1': None, 'bam2': 'test', 'bam3': None, 'bar': 'required_one_of', 'baz': None, 'foo': 'bam2'}] + ), + # Check that a bool option is converted + ({"foobar": [{"foo": "required", "bam": "good", "bam3": "yes"}]}, + [{'bam': 'good', 'bam1': None, 'bam2': 'test', 'bam3': True, 'bar': None, 'baz': None, 'foo': 'required'}] + ), + ) + + # (Paramaters, expected value of module.params['foobar']) + OPTIONS_PARAMS_DICT = ( + ({'foobar': {"foo": "hello", "bam": "good"}}, + {'foo': 'hello', 'bam': 'good', 'bam2': 'test', 'bar': None, 'baz': None, 'bam1': None, 'bam3': None} + ), + # Alias for required param + ({'foobar': {"dup": "test", "bar": "good"}}, + {'foo': 'test', 'dup': 'test', 'bam': None, 'bam2': 'test', 'bar': 'good', 'baz': None, 'bam1': None, 'bam3': None} + ), + # Required_if utilizing default value of the requirement + ({'foobar': {"foo": "bam2", "bar": "required_one_of"}}, + {'bam': None, 'bam1': None, 'bam2': 'test', 'bam3': None, 'bar': 'required_one_of', 'baz': None, 'foo': 'bam2'} + ), + # Check that a bool option is converted + ({"foobar": {"foo": "required", "bam": "good", "bam3": "yes"}}, + {'bam': 'good', 'bam1': None, 'bam2': 'test', 'bam3': True, 'bar': None, 'baz': None, 'foo': 'required'} + ), + ) + + # (Paramaters, failure message) + FAILING_PARAMS_LIST = ( + # Missing required option + ({'foobar': [{}]}, 'missing required arguments: foo found in foobar'), + # Invalid option + ({'foobar': [{"foo": "hello", "bam": "good", "invalid": "bad"}]}, 'module: invalid found in foobar. Supported parameters include'), + # Mutually exclusive options found + ({'foobar': [{"foo": "test", "bam": "bad", "bam1": "bad", "baz": "req_to"}]}, + 'parameters are mutually exclusive: bam, bam1 found in foobar'), + # required_if fails + ({'foobar': [{"foo": "hello", "bar": "bad"}]}, + 'foo is hello but all of the following are missing: bam found in foobar'), + # Missing required_one_of option + ({'foobar': [{"foo": "test"}]}, + 'one of the following is required: bar, bam found in foobar'), + # Missing required_together option + ({'foobar': [{"foo": "test", "bar": "required_one_of", "bam1": "bad"}]}, + 'parameters are required together: bam1, baz found in foobar'), + ) + + # (Paramaters, failure message) + FAILING_PARAMS_DICT = ( + # Missing required option + ({'foobar': {}}, 'missing required arguments: foo found in foobar'), + # Invalid option + ({'foobar': {"foo": "hello", "bam": "good", "invalid": "bad"}}, + 'module: invalid found in foobar. Supported parameters include'), + # Mutually exclusive options found + ({'foobar': {"foo": "test", "bam": "bad", "bam1": "bad", "baz": "req_to"}}, + 'parameters are mutually exclusive: bam, bam1 found in foobar'), + # required_if fails + ({'foobar': {"foo": "hello", "bar": "bad"}}, + 'foo is hello but all of the following are missing: bam found in foobar'), + # Missing required_one_of option + ({'foobar': {"foo": "test"}}, + 'one of the following is required: bar, bam found in foobar'), + # Missing required_together option + ({'foobar': {"foo": "test", "bar": "required_one_of", "bam1": "bad"}}, + 'parameters are required together: bam1, baz found in foobar'), + ) + + @pytest.mark.parametrize('stdin, expected', OPTIONS_PARAMS_DICT, indirect=['stdin']) + def test_options_type_dict(self, stdin, options_argspec_dict, expected): + """Test that a basic creation with required and required_if works""" + # should test ok, tests basic foo requirement and required_if + am = basic.AnsibleModule(**options_argspec_dict) + + assert isinstance(am.params['foobar'], dict) + assert am.params['foobar'] == expected + + @pytest.mark.parametrize('stdin, expected', OPTIONS_PARAMS_LIST, indirect=['stdin']) + def test_options_type_list(self, stdin, options_argspec_list, expected): + """Test that a basic creation with required and required_if works""" + # should test ok, tests basic foo requirement and required_if + am = basic.AnsibleModule(**options_argspec_list) + + assert isinstance(am.params['foobar'], list) + assert am.params['foobar'] == expected + + @pytest.mark.parametrize('stdin, expected', FAILING_PARAMS_DICT, indirect=['stdin']) + def test_fail_validate_options_dict(self, capfd, stdin, options_argspec_dict, expected): + """Fail because one of a required_together pair of parameters has a default and the other was not specified""" + with pytest.raises(SystemExit): + am = basic.AnsibleModule(**options_argspec_dict) + + out, err = capfd.readouterr() + results = json.loads(out) + + assert results['failed'] + assert expected in results['msg'] + + @pytest.mark.parametrize('stdin, expected', FAILING_PARAMS_LIST, indirect=['stdin']) + def test_fail_validate_options_list(self, capfd, stdin, options_argspec_list, expected): + """Fail because one of a required_together pair of parameters has a default and the other was not specified""" + with pytest.raises(SystemExit): + am = basic.AnsibleModule(**options_argspec_list) + + out, err = capfd.readouterr() + results = json.loads(out) + + assert results['failed'] + assert expected in results['msg'] + + @pytest.mark.parametrize('stdin', [{'foobar': {'foo': 'required', 'bam1': 'test', 'bar': 'case'}}], indirect=['stdin']) + def test_fallback_in_option(self, mocker, stdin, options_argspec_dict): + """Test that the complex argspec works if we get a required parameter via fallback""" + environ = os.environ.copy() + environ['BAZ'] = 'test data' + mocker.patch('ansible.module_utils.basic.os.environ', environ) + + am = basic.AnsibleModule(**options_argspec_dict) + + assert isinstance(am.params['foobar']['baz'], str) + assert am.params['foobar']['baz'] == 'test data' + + +class TestLoadFileCommonArguments: + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_smoketest_load_file_common_args(self, am): + """With no file arguments, an empty dict is returned""" + am.selinux_mls_enabled = MagicMock() + am.selinux_mls_enabled.return_value = True + am.selinux_default_context = MagicMock() + am.selinux_default_context.return_value = 'unconfined_u:object_r:default_t:s0'.split(':', 3) + + assert am.load_file_common_arguments(params={}) == {} + + @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) + def test_load_file_common_args(self, am, mocker): + am.selinux_mls_enabled = MagicMock() + am.selinux_mls_enabled.return_value = True + am.selinux_default_context = MagicMock() + am.selinux_default_context.return_value = 'unconfined_u:object_r:default_t:s0'.split(':', 3) + + base_params = dict( + path='/path/to/file', + mode=0o600, + owner='root', + group='root', + seuser='_default', + serole='_default', + setype='_default', + selevel='_default', + ) + + extended_params = base_params.copy() + extended_params.update(dict( + follow=True, + foo='bar', + )) + + final_params = base_params.copy() + final_params.update(dict( + path='/path/to/real_file', + secontext=['unconfined_u', 'object_r', 'default_t', 's0'], + attributes=None, + )) + + # with the proper params specified, the returned dictionary should represent + # only those params which have something to do with the file arguments, excluding + # other params and updated as required with proper values which may have been + # massaged by the method + mocker.patch('os.path.islink', return_value=True) + mocker.patch('os.path.realpath', return_value='/path/to/real_file') + + res = am.load_file_common_arguments(params=extended_params) + + assert res == final_params diff --git a/test/units/module_utils/basic/test_atomic_move.py b/test/units/module_utils/basic/test_atomic_move.py new file mode 100644 index 00000000000..d1dc4d76762 --- /dev/null +++ b/test/units/module_utils/basic/test_atomic_move.py @@ -0,0 +1,217 @@ +# -*- coding: utf-8 -*- +# (c) 2012-2014, Michael DeHaan +# (c) 2016 Toshio Kuratomi +# (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +import errno +import json +from itertools import product + +import pytest + +from ansible.module_utils import basic + + +@pytest.fixture +def atomic_am(am, mocker): + am.selinux_enabled = mocker.MagicMock() + am.selinux_context = mocker.MagicMock() + am.selinux_default_context = mocker.MagicMock() + am.set_context_if_different = mocker.MagicMock() + + yield am + + +@pytest.fixture +def atomic_mocks(mocker): + environ = dict() + mocks = { + 'chmod': mocker.patch('os.chmod'), + 'chown': mocker.patch('os.chown'), + 'close': mocker.patch('os.close'), + 'environ': mocker.patch('os.environ', environ), + 'getlogin': mocker.patch('os.getlogin'), + 'getuid': mocker.patch('os.getuid'), + 'path_exists': mocker.patch('os.path.exists'), + 'rename': mocker.patch('os.rename'), + 'stat': mocker.patch('os.stat'), + 'umask': mocker.patch('os.umask'), + 'getpwuid': mocker.patch('pwd.getpwuid'), + 'copy2': mocker.patch('shutil.copy2'), + 'copyfileobj': mocker.patch('shutil.copyfileobj'), + 'move': mocker.patch('shutil.move'), + 'mkstemp': mocker.patch('tempfile.mkstemp'), + } + + mocks['getlogin'].return_value = 'root' + mocks['getuid'].return_value = 0 + mocks['getpwuid'].return_value = ('root', '', 0, 0, '', '', '') + mocks['umask'].side_effect = [18, 0] + mocks['rename'].return_value = None + + yield mocks + + +@pytest.fixture +def fake_stat(mocker): + stat1 = mocker.MagicMock() + stat1.st_mode = 0o0644 + stat1.st_uid = 0 + stat1.st_gid = 0 + yield stat1 + + +@pytest.mark.parametrize('stdin, selinux', product([{}], (True, False)), indirect=['stdin']) +def test_new_file(atomic_am, atomic_mocks, mocker, selinux): + # test destination does not exist, login name = 'root', no environment, os.rename() succeeds + mock_context = atomic_am.selinux_default_context.return_value + atomic_mocks['path_exists'].return_value = False + atomic_am.selinux_enabled.return_value = selinux + + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + atomic_mocks['rename'].assert_called_with(b'/path/to/src', b'/path/to/dest') + assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/dest', basic.DEFAULT_PERM & ~18)] + + if selinux: + assert atomic_am.selinux_default_context.call_args_list == [mocker.call('/path/to/dest')] + assert atomic_am.set_context_if_different.call_args_list == [mocker.call('/path/to/dest', mock_context, False)] + else: + assert not atomic_am.selinux_default_context.called + assert not atomic_am.set_context_if_different.called + + +@pytest.mark.parametrize('stdin, selinux', product([{}], (True, False)), indirect=['stdin']) +def test_existing_file(atomic_am, atomic_mocks, fake_stat, mocker, selinux): + # Test destination already present + mock_context = atomic_am.selinux_context.return_value + atomic_mocks['stat'].return_value = fake_stat + atomic_mocks['path_exists'].return_value = True + atomic_am.selinux_enabled.return_value = selinux + + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + atomic_mocks['rename'].assert_called_with(b'/path/to/src', b'/path/to/dest') + assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/src', basic.DEFAULT_PERM & ~18)] + + if selinux: + assert atomic_am.set_context_if_different.call_args_list == [mocker.call('/path/to/dest', mock_context, False)] + assert atomic_am.selinux_context.call_args_list == [mocker.call('/path/to/dest')] + else: + assert not atomic_am.selinux_default_context.called + assert not atomic_am.set_context_if_different.called + + +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) +def test_no_tty_fallback(atomic_am, atomic_mocks, fake_stat, mocker): + """Raise OSError when using getlogin() to simulate no tty cornercase""" + mock_context = atomic_am.selinux_context.return_value + atomic_mocks['stat'].return_value = fake_stat + atomic_mocks['path_exists'].return_value = True + atomic_am.selinux_enabled.return_value = True + atomic_mocks['getlogin'].side_effect = OSError() + atomic_mocks['environ']['LOGNAME'] = 'root' + + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + atomic_mocks['rename'].assert_called_with(b'/path/to/src', b'/path/to/dest') + assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/src', basic.DEFAULT_PERM & ~18)] + + assert atomic_am.set_context_if_different.call_args_list == [mocker.call('/path/to/dest', mock_context, False)] + assert atomic_am.selinux_context.call_args_list == [mocker.call('/path/to/dest')] + + +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) +def test_existing_file_stat_failure(atomic_am, atomic_mocks, mocker): + """Failure to stat an existing file in order to copy permissions propogates the error (unless EPERM)""" + atomic_mocks['stat'].side_effect = OSError() + atomic_mocks['path_exists'].return_value = True + + with pytest.raises(OSError): + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) +def test_existing_file_stat_perms_failure(atomic_am, atomic_mocks, mocker): + """Failure to stat an existing file to copy the permissions due to permissions passes fine""" + # and now have os.stat return EPERM, which should not fail + mock_context = atomic_am.selinux_context.return_value + atomic_mocks['stat'].side_effect = OSError(errno.EPERM, 'testing os stat with EPERM') + atomic_mocks['path_exists'].return_value = True + atomic_am.selinux_enabled.return_value = True + + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + atomic_mocks['rename'].assert_called_with(b'/path/to/src', b'/path/to/dest') + # FIXME: Should atomic_move() set a default permission value when it cannot retrieve the + # existing file's permissions? (Right now it's up to the calling code. + # assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/src', basic.DEFAULT_PERM & ~18)] + assert atomic_am.set_context_if_different.call_args_list == [mocker.call('/path/to/dest', mock_context, False)] + assert atomic_am.selinux_context.call_args_list == [mocker.call('/path/to/dest')] + + +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) +def test_rename_failure(atomic_am, atomic_mocks, mocker, capfd): + """Test os.rename fails with EIO, causing it to bail out""" + atomic_mocks['path_exists'].side_effect = [False, False] + atomic_mocks['rename'].side_effect = OSError(errno.EIO, 'failing with EIO') + + with pytest.raises(SystemExit): + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + out, err = capfd.readouterr() + results = json.loads(out) + + assert 'Could not replace file' in results['msg'] + assert 'failing with EIO' in results['msg'] + assert results['failed'] + + +@pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) +def test_rename_perms_fail_temp_creation_fails(atomic_am, atomic_mocks, mocker, capfd): + """Test os.rename fails with EPERM working but failure in mkstemp""" + atomic_mocks['path_exists'].return_value = False + atomic_mocks['close'].return_value = None + atomic_mocks['rename'].side_effect = [OSError(errno.EPERM, 'failing with EPERM'), None] + atomic_mocks['mkstemp'].return_value = None + atomic_mocks['mkstemp'].side_effect = OSError() + atomic_am.selinux_enabled.return_value = False + + with pytest.raises(SystemExit): + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + + out, err = capfd.readouterr() + results = json.loads(out) + + assert 'is not writable by the current user' in results['msg'] + assert results['failed'] + + +@pytest.mark.parametrize('stdin, selinux', product([{}], (True, False)), indirect=['stdin']) +def test_rename_perms_fail_temp_succeeds(atomic_am, atomic_mocks, fake_stat, mocker, selinux): + """Test os.rename raising an error but fallback to using mkstemp works""" + mock_context = atomic_am.selinux_default_context.return_value + atomic_mocks['path_exists'].return_value = False + atomic_mocks['rename'].side_effect = [OSError(errno.EPERM, 'failing with EPERM'), None] + atomic_mocks['stat'].return_value = [fake_stat, fake_stat, fake_stat] + atomic_mocks['stat'].side_effect = None + atomic_mocks['mkstemp'].return_value = (None, '/path/to/tempfile') + atomic_mocks['mkstemp'].side_effect = None + atomic_am.selinux_enabled.return_value = selinux + + atomic_am.atomic_move('/path/to/src', '/path/to/dest') + assert atomic_mocks['rename'].call_args_list == [mocker.call(b'/path/to/src', b'/path/to/dest'), + mocker.call(b'/path/to/tempfile', b'/path/to/dest')] + assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/dest', basic.DEFAULT_PERM & ~18)] + + if selinux: + assert atomic_am.selinux_default_context.call_args_list == [mocker.call('/path/to/dest')] + assert atomic_am.set_context_if_different.call_args_list == [mocker.call(b'/path/to/tempfile', mock_context, False), + mocker.call('/path/to/dest', mock_context, False)] + else: + assert not atomic_am.selinux_default_context.called + assert not atomic_am.set_context_if_different.called diff --git a/test/units/module_utils/basic/test_deprecate_warn.py b/test/units/module_utils/basic/test_deprecate_warn.py index 43d2dec23ff..4d6999d6641 100644 --- a/test/units/module_utils/basic/test_deprecate_warn.py +++ b/test/units/module_utils/basic/test_deprecate_warn.py @@ -4,16 +4,9 @@ # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) import json -import sys -from collections import MutableMapping -from io import BytesIO import pytest -import ansible.module_utils.basic -from ansible.module_utils.six import PY3, string_types -from ansible.module_utils._text import to_bytes - @pytest.mark.parametrize('stdin', [{}], indirect=['stdin']) def test_warn(am, capfd): diff --git a/test/units/module_utils/basic/test_dict_converters.py b/test/units/module_utils/basic/test_dict_converters.py new file mode 100644 index 00000000000..f63ed9c6edc --- /dev/null +++ b/test/units/module_utils/basic/test_dict_converters.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# (c) 2012-2014, Michael DeHaan +# (c) 2016 Toshio Kuratomi +# (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +from units.mock.procenv import ModuleTestCase + +from ansible.module_utils.six.moves import builtins + +realimport = builtins.__import__ + + +class TestTextifyContainers(ModuleTestCase): + def test_module_utils_basic_json_dict_converters(self): + from ansible.module_utils.basic import json_dict_unicode_to_bytes, json_dict_bytes_to_unicode + + test_data = dict( + item1=u"Fóo", + item2=[u"Bár", u"Bam"], + item3=dict(sub1=u"Súb"), + item4=(u"föo", u"bär", u"©"), + item5=42, + ) + res = json_dict_unicode_to_bytes(test_data) + res2 = json_dict_bytes_to_unicode(res) + + self.assertEqual(test_data, res2) diff --git a/test/units/module_utils/basic/test_exit_json.py b/test/units/module_utils/basic/test_exit_json.py index 234a1cc1130..cf63b98768a 100644 --- a/test/units/module_utils/basic/test_exit_json.py +++ b/test/units/module_utils/basic/test_exit_json.py @@ -7,8 +7,6 @@ from __future__ import (absolute_import, division) __metaclass__ = type import json -import sys -from itertools import chain import pytest @@ -103,7 +101,7 @@ class TestAnsibleModuleExitValuesRemoved: for s, r, e in DATA), # pylint: disable=undefined-variable indirect=['am', 'stdin']) def test_exit_json_removes_values(self, am, capfd, return_val, expected): - with pytest.raises(SystemExit) as ctx: + with pytest.raises(SystemExit): am.exit_json(**return_val) out, err = capfd.readouterr() @@ -116,7 +114,7 @@ class TestAnsibleModuleExitValuesRemoved: indirect=['am', 'stdin']) def test_fail_json_removes_values(self, am, capfd, return_val, expected): expected['failed'] = True - with pytest.raises(SystemExit) as ctx: + with pytest.raises(SystemExit): am.fail_json(**return_val) == expected out, err = capfd.readouterr() diff --git a/test/units/module_utils/basic/test_filesystem.py b/test/units/module_utils/basic/test_filesystem.py new file mode 100644 index 00000000000..abe52c14ea1 --- /dev/null +++ b/test/units/module_utils/basic/test_filesystem.py @@ -0,0 +1,136 @@ +# -*- coding: utf-8 -*- +# (c) 2012-2014, Michael DeHaan +# (c) 2016 Toshio Kuratomi +# (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +from units.mock.procenv import ModuleTestCase + +from ansible.compat.tests.mock import patch, MagicMock +from ansible.module_utils.six.moves import builtins + +realimport = builtins.__import__ + + +class TestOtherFilesystem(ModuleTestCase): + def test_module_utils_basic_ansible_module_user_and_group(self): + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = None + + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + mock_stat = MagicMock() + mock_stat.st_uid = 0 + mock_stat.st_gid = 0 + + with patch('os.lstat', return_value=mock_stat): + self.assertEqual(am.user_and_group('/path/to/file'), (0, 0)) + + def test_module_utils_basic_ansible_module_find_mount_point(self): + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = None + + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + def _mock_ismount(path): + if path == b'/': + return True + return False + + with patch('os.path.ismount', side_effect=_mock_ismount): + self.assertEqual(am.find_mount_point('/root/fs/../mounted/path/to/whatever'), '/') + + def _mock_ismount(path): + if path == b'/subdir/mount': + return True + if path == b'/': + return True + return False + + with patch('os.path.ismount', side_effect=_mock_ismount): + self.assertEqual(am.find_mount_point('/subdir/mount/path/to/whatever'), '/subdir/mount') + + def test_module_utils_basic_ansible_module_set_owner_if_different(self): + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = None + + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + self.assertEqual(am.set_owner_if_different('/path/to/file', None, True), True) + self.assertEqual(am.set_owner_if_different('/path/to/file', None, False), False) + + am.user_and_group = MagicMock(return_value=(500, 500)) + + with patch('os.lchown', return_value=None) as m: + self.assertEqual(am.set_owner_if_different('/path/to/file', 0, False), True) + m.assert_called_with(b'/path/to/file', 0, -1) + + def _mock_getpwnam(*args, **kwargs): + mock_pw = MagicMock() + mock_pw.pw_uid = 0 + return mock_pw + + m.reset_mock() + with patch('pwd.getpwnam', side_effect=_mock_getpwnam): + self.assertEqual(am.set_owner_if_different('/path/to/file', 'root', False), True) + m.assert_called_with(b'/path/to/file', 0, -1) + + with patch('pwd.getpwnam', side_effect=KeyError): + self.assertRaises(SystemExit, am.set_owner_if_different, '/path/to/file', 'root', False) + + m.reset_mock() + am.check_mode = True + self.assertEqual(am.set_owner_if_different('/path/to/file', 0, False), True) + self.assertEqual(m.called, False) + am.check_mode = False + + with patch('os.lchown', side_effect=OSError) as m: + self.assertRaises(SystemExit, am.set_owner_if_different, '/path/to/file', 'root', False) + + def test_module_utils_basic_ansible_module_set_group_if_different(self): + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = None + + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + self.assertEqual(am.set_group_if_different('/path/to/file', None, True), True) + self.assertEqual(am.set_group_if_different('/path/to/file', None, False), False) + + am.user_and_group = MagicMock(return_value=(500, 500)) + + with patch('os.lchown', return_value=None) as m: + self.assertEqual(am.set_group_if_different('/path/to/file', 0, False), True) + m.assert_called_with(b'/path/to/file', -1, 0) + + def _mock_getgrnam(*args, **kwargs): + mock_gr = MagicMock() + mock_gr.gr_gid = 0 + return mock_gr + + m.reset_mock() + with patch('grp.getgrnam', side_effect=_mock_getgrnam): + self.assertEqual(am.set_group_if_different('/path/to/file', 'root', False), True) + m.assert_called_with(b'/path/to/file', -1, 0) + + with patch('grp.getgrnam', side_effect=KeyError): + self.assertRaises(SystemExit, am.set_group_if_different, '/path/to/file', 'root', False) + + m.reset_mock() + am.check_mode = True + self.assertEqual(am.set_group_if_different('/path/to/file', 0, False), True) + self.assertEqual(m.called, False) + am.check_mode = False + + with patch('os.lchown', side_effect=OSError) as m: + self.assertRaises(SystemExit, am.set_group_if_different, '/path/to/file', 'root', False) diff --git a/test/units/module_utils/basic/test_get_module_path.py b/test/units/module_utils/basic/test_get_module_path.py new file mode 100644 index 00000000000..641aed9a3cd --- /dev/null +++ b/test/units/module_utils/basic/test_get_module_path.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- +# (c) 2012-2014, Michael DeHaan +# (c) 2016 Toshio Kuratomi +# (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +from units.mock.procenv import ModuleTestCase + +from ansible.compat.tests.mock import patch +from ansible.module_utils.six.moves import builtins + +realimport = builtins.__import__ + + +class TestGetModulePath(ModuleTestCase): + def test_module_utils_basic_get_module_path(self): + from ansible.module_utils.basic import get_module_path + with patch('os.path.realpath', return_value='/path/to/foo/'): + self.assertEqual(get_module_path(), '/path/to/foo') diff --git a/test/units/module_utils/basic/test_heuristic_log_sanitize.py b/test/units/module_utils/basic/test_heuristic_log_sanitize.py index 234ae0ca5d7..19345bd4b24 100644 --- a/test/units/module_utils/basic/test_heuristic_log_sanitize.py +++ b/test/units/module_utils/basic/test_heuristic_log_sanitize.py @@ -20,11 +20,7 @@ from __future__ import (absolute_import, division) __metaclass__ = type -import sys -import syslog - from ansible.compat.tests import unittest -from ansible.compat.tests.mock import patch, MagicMock from ansible.module_utils.basic import heuristic_log_sanitize diff --git a/test/units/module_utils/basic/test_imports.py b/test/units/module_utils/basic/test_imports.py new file mode 100644 index 00000000000..9f12fa6f37a --- /dev/null +++ b/test/units/module_utils/basic/test_imports.py @@ -0,0 +1,132 @@ +# -*- coding: utf-8 -*- +# (c) 2012-2014, Michael DeHaan +# (c) 2016 Toshio Kuratomi +# (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +import sys + +from units.mock.procenv import ModuleTestCase + +from ansible.compat.tests import unittest +from ansible.compat.tests.mock import patch, MagicMock +from ansible.module_utils.six.moves import builtins + +realimport = builtins.__import__ + + +class TestImports(ModuleTestCase): + + def clear_modules(self, mods): + for mod in mods: + if mod in sys.modules: + del sys.modules[mod] + + @patch.object(builtins, '__import__') + def test_module_utils_basic_import_syslog(self, mock_import): + def _mock_import(name, *args, **kwargs): + if name == 'syslog': + raise ImportError + return realimport(name, *args, **kwargs) + + self.clear_modules(['syslog', 'ansible.module_utils.basic']) + mod = builtins.__import__('ansible.module_utils.basic') + self.assertTrue(mod.module_utils.basic.HAS_SYSLOG) + + self.clear_modules(['syslog', 'ansible.module_utils.basic']) + mock_import.side_effect = _mock_import + mod = builtins.__import__('ansible.module_utils.basic') + self.assertFalse(mod.module_utils.basic.HAS_SYSLOG) + + @patch.object(builtins, '__import__') + def test_module_utils_basic_import_selinux(self, mock_import): + def _mock_import(name, *args, **kwargs): + if name == 'selinux': + raise ImportError + return realimport(name, *args, **kwargs) + + try: + self.clear_modules(['selinux', 'ansible.module_utils.basic']) + mod = builtins.__import__('ansible.module_utils.basic') + self.assertTrue(mod.module_utils.basic.HAVE_SELINUX) + except ImportError: + # no selinux on test system, so skip + pass + + self.clear_modules(['selinux', 'ansible.module_utils.basic']) + mock_import.side_effect = _mock_import + mod = builtins.__import__('ansible.module_utils.basic') + self.assertFalse(mod.module_utils.basic.HAVE_SELINUX) + + @patch.object(builtins, '__import__') + def test_module_utils_basic_import_json(self, mock_import): + def _mock_import(name, *args, **kwargs): + if name == 'json': + raise ImportError + elif name == 'simplejson': + sj = MagicMock() + sj.__version__ = '3.10.0' + return sj + return realimport(name, *args, **kwargs) + + self.clear_modules(['json', 'ansible.module_utils.basic']) + mod = builtins.__import__('ansible.module_utils.basic') + + self.clear_modules(['json', 'ansible.module_utils.basic']) + mock_import.side_effect = _mock_import + mod = builtins.__import__('ansible.module_utils.basic') + + # FIXME: doesn't work yet + # @patch.object(builtins, 'bytes') + # def test_module_utils_basic_bytes(self, mock_bytes): + # mock_bytes.side_effect = NameError() + # from ansible.module_utils import basic + + @patch.object(builtins, '__import__') + @unittest.skipIf(sys.version_info[0] >= 3, "literal_eval is available in every version of Python3") + def test_module_utils_basic_import_literal_eval(self, mock_import): + def _mock_import(name, *args, **kwargs): + try: + fromlist = kwargs.get('fromlist', args[2]) + except IndexError: + fromlist = [] + if name == 'ast' and 'literal_eval' in fromlist: + raise ImportError + return realimport(name, *args, **kwargs) + + mock_import.side_effect = _mock_import + self.clear_modules(['ast', 'ansible.module_utils.basic']) + mod = builtins.__import__('ansible.module_utils.basic') + self.assertEqual(mod.module_utils.basic.literal_eval("'1'"), "1") + self.assertEqual(mod.module_utils.basic.literal_eval("1"), 1) + self.assertEqual(mod.module_utils.basic.literal_eval("-1"), -1) + self.assertEqual(mod.module_utils.basic.literal_eval("(1,2,3)"), (1, 2, 3)) + self.assertEqual(mod.module_utils.basic.literal_eval("[1]"), [1]) + self.assertEqual(mod.module_utils.basic.literal_eval("True"), True) + self.assertEqual(mod.module_utils.basic.literal_eval("False"), False) + self.assertEqual(mod.module_utils.basic.literal_eval("None"), None) + # self.assertEqual(mod.module_utils.basic.literal_eval('{"a": 1}'), dict(a=1)) + self.assertRaises(ValueError, mod.module_utils.basic.literal_eval, "asdfasdfasdf") + + @patch.object(builtins, '__import__') + def test_module_utils_basic_import_systemd_journal(self, mock_import): + def _mock_import(name, *args, **kwargs): + try: + fromlist = kwargs.get('fromlist', args[2]) + except IndexError: + fromlist = [] + if name == 'systemd' and 'journal' in fromlist: + raise ImportError + return realimport(name, *args, **kwargs) + + self.clear_modules(['systemd', 'ansible.module_utils.basic']) + mod = builtins.__import__('ansible.module_utils.basic') + self.assertTrue(mod.module_utils.basic.has_journal) + + self.clear_modules(['systemd', 'ansible.module_utils.basic']) + mock_import.side_effect = _mock_import + mod = builtins.__import__('ansible.module_utils.basic') + self.assertFalse(mod.module_utils.basic.has_journal) diff --git a/test/units/module_utils/basic/test_log.py b/test/units/module_utils/basic/test_log.py index 80654bb4678..6b5c258f6c1 100644 --- a/test/units/module_utils/basic/test_log.py +++ b/test/units/module_utils/basic/test_log.py @@ -6,8 +6,6 @@ from __future__ import absolute_import, division, print_function __metaclass__ = type -import sys -import json import syslog from itertools import product @@ -16,15 +14,6 @@ import pytest import ansible.module_utils.basic from ansible.module_utils.six import PY3 -try: - # Python 3.4+ - from importlib import reload -except ImportError: - # Python 2 has reload as a builtin - - # Ignoring python3.0-3.3 (those have imp.reload if we decide we care) - pass - class TestAnsibleModuleLogSmokeTest: DATA = [u'Text string', u'Toshio くらとみ non-ascii test'] diff --git a/test/units/module_utils/basic/test_no_log.py b/test/units/module_utils/basic/test_no_log.py index 0c00a64cf52..ebd66a37567 100644 --- a/test/units/module_utils/basic/test_no_log.py +++ b/test/units/module_utils/basic/test_no_log.py @@ -1,34 +1,13 @@ # -*- coding: utf-8 -*- # (c) 2015, Toshio Kuratomi -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . +# (c) 2017, Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) -# Make coding more python3-ish -from __future__ import (absolute_import, division) +from __future__ import absolute_import, division, print_function __metaclass__ = type -import json -import sys -import syslog - from ansible.compat.tests import unittest -from ansible.compat.tests.mock import patch, MagicMock -from ansible.module_utils import basic -from ansible.module_utils.basic import heuristic_log_sanitize from ansible.module_utils.basic import return_values, remove_values diff --git a/test/units/module_utils/basic/test_platform_distribution.py b/test/units/module_utils/basic/test_platform_distribution.py new file mode 100644 index 00000000000..a1a00b2bb90 --- /dev/null +++ b/test/units/module_utils/basic/test_platform_distribution.py @@ -0,0 +1,108 @@ +# -*- coding: utf-8 -*- +# (c) 2012-2014, Michael DeHaan +# (c) 2016 Toshio Kuratomi +# (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +from units.mock.procenv import ModuleTestCase + +from ansible.compat.tests.mock import patch +from ansible.module_utils.six.moves import builtins + +realimport = builtins.__import__ + + +class TestPlatform(ModuleTestCase): + def test_module_utils_basic_get_platform(self): + with patch('platform.system', return_value='foo'): + from ansible.module_utils.basic import get_platform + self.assertEqual(get_platform(), 'foo') + + def test_module_utils_basic_get_distribution(self): + from ansible.module_utils.basic import get_distribution + + with patch('platform.system', return_value='Foo'): + self.assertEqual(get_distribution(), None) + + with patch('platform.system', return_value='Linux'): + with patch('platform.linux_distribution', return_value=["foo"]): + self.assertEqual(get_distribution(), "Foo") + + with patch('os.path.isfile', return_value=True): + with patch('platform.linux_distribution', side_effect=[("AmazonFooBar", )]): + self.assertEqual(get_distribution(), "Amazonfoobar") + + with patch('platform.linux_distribution', side_effect=(("", ), ("AmazonFooBam",))): + self.assertEqual(get_distribution(), "Amazon") + + with patch('platform.linux_distribution', side_effect=[("", ), ("", )]): + self.assertEqual(get_distribution(), "OtherLinux") + + def _dist(distname='', version='', id='', supported_dists=(), full_distribution_name=1): + if supported_dists != (): + return ("Bar", "2", "Two") + else: + return ("", "", "") + + with patch('platform.linux_distribution', side_effect=_dist): + self.assertEqual(get_distribution(), "Bar") + + with patch('platform.linux_distribution', side_effect=Exception("boo")): + with patch('platform.dist', return_value=("bar", "2", "Two")): + self.assertEqual(get_distribution(), "Bar") + + def test_module_utils_basic_get_distribution_version(self): + from ansible.module_utils.basic import get_distribution_version + + with patch('platform.system', return_value='Foo'): + self.assertEqual(get_distribution_version(), None) + + with patch('platform.system', return_value='Linux'): + with patch('platform.linux_distribution', return_value=("foo", "1", "One")): + self.assertEqual(get_distribution_version(), "1") + + with patch('os.path.isfile', return_value=True): + def _dist(distname='', version='', id='', supported_dists=(), full_distribution_name=1): + if supported_dists != (): + return ("AmazonFooBar", "2", "") + else: + return ("", "", "") + + with patch('platform.linux_distribution', side_effect=_dist): + self.assertEqual(get_distribution_version(), "2") + + with patch('platform.linux_distribution', side_effect=Exception("boo")): + with patch('platform.dist', return_value=("bar", "3", "Three")): + self.assertEqual(get_distribution_version(), "3") + + def test_module_utils_basic_load_platform_subclass(self): + class LinuxTest: + pass + + class Foo(LinuxTest): + platform = "Linux" + distribution = None + + class Bar(LinuxTest): + platform = "Linux" + distribution = "Bar" + + from ansible.module_utils.basic import load_platform_subclass + + # match just the platform class, not a specific distribution + with patch('ansible.module_utils.basic.get_platform', return_value="Linux"): + with patch('ansible.module_utils.basic.get_distribution', return_value=None): + self.assertIs(type(load_platform_subclass(LinuxTest)), Foo) + + # match both the distribution and platform class + with patch('ansible.module_utils.basic.get_platform', return_value="Linux"): + with patch('ansible.module_utils.basic.get_distribution', return_value="Bar"): + self.assertIs(type(load_platform_subclass(LinuxTest)), Bar) + + # if neither match, the fallback should be the top-level class + with patch('ansible.module_utils.basic.get_platform', return_value="Foo"): + with patch('ansible.module_utils.basic.get_distribution', return_value=None): + self.assertIs(type(load_platform_subclass(LinuxTest)), LinuxTest) diff --git a/test/units/module_utils/basic/test_selinux.py b/test/units/module_utils/basic/test_selinux.py new file mode 100644 index 00000000000..c95d9acc97b --- /dev/null +++ b/test/units/module_utils/basic/test_selinux.py @@ -0,0 +1,252 @@ +# -*- coding: utf-8 -*- +# (c) 2012-2014, Michael DeHaan +# (c) 2016 Toshio Kuratomi +# (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +import errno +import json + +from units.mock.procenv import ModuleTestCase, swap_stdin_and_argv + +from ansible.compat.tests.mock import patch, MagicMock, mock_open, Mock +from ansible.module_utils.six.moves import builtins + +realimport = builtins.__import__ + + +class TestSELinux(ModuleTestCase): + def test_module_utils_basic_ansible_module_selinux_mls_enabled(self): + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = None + + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + basic.HAVE_SELINUX = False + self.assertEqual(am.selinux_mls_enabled(), False) + + basic.HAVE_SELINUX = True + basic.selinux = Mock() + with patch.dict('sys.modules', {'selinux': basic.selinux}): + with patch('selinux.is_selinux_mls_enabled', return_value=0): + self.assertEqual(am.selinux_mls_enabled(), False) + with patch('selinux.is_selinux_mls_enabled', return_value=1): + self.assertEqual(am.selinux_mls_enabled(), True) + delattr(basic, 'selinux') + + def test_module_utils_basic_ansible_module_selinux_initial_context(self): + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = None + + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + am.selinux_mls_enabled = MagicMock() + am.selinux_mls_enabled.return_value = False + self.assertEqual(am.selinux_initial_context(), [None, None, None]) + am.selinux_mls_enabled.return_value = True + self.assertEqual(am.selinux_initial_context(), [None, None, None, None]) + + def test_module_utils_basic_ansible_module_selinux_enabled(self): + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = None + + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + # we first test the cases where the python selinux lib is + # not installed, which has two paths: one in which the system + # does have selinux installed (and the selinuxenabled command + # is present and returns 0 when run), or selinux is not installed + basic.HAVE_SELINUX = False + am.get_bin_path = MagicMock() + am.get_bin_path.return_value = '/path/to/selinuxenabled' + am.run_command = MagicMock() + am.run_command.return_value = (0, '', '') + self.assertRaises(SystemExit, am.selinux_enabled) + am.get_bin_path.return_value = None + self.assertEqual(am.selinux_enabled(), False) + + # finally we test the case where the python selinux lib is installed, + # and both possibilities there (enabled vs. disabled) + basic.HAVE_SELINUX = True + basic.selinux = Mock() + with patch.dict('sys.modules', {'selinux': basic.selinux}): + with patch('selinux.is_selinux_enabled', return_value=0): + self.assertEqual(am.selinux_enabled(), False) + with patch('selinux.is_selinux_enabled', return_value=1): + self.assertEqual(am.selinux_enabled(), True) + delattr(basic, 'selinux') + + def test_module_utils_basic_ansible_module_selinux_default_context(self): + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = None + + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + am.selinux_initial_context = MagicMock(return_value=[None, None, None, None]) + am.selinux_enabled = MagicMock(return_value=True) + + # we first test the cases where the python selinux lib is not installed + basic.HAVE_SELINUX = False + self.assertEqual(am.selinux_default_context(path='/foo/bar'), [None, None, None, None]) + + # all following tests assume the python selinux bindings are installed + basic.HAVE_SELINUX = True + + basic.selinux = Mock() + + with patch.dict('sys.modules', {'selinux': basic.selinux}): + # next, we test with a mocked implementation of selinux.matchpathcon to simulate + # an actual context being found + with patch('selinux.matchpathcon', return_value=[0, 'unconfined_u:object_r:default_t:s0']): + self.assertEqual(am.selinux_default_context(path='/foo/bar'), ['unconfined_u', 'object_r', 'default_t', 's0']) + + # we also test the case where matchpathcon returned a failure + with patch('selinux.matchpathcon', return_value=[-1, '']): + self.assertEqual(am.selinux_default_context(path='/foo/bar'), [None, None, None, None]) + + # finally, we test where an OSError occurred during matchpathcon's call + with patch('selinux.matchpathcon', side_effect=OSError): + self.assertEqual(am.selinux_default_context(path='/foo/bar'), [None, None, None, None]) + + delattr(basic, 'selinux') + + def test_module_utils_basic_ansible_module_selinux_context(self): + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = None + + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + am.selinux_initial_context = MagicMock(return_value=[None, None, None, None]) + am.selinux_enabled = MagicMock(return_value=True) + + # we first test the cases where the python selinux lib is not installed + basic.HAVE_SELINUX = False + self.assertEqual(am.selinux_context(path='/foo/bar'), [None, None, None, None]) + + # all following tests assume the python selinux bindings are installed + basic.HAVE_SELINUX = True + + basic.selinux = Mock() + + with patch.dict('sys.modules', {'selinux': basic.selinux}): + # next, we test with a mocked implementation of selinux.lgetfilecon_raw to simulate + # an actual context being found + with patch('selinux.lgetfilecon_raw', return_value=[0, 'unconfined_u:object_r:default_t:s0']): + self.assertEqual(am.selinux_context(path='/foo/bar'), ['unconfined_u', 'object_r', 'default_t', 's0']) + + # we also test the case where matchpathcon returned a failure + with patch('selinux.lgetfilecon_raw', return_value=[-1, '']): + self.assertEqual(am.selinux_context(path='/foo/bar'), [None, None, None, None]) + + # finally, we test where an OSError occurred during matchpathcon's call + e = OSError() + e.errno = errno.ENOENT + with patch('selinux.lgetfilecon_raw', side_effect=e): + self.assertRaises(SystemExit, am.selinux_context, path='/foo/bar') + + e = OSError() + with patch('selinux.lgetfilecon_raw', side_effect=e): + self.assertRaises(SystemExit, am.selinux_context, path='/foo/bar') + + delattr(basic, 'selinux') + + def test_module_utils_basic_ansible_module_is_special_selinux_path(self): + from ansible.module_utils import basic + + args = json.dumps(dict(ANSIBLE_MODULE_ARGS={'_ansible_selinux_special_fs': "nfs,nfsd,foos"})) + + with swap_stdin_and_argv(stdin_data=args): + basic._ANSIBLE_ARGS = None + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + def _mock_find_mount_point(path): + if path.startswith('/some/path'): + return '/some/path' + elif path.startswith('/weird/random/fstype'): + return '/weird/random/fstype' + return '/' + + am.find_mount_point = MagicMock(side_effect=_mock_find_mount_point) + am.selinux_context = MagicMock(return_value=['foo_u', 'foo_r', 'foo_t', 's0']) + + m = mock_open() + m.side_effect = OSError + + with patch.object(builtins, 'open', m, create=True): + self.assertEqual(am.is_special_selinux_path('/some/path/that/should/be/nfs'), (False, None)) + + mount_data = [ + '/dev/disk1 / ext4 rw,seclabel,relatime,data=ordered 0 0\n', + '1.1.1.1:/path/to/nfs /some/path nfs ro 0 0\n', + 'whatever /weird/random/fstype foos rw 0 0\n', + ] + + # mock_open has a broken readlines() implementation apparently... + # this should work by default but doesn't, so we fix it + m = mock_open(read_data=''.join(mount_data)) + m.return_value.readlines.return_value = mount_data + + with patch.object(builtins, 'open', m, create=True): + self.assertEqual(am.is_special_selinux_path('/some/random/path'), (False, None)) + self.assertEqual(am.is_special_selinux_path('/some/path/that/should/be/nfs'), (True, ['foo_u', 'foo_r', 'foo_t', 's0'])) + self.assertEqual(am.is_special_selinux_path('/weird/random/fstype/path'), (True, ['foo_u', 'foo_r', 'foo_t', 's0'])) + + def test_module_utils_basic_ansible_module_set_context_if_different(self): + from ansible.module_utils import basic + basic._ANSIBLE_ARGS = None + + am = basic.AnsibleModule( + argument_spec=dict(), + ) + + basic.HAVE_SELINUX = False + + am.selinux_enabled = MagicMock(return_value=False) + self.assertEqual(am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], True), True) + self.assertEqual(am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], False), False) + + basic.HAVE_SELINUX = True + + am.selinux_enabled = MagicMock(return_value=True) + am.selinux_context = MagicMock(return_value=['bar_u', 'bar_r', None, None]) + am.is_special_selinux_path = MagicMock(return_value=(False, None)) + + basic.selinux = Mock() + with patch.dict('sys.modules', {'selinux': basic.selinux}): + with patch('selinux.lsetfilecon', return_value=0) as m: + self.assertEqual(am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], False), True) + m.assert_called_with('/path/to/file', 'foo_u:foo_r:foo_t:s0') + m.reset_mock() + am.check_mode = True + self.assertEqual(am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], False), True) + self.assertEqual(m.called, False) + am.check_mode = False + + with patch('selinux.lsetfilecon', return_value=1) as m: + self.assertRaises(SystemExit, am.set_context_if_different, '/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], True) + + with patch('selinux.lsetfilecon', side_effect=OSError) as m: + self.assertRaises(SystemExit, am.set_context_if_different, '/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], True) + + am.is_special_selinux_path = MagicMock(return_value=(True, ['sp_u', 'sp_r', 'sp_t', 's0'])) + + with patch('selinux.lsetfilecon', return_value=0) as m: + self.assertEqual(am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], False), True) + m.assert_called_with('/path/to/file', 'sp_u:sp_r:sp_t:s0') + + delattr(basic, 'selinux') diff --git a/test/units/module_utils/test_basic.py b/test/units/module_utils/test_basic.py deleted file mode 100644 index 4df155bc084..00000000000 --- a/test/units/module_utils/test_basic.py +++ /dev/null @@ -1,1275 +0,0 @@ -# -*- coding: utf-8 -*- -# (c) 2012-2014, Michael DeHaan -# (c) 2016 Toshio Kuratomi -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . - -# Make coding more python3-ish -from __future__ import (absolute_import, division) -__metaclass__ = type - -import errno -import json -import os -import sys -from io import BytesIO, StringIO - -from units.mock.procenv import ModuleTestCase, swap_stdin_and_argv - -from ansible.compat.tests import unittest -from ansible.compat.tests.mock import patch, MagicMock, mock_open, Mock, call -from ansible.module_utils.six.moves import builtins - -realimport = builtins.__import__ - - -class TestModuleUtilsBasic(ModuleTestCase): - - def clear_modules(self, mods): - for mod in mods: - if mod in sys.modules: - del sys.modules[mod] - - @patch.object(builtins, '__import__') - def test_module_utils_basic_import_syslog(self, mock_import): - def _mock_import(name, *args, **kwargs): - if name == 'syslog': - raise ImportError - return realimport(name, *args, **kwargs) - - self.clear_modules(['syslog', 'ansible.module_utils.basic']) - mod = builtins.__import__('ansible.module_utils.basic') - self.assertTrue(mod.module_utils.basic.HAS_SYSLOG) - - self.clear_modules(['syslog', 'ansible.module_utils.basic']) - mock_import.side_effect = _mock_import - mod = builtins.__import__('ansible.module_utils.basic') - self.assertFalse(mod.module_utils.basic.HAS_SYSLOG) - - @patch.object(builtins, '__import__') - def test_module_utils_basic_import_selinux(self, mock_import): - def _mock_import(name, *args, **kwargs): - if name == 'selinux': - raise ImportError - return realimport(name, *args, **kwargs) - - try: - self.clear_modules(['selinux', 'ansible.module_utils.basic']) - mod = builtins.__import__('ansible.module_utils.basic') - self.assertTrue(mod.module_utils.basic.HAVE_SELINUX) - except ImportError: - # no selinux on test system, so skip - pass - - self.clear_modules(['selinux', 'ansible.module_utils.basic']) - mock_import.side_effect = _mock_import - mod = builtins.__import__('ansible.module_utils.basic') - self.assertFalse(mod.module_utils.basic.HAVE_SELINUX) - - @patch.object(builtins, '__import__') - def test_module_utils_basic_import_json(self, mock_import): - def _mock_import(name, *args, **kwargs): - if name == 'json': - raise ImportError - elif name == 'simplejson': - sj = MagicMock() - sj.__version__ = '3.10.0' - return sj - return realimport(name, *args, **kwargs) - - self.clear_modules(['json', 'ansible.module_utils.basic']) - mod = builtins.__import__('ansible.module_utils.basic') - - self.clear_modules(['json', 'ansible.module_utils.basic']) - mock_import.side_effect = _mock_import - mod = builtins.__import__('ansible.module_utils.basic') - - # FIXME: doesn't work yet - # @patch.object(builtins, 'bytes') - # def test_module_utils_basic_bytes(self, mock_bytes): - # mock_bytes.side_effect = NameError() - # from ansible.module_utils import basic - - @patch.object(builtins, '__import__') - @unittest.skipIf(sys.version_info[0] >= 3, "literal_eval is available in every version of Python3") - def test_module_utils_basic_import_literal_eval(self, mock_import): - def _mock_import(name, *args, **kwargs): - try: - fromlist = kwargs.get('fromlist', args[2]) - except IndexError: - fromlist = [] - if name == 'ast' and 'literal_eval' in fromlist: - raise ImportError - return realimport(name, *args, **kwargs) - - mock_import.side_effect = _mock_import - self.clear_modules(['ast', 'ansible.module_utils.basic']) - mod = builtins.__import__('ansible.module_utils.basic') - self.assertEqual(mod.module_utils.basic.literal_eval("'1'"), "1") - self.assertEqual(mod.module_utils.basic.literal_eval("1"), 1) - self.assertEqual(mod.module_utils.basic.literal_eval("-1"), -1) - self.assertEqual(mod.module_utils.basic.literal_eval("(1,2,3)"), (1, 2, 3)) - self.assertEqual(mod.module_utils.basic.literal_eval("[1]"), [1]) - self.assertEqual(mod.module_utils.basic.literal_eval("True"), True) - self.assertEqual(mod.module_utils.basic.literal_eval("False"), False) - self.assertEqual(mod.module_utils.basic.literal_eval("None"), None) - # self.assertEqual(mod.module_utils.basic.literal_eval('{"a": 1}'), dict(a=1)) - self.assertRaises(ValueError, mod.module_utils.basic.literal_eval, "asdfasdfasdf") - - @patch.object(builtins, '__import__') - def test_module_utils_basic_import_systemd_journal(self, mock_import): - def _mock_import(name, *args, **kwargs): - try: - fromlist = kwargs.get('fromlist', args[2]) - except IndexError: - fromlist = [] - if name == 'systemd' and 'journal' in fromlist: - raise ImportError - return realimport(name, *args, **kwargs) - - self.clear_modules(['systemd', 'ansible.module_utils.basic']) - mod = builtins.__import__('ansible.module_utils.basic') - self.assertTrue(mod.module_utils.basic.has_journal) - - self.clear_modules(['systemd', 'ansible.module_utils.basic']) - mock_import.side_effect = _mock_import - mod = builtins.__import__('ansible.module_utils.basic') - self.assertFalse(mod.module_utils.basic.has_journal) - - def test_module_utils_basic_get_platform(self): - with patch('platform.system', return_value='foo'): - from ansible.module_utils.basic import get_platform - self.assertEqual(get_platform(), 'foo') - - def test_module_utils_basic_get_distribution(self): - from ansible.module_utils.basic import get_distribution - - with patch('platform.system', return_value='Foo'): - self.assertEqual(get_distribution(), None) - - with patch('platform.system', return_value='Linux'): - with patch('platform.linux_distribution', return_value=["foo"]): - self.assertEqual(get_distribution(), "Foo") - - with patch('os.path.isfile', return_value=True): - with patch('platform.linux_distribution', side_effect=[("AmazonFooBar", )]): - self.assertEqual(get_distribution(), "Amazonfoobar") - - with patch('platform.linux_distribution', side_effect=(("", ), ("AmazonFooBam",))): - self.assertEqual(get_distribution(), "Amazon") - - with patch('platform.linux_distribution', side_effect=[("", ), ("", )]): - self.assertEqual(get_distribution(), "OtherLinux") - - def _dist(distname='', version='', id='', supported_dists=(), full_distribution_name=1): - if supported_dists != (): - return ("Bar", "2", "Two") - else: - return ("", "", "") - - with patch('platform.linux_distribution', side_effect=_dist): - self.assertEqual(get_distribution(), "Bar") - - with patch('platform.linux_distribution', side_effect=Exception("boo")): - with patch('platform.dist', return_value=("bar", "2", "Two")): - self.assertEqual(get_distribution(), "Bar") - - def test_module_utils_basic_get_distribution_version(self): - from ansible.module_utils.basic import get_distribution_version - - with patch('platform.system', return_value='Foo'): - self.assertEqual(get_distribution_version(), None) - - with patch('platform.system', return_value='Linux'): - with patch('platform.linux_distribution', return_value=("foo", "1", "One")): - self.assertEqual(get_distribution_version(), "1") - - with patch('os.path.isfile', return_value=True): - def _dist(distname='', version='', id='', supported_dists=(), full_distribution_name=1): - if supported_dists != (): - return ("AmazonFooBar", "2", "") - else: - return ("", "", "") - - with patch('platform.linux_distribution', side_effect=_dist): - self.assertEqual(get_distribution_version(), "2") - - with patch('platform.linux_distribution', side_effect=Exception("boo")): - with patch('platform.dist', return_value=("bar", "3", "Three")): - self.assertEqual(get_distribution_version(), "3") - - def test_module_utils_basic_load_platform_subclass(self): - class LinuxTest: - pass - - class Foo(LinuxTest): - platform = "Linux" - distribution = None - - class Bar(LinuxTest): - platform = "Linux" - distribution = "Bar" - - from ansible.module_utils.basic import load_platform_subclass - - # match just the platform class, not a specific distribution - with patch('ansible.module_utils.basic.get_platform', return_value="Linux"): - with patch('ansible.module_utils.basic.get_distribution', return_value=None): - self.assertIs(type(load_platform_subclass(LinuxTest)), Foo) - - # match both the distribution and platform class - with patch('ansible.module_utils.basic.get_platform', return_value="Linux"): - with patch('ansible.module_utils.basic.get_distribution', return_value="Bar"): - self.assertIs(type(load_platform_subclass(LinuxTest)), Bar) - - # if neither match, the fallback should be the top-level class - with patch('ansible.module_utils.basic.get_platform', return_value="Foo"): - with patch('ansible.module_utils.basic.get_distribution', return_value=None): - self.assertIs(type(load_platform_subclass(LinuxTest)), LinuxTest) - - def test_module_utils_basic_json_dict_converters(self): - from ansible.module_utils.basic import json_dict_unicode_to_bytes, json_dict_bytes_to_unicode - - test_data = dict( - item1=u"Fóo", - item2=[u"Bár", u"Bam"], - item3=dict(sub1=u"Súb"), - item4=(u"föo", u"bär", u"©"), - item5=42, - ) - res = json_dict_unicode_to_bytes(test_data) - res2 = json_dict_bytes_to_unicode(res) - - self.assertEqual(test_data, res2) - - def test_module_utils_basic_get_module_path(self): - from ansible.module_utils.basic import get_module_path - with patch('os.path.realpath', return_value='/path/to/foo/'): - self.assertEqual(get_module_path(), '/path/to/foo') - - def test_module_utils_basic_ansible_module_creation(self): - from ansible.module_utils import basic - from ansible.module_utils.parsing.convert_bool import BOOLEANS - - am = basic.AnsibleModule( - argument_spec=dict(), - ) - - arg_spec = dict( - foo=dict(required=True), - bar=dict(), - bam=dict(), - baz=dict(fallback=(basic.env_fallback, ['BAZ'])), - bar1=dict(type='bool', choices=BOOLEANS) - ) - mut_ex = (('bar', 'bam'),) - req_to = (('bam', 'baz'),) - - # should test ok - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={"foo": "hello"})) - - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - am = basic.AnsibleModule( - argument_spec=arg_spec, - mutually_exclusive=mut_ex, - required_together=req_to, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True, - ) - - # FIXME: add asserts here to verify the basic config - - # fail, because a required param was not specified - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={})) - - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - self.assertRaises( - SystemExit, - basic.AnsibleModule, - argument_spec=arg_spec, - mutually_exclusive=mut_ex, - required_together=req_to, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True, - ) - - # fail because of mutually exclusive parameters - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={"foo": "hello", "bar": "bad", "bam": "bad"})) - - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - self.assertRaises( - SystemExit, - basic.AnsibleModule, - argument_spec=arg_spec, - mutually_exclusive=mut_ex, - required_together=req_to, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True, - ) - - # fail because a param required due to another param was not specified - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={"bam": "bad"})) - - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - self.assertRaises( - SystemExit, - basic.AnsibleModule, - argument_spec=arg_spec, - mutually_exclusive=mut_ex, - required_together=req_to, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True, - ) - - # test fallback, should test ok as value of param required due to another param - # is set by environment variable - os.environ['BAZ'] = 'bad' - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={"foo": "hello", "bam": "bad"})) - - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - am = basic.AnsibleModule( - argument_spec=arg_spec, - mutually_exclusive=mut_ex, - required_together=req_to, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True, - ) - os.environ.pop('BAZ', None) - - # should test ok, check for boolean values - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={"foo": "hello", "bar1": "yes"})) - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - am = basic.AnsibleModule( - argument_spec=arg_spec, - mutually_exclusive=mut_ex, - required_together=req_to, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True, - ) - - def test_module_utils_basic_ansible_module_with_options_creation(self): - from ansible.module_utils import basic - from ansible.module_utils.parsing.convert_bool import BOOLEANS - - options_spec = dict( - foo=dict(required=True, aliases=['dup']), - bar=dict(), - bam=dict(), - baz=dict(fallback=(basic.env_fallback, ['BAZ'])), - bam1=dict(), - bam2=dict(default='test'), - bam3=dict(type='bool', choices=BOOLEANS) - ) - arg_spec = dict( - foobar=dict( - type='list', - elements='dict', - options=options_spec, - mutually_exclusive=[ - ['bam', 'bam1'] - ], - required_if=[ - ['foo', 'hello', ['bam']], - ['foo', 'bam2', ['bam2']] - ], - required_one_of=[ - ['bar', 'bam'] - ], - required_together=[ - ['bam1', 'baz'] - ] - ) - ) - - # should test ok, tests basic foo requirement and required_if - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={'foobar': [{"foo": "hello", "bam": "good"}, {"foo": "test", "bar": "good"}]})) - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - am = basic.AnsibleModule( - argument_spec=arg_spec, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True - ) - - # should test ok, handles aliases - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={'foobar': [{"dup": "test", "bar": "good"}]})) - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - am = basic.AnsibleModule( - argument_spec=arg_spec, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True - ) - - # fail, because a required param was not specified - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={'foobar': [{}]})) - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - self.assertRaises( - SystemExit, - basic.AnsibleModule, - argument_spec=arg_spec, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True - ) - - # fail because of mutually exclusive parameters (mutually_exclusive, baz is added as it is required_together with bam1) - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={'foobar': [{"foo": "test", "bam": "bad", "bam1": "bad", "baz": "req_to"}]})) - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - self.assertRaises( - SystemExit, - basic.AnsibleModule, - argument_spec=arg_spec, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True - ) - - # fail because a param required if for foo=hello is missing (required_if) - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={'foobar': [{"foo": "hello", "bar": "bad"}]})) - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - self.assertRaises( - SystemExit, - basic.AnsibleModule, - argument_spec=arg_spec, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True - ) - - # fail because one of param is required (required_one_of) - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={'foobar': [{"foo": "test"}]})) - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - self.assertRaises( - SystemExit, - basic.AnsibleModule, - argument_spec=arg_spec, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True - ) - - # fail because one parameter requires another (required_together, bar is added for the required_one_of field) - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={'foobar': [{"foo": "test", "bar": "required_one_of", "bam1": "bad"}]})) - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - self.assertRaises( - SystemExit, - basic.AnsibleModule, - argument_spec=arg_spec, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True - ) - - # should test ok, the required param is set by default from spec (required_if together with default value, bar added for required_one_of - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={'foobar': [{"foo": "bam2", "bar": "required_one_of"}]})) - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - am = basic.AnsibleModule( - argument_spec=arg_spec, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True - ) - - # should test ok, for options in dict format. - arg_spec = dict(foobar=dict(type='dict', options=options_spec)) - - # should test ok - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={'foobar': {"foo": "hello"}})) - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - am = basic.AnsibleModule( - argument_spec=arg_spec, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True - ) - - # should fail, check for invalid agrument - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={'foobar': {"foo1": "hello"}})) - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - self.assertRaises( - SystemExit, - basic.AnsibleModule, - argument_spec=arg_spec, - no_log=True, - check_invalid_arguments=True, - add_file_common_args=True, - supports_check_mode=True - ) - - # test fallback, should test ok as value of param required due to another param - # is set by environment variable - os.environ['BAZ'] = 'bad' - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={"foo": "hello", "bam": "bad"})) - - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - am = basic.AnsibleModule( - argument_spec=arg_spec, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True, - ) - os.environ.pop('BAZ', None) - - # should test ok, check for boolean values - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={"foo": "hello", "bam3": "yes"})) - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - am = basic.AnsibleModule( - argument_spec=arg_spec, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True, - ) - - def test_module_utils_basic_ansible_module_type_check(self): - from ansible.module_utils import basic - - arg_spec = dict( - foo=dict(type='float'), - foo2=dict(type='float'), - foo3=dict(type='float'), - bar=dict(type='int'), - bar2=dict(type='int'), - ) - - # should test ok - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={ - "foo": 123.0, # float - "foo2": 123, # int - "foo3": "123", # string - "bar": 123, # int - "bar2": "123", # string - })) - - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - am = basic.AnsibleModule( - argument_spec=arg_spec, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True, - ) - - # fail, because bar does not accept floating point numbers - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={"bar": 123.0})) - - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - self.assertRaises( - SystemExit, - basic.AnsibleModule, - argument_spec=arg_spec, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True, - ) - - def test_module_utils_basic_ansible_module_options_type_check(self): - from ansible.module_utils import basic - - options_spec = dict( - foo=dict(type='float'), - foo2=dict(type='float'), - foo3=dict(type='float'), - bar=dict(type='int'), - bar2=dict(type='int'), - ) - - arg_spec = dict(foobar=dict(type='list', elements='dict', options=options_spec)) - # should test ok - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={'foobar': [{ - "foo": 123.0, # float - "foo2": 123, # int - "foo3": "123", # string - "bar": 123, # int - "bar2": "123", # string - }]})) - - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - am = basic.AnsibleModule( - argument_spec=arg_spec, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True, - ) - - # fail, because bar does not accept floating point numbers - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={'foobar': [{"bar": 123.0}]})) - - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - self.assertRaises( - SystemExit, - basic.AnsibleModule, - argument_spec=arg_spec, - no_log=True, - check_invalid_arguments=False, - add_file_common_args=True, - supports_check_mode=True, - ) - - def test_module_utils_basic_ansible_module_load_file_common_arguments(self): - from ansible.module_utils import basic - basic._ANSIBLE_ARGS = None - - am = basic.AnsibleModule( - argument_spec=dict(), - ) - - am.selinux_mls_enabled = MagicMock() - am.selinux_mls_enabled.return_value = True - am.selinux_default_context = MagicMock() - am.selinux_default_context.return_value = 'unconfined_u:object_r:default_t:s0'.split(':', 3) - - # with no params, the result should be an empty dict - res = am.load_file_common_arguments(params=dict()) - self.assertEqual(res, dict()) - - base_params = dict( - path='/path/to/file', - mode=0o600, - owner='root', - group='root', - seuser='_default', - serole='_default', - setype='_default', - selevel='_default', - ) - - extended_params = base_params.copy() - extended_params.update(dict( - follow=True, - foo='bar', - )) - - final_params = base_params.copy() - final_params.update(dict( - path='/path/to/real_file', - secontext=['unconfined_u', 'object_r', 'default_t', 's0'], - attributes=None, - )) - - # with the proper params specified, the returned dictionary should represent - # only those params which have something to do with the file arguments, excluding - # other params and updated as required with proper values which may have been - # massaged by the method - with patch('os.path.islink', return_value=True): - with patch('os.path.realpath', return_value='/path/to/real_file'): - res = am.load_file_common_arguments(params=extended_params) - self.assertEqual(res, final_params) - - def test_module_utils_basic_ansible_module_selinux_mls_enabled(self): - from ansible.module_utils import basic - basic._ANSIBLE_ARGS = None - - am = basic.AnsibleModule( - argument_spec=dict(), - ) - - basic.HAVE_SELINUX = False - self.assertEqual(am.selinux_mls_enabled(), False) - - basic.HAVE_SELINUX = True - basic.selinux = Mock() - with patch.dict('sys.modules', {'selinux': basic.selinux}): - with patch('selinux.is_selinux_mls_enabled', return_value=0): - self.assertEqual(am.selinux_mls_enabled(), False) - with patch('selinux.is_selinux_mls_enabled', return_value=1): - self.assertEqual(am.selinux_mls_enabled(), True) - delattr(basic, 'selinux') - - def test_module_utils_basic_ansible_module_selinux_initial_context(self): - from ansible.module_utils import basic - basic._ANSIBLE_ARGS = None - - am = basic.AnsibleModule( - argument_spec=dict(), - ) - - am.selinux_mls_enabled = MagicMock() - am.selinux_mls_enabled.return_value = False - self.assertEqual(am.selinux_initial_context(), [None, None, None]) - am.selinux_mls_enabled.return_value = True - self.assertEqual(am.selinux_initial_context(), [None, None, None, None]) - - def test_module_utils_basic_ansible_module_selinux_enabled(self): - from ansible.module_utils import basic - basic._ANSIBLE_ARGS = None - - am = basic.AnsibleModule( - argument_spec=dict(), - ) - - # we first test the cases where the python selinux lib is - # not installed, which has two paths: one in which the system - # does have selinux installed (and the selinuxenabled command - # is present and returns 0 when run), or selinux is not installed - basic.HAVE_SELINUX = False - am.get_bin_path = MagicMock() - am.get_bin_path.return_value = '/path/to/selinuxenabled' - am.run_command = MagicMock() - am.run_command.return_value = (0, '', '') - self.assertRaises(SystemExit, am.selinux_enabled) - am.get_bin_path.return_value = None - self.assertEqual(am.selinux_enabled(), False) - - # finally we test the case where the python selinux lib is installed, - # and both possibilities there (enabled vs. disabled) - basic.HAVE_SELINUX = True - basic.selinux = Mock() - with patch.dict('sys.modules', {'selinux': basic.selinux}): - with patch('selinux.is_selinux_enabled', return_value=0): - self.assertEqual(am.selinux_enabled(), False) - with patch('selinux.is_selinux_enabled', return_value=1): - self.assertEqual(am.selinux_enabled(), True) - delattr(basic, 'selinux') - - def test_module_utils_basic_ansible_module_selinux_default_context(self): - from ansible.module_utils import basic - basic._ANSIBLE_ARGS = None - - am = basic.AnsibleModule( - argument_spec=dict(), - ) - - am.selinux_initial_context = MagicMock(return_value=[None, None, None, None]) - am.selinux_enabled = MagicMock(return_value=True) - - # we first test the cases where the python selinux lib is not installed - basic.HAVE_SELINUX = False - self.assertEqual(am.selinux_default_context(path='/foo/bar'), [None, None, None, None]) - - # all following tests assume the python selinux bindings are installed - basic.HAVE_SELINUX = True - - basic.selinux = Mock() - - with patch.dict('sys.modules', {'selinux': basic.selinux}): - # next, we test with a mocked implementation of selinux.matchpathcon to simulate - # an actual context being found - with patch('selinux.matchpathcon', return_value=[0, 'unconfined_u:object_r:default_t:s0']): - self.assertEqual(am.selinux_default_context(path='/foo/bar'), ['unconfined_u', 'object_r', 'default_t', 's0']) - - # we also test the case where matchpathcon returned a failure - with patch('selinux.matchpathcon', return_value=[-1, '']): - self.assertEqual(am.selinux_default_context(path='/foo/bar'), [None, None, None, None]) - - # finally, we test where an OSError occurred during matchpathcon's call - with patch('selinux.matchpathcon', side_effect=OSError): - self.assertEqual(am.selinux_default_context(path='/foo/bar'), [None, None, None, None]) - - delattr(basic, 'selinux') - - def test_module_utils_basic_ansible_module_selinux_context(self): - from ansible.module_utils import basic - basic._ANSIBLE_ARGS = None - - am = basic.AnsibleModule( - argument_spec=dict(), - ) - - am.selinux_initial_context = MagicMock(return_value=[None, None, None, None]) - am.selinux_enabled = MagicMock(return_value=True) - - # we first test the cases where the python selinux lib is not installed - basic.HAVE_SELINUX = False - self.assertEqual(am.selinux_context(path='/foo/bar'), [None, None, None, None]) - - # all following tests assume the python selinux bindings are installed - basic.HAVE_SELINUX = True - - basic.selinux = Mock() - - with patch.dict('sys.modules', {'selinux': basic.selinux}): - # next, we test with a mocked implementation of selinux.lgetfilecon_raw to simulate - # an actual context being found - with patch('selinux.lgetfilecon_raw', return_value=[0, 'unconfined_u:object_r:default_t:s0']): - self.assertEqual(am.selinux_context(path='/foo/bar'), ['unconfined_u', 'object_r', 'default_t', 's0']) - - # we also test the case where matchpathcon returned a failure - with patch('selinux.lgetfilecon_raw', return_value=[-1, '']): - self.assertEqual(am.selinux_context(path='/foo/bar'), [None, None, None, None]) - - # finally, we test where an OSError occurred during matchpathcon's call - e = OSError() - e.errno = errno.ENOENT - with patch('selinux.lgetfilecon_raw', side_effect=e): - self.assertRaises(SystemExit, am.selinux_context, path='/foo/bar') - - e = OSError() - with patch('selinux.lgetfilecon_raw', side_effect=e): - self.assertRaises(SystemExit, am.selinux_context, path='/foo/bar') - - delattr(basic, 'selinux') - - def test_module_utils_basic_ansible_module_is_special_selinux_path(self): - from ansible.module_utils import basic - - args = json.dumps(dict(ANSIBLE_MODULE_ARGS={'_ansible_selinux_special_fs': "nfs,nfsd,foos"})) - - with swap_stdin_and_argv(stdin_data=args): - basic._ANSIBLE_ARGS = None - am = basic.AnsibleModule( - argument_spec=dict(), - ) - - def _mock_find_mount_point(path): - if path.startswith('/some/path'): - return '/some/path' - elif path.startswith('/weird/random/fstype'): - return '/weird/random/fstype' - return '/' - - am.find_mount_point = MagicMock(side_effect=_mock_find_mount_point) - am.selinux_context = MagicMock(return_value=['foo_u', 'foo_r', 'foo_t', 's0']) - - m = mock_open() - m.side_effect = OSError - - with patch.object(builtins, 'open', m, create=True): - self.assertEqual(am.is_special_selinux_path('/some/path/that/should/be/nfs'), (False, None)) - - mount_data = [ - '/dev/disk1 / ext4 rw,seclabel,relatime,data=ordered 0 0\n', - '1.1.1.1:/path/to/nfs /some/path nfs ro 0 0\n', - 'whatever /weird/random/fstype foos rw 0 0\n', - ] - - # mock_open has a broken readlines() implementation apparently... - # this should work by default but doesn't, so we fix it - m = mock_open(read_data=''.join(mount_data)) - m.return_value.readlines.return_value = mount_data - - with patch.object(builtins, 'open', m, create=True): - self.assertEqual(am.is_special_selinux_path('/some/random/path'), (False, None)) - self.assertEqual(am.is_special_selinux_path('/some/path/that/should/be/nfs'), (True, ['foo_u', 'foo_r', 'foo_t', 's0'])) - self.assertEqual(am.is_special_selinux_path('/weird/random/fstype/path'), (True, ['foo_u', 'foo_r', 'foo_t', 's0'])) - - def test_module_utils_basic_ansible_module_user_and_group(self): - from ansible.module_utils import basic - basic._ANSIBLE_ARGS = None - - am = basic.AnsibleModule( - argument_spec=dict(), - ) - - mock_stat = MagicMock() - mock_stat.st_uid = 0 - mock_stat.st_gid = 0 - - with patch('os.lstat', return_value=mock_stat): - self.assertEqual(am.user_and_group('/path/to/file'), (0, 0)) - - def test_module_utils_basic_ansible_module_find_mount_point(self): - from ansible.module_utils import basic - basic._ANSIBLE_ARGS = None - - am = basic.AnsibleModule( - argument_spec=dict(), - ) - - def _mock_ismount(path): - if path == b'/': - return True - return False - - with patch('os.path.ismount', side_effect=_mock_ismount): - self.assertEqual(am.find_mount_point('/root/fs/../mounted/path/to/whatever'), '/') - - def _mock_ismount(path): - if path == b'/subdir/mount': - return True - if path == b'/': - return True - return False - - with patch('os.path.ismount', side_effect=_mock_ismount): - self.assertEqual(am.find_mount_point('/subdir/mount/path/to/whatever'), '/subdir/mount') - - def test_module_utils_basic_ansible_module_set_context_if_different(self): - from ansible.module_utils import basic - basic._ANSIBLE_ARGS = None - - am = basic.AnsibleModule( - argument_spec=dict(), - ) - - basic.HAVE_SELINUX = False - - am.selinux_enabled = MagicMock(return_value=False) - self.assertEqual(am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], True), True) - self.assertEqual(am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], False), False) - - basic.HAVE_SELINUX = True - - am.selinux_enabled = MagicMock(return_value=True) - am.selinux_context = MagicMock(return_value=['bar_u', 'bar_r', None, None]) - am.is_special_selinux_path = MagicMock(return_value=(False, None)) - - basic.selinux = Mock() - with patch.dict('sys.modules', {'selinux': basic.selinux}): - with patch('selinux.lsetfilecon', return_value=0) as m: - self.assertEqual(am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], False), True) - m.assert_called_with('/path/to/file', 'foo_u:foo_r:foo_t:s0') - m.reset_mock() - am.check_mode = True - self.assertEqual(am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], False), True) - self.assertEqual(m.called, False) - am.check_mode = False - - with patch('selinux.lsetfilecon', return_value=1) as m: - self.assertRaises(SystemExit, am.set_context_if_different, '/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], True) - - with patch('selinux.lsetfilecon', side_effect=OSError) as m: - self.assertRaises(SystemExit, am.set_context_if_different, '/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], True) - - am.is_special_selinux_path = MagicMock(return_value=(True, ['sp_u', 'sp_r', 'sp_t', 's0'])) - - with patch('selinux.lsetfilecon', return_value=0) as m: - self.assertEqual(am.set_context_if_different('/path/to/file', ['foo_u', 'foo_r', 'foo_t', 's0'], False), True) - m.assert_called_with('/path/to/file', 'sp_u:sp_r:sp_t:s0') - - delattr(basic, 'selinux') - - def test_module_utils_basic_ansible_module_set_owner_if_different(self): - from ansible.module_utils import basic - basic._ANSIBLE_ARGS = None - - am = basic.AnsibleModule( - argument_spec=dict(), - ) - - self.assertEqual(am.set_owner_if_different('/path/to/file', None, True), True) - self.assertEqual(am.set_owner_if_different('/path/to/file', None, False), False) - - am.user_and_group = MagicMock(return_value=(500, 500)) - - with patch('os.lchown', return_value=None) as m: - self.assertEqual(am.set_owner_if_different('/path/to/file', 0, False), True) - m.assert_called_with(b'/path/to/file', 0, -1) - - def _mock_getpwnam(*args, **kwargs): - mock_pw = MagicMock() - mock_pw.pw_uid = 0 - return mock_pw - - m.reset_mock() - with patch('pwd.getpwnam', side_effect=_mock_getpwnam): - self.assertEqual(am.set_owner_if_different('/path/to/file', 'root', False), True) - m.assert_called_with(b'/path/to/file', 0, -1) - - with patch('pwd.getpwnam', side_effect=KeyError): - self.assertRaises(SystemExit, am.set_owner_if_different, '/path/to/file', 'root', False) - - m.reset_mock() - am.check_mode = True - self.assertEqual(am.set_owner_if_different('/path/to/file', 0, False), True) - self.assertEqual(m.called, False) - am.check_mode = False - - with patch('os.lchown', side_effect=OSError) as m: - self.assertRaises(SystemExit, am.set_owner_if_different, '/path/to/file', 'root', False) - - def test_module_utils_basic_ansible_module_set_group_if_different(self): - from ansible.module_utils import basic - basic._ANSIBLE_ARGS = None - - am = basic.AnsibleModule( - argument_spec=dict(), - ) - - self.assertEqual(am.set_group_if_different('/path/to/file', None, True), True) - self.assertEqual(am.set_group_if_different('/path/to/file', None, False), False) - - am.user_and_group = MagicMock(return_value=(500, 500)) - - with patch('os.lchown', return_value=None) as m: - self.assertEqual(am.set_group_if_different('/path/to/file', 0, False), True) - m.assert_called_with(b'/path/to/file', -1, 0) - - def _mock_getgrnam(*args, **kwargs): - mock_gr = MagicMock() - mock_gr.gr_gid = 0 - return mock_gr - - m.reset_mock() - with patch('grp.getgrnam', side_effect=_mock_getgrnam): - self.assertEqual(am.set_group_if_different('/path/to/file', 'root', False), True) - m.assert_called_with(b'/path/to/file', -1, 0) - - with patch('grp.getgrnam', side_effect=KeyError): - self.assertRaises(SystemExit, am.set_group_if_different, '/path/to/file', 'root', False) - - m.reset_mock() - am.check_mode = True - self.assertEqual(am.set_group_if_different('/path/to/file', 0, False), True) - self.assertEqual(m.called, False) - am.check_mode = False - - with patch('os.lchown', side_effect=OSError) as m: - self.assertRaises(SystemExit, am.set_group_if_different, '/path/to/file', 'root', False) - - @patch('tempfile.mkstemp') - @patch('os.umask') - @patch('shutil.copyfileobj') - @patch('shutil.move') - @patch('shutil.copy2') - @patch('os.rename') - @patch('pwd.getpwuid') - @patch('os.getuid') - @patch('os.environ') - @patch('os.getlogin') - @patch('os.chown') - @patch('os.chmod') - @patch('os.stat') - @patch('os.path.exists') - @patch('os.close') - def test_module_utils_basic_ansible_module_atomic_move( - self, - _os_close, - _os_path_exists, - _os_stat, - _os_chmod, - _os_chown, - _os_getlogin, - _os_environ, - _os_getuid, - _pwd_getpwuid, - _os_rename, - _shutil_copy2, - _shutil_move, - _shutil_copyfileobj, - _os_umask, - _tempfile_mkstemp): - - from ansible.module_utils import basic - basic._ANSIBLE_ARGS = None - - am = basic.AnsibleModule( - argument_spec=dict(), - ) - - environ = dict() - _os_environ.__getitem__ = environ.__getitem__ - _os_environ.__setitem__ = environ.__setitem__ - - am.selinux_enabled = MagicMock() - am.selinux_context = MagicMock() - am.selinux_default_context = MagicMock() - am.set_context_if_different = MagicMock() - - # test destination does not exist, no selinux, login name = 'root', - # no environment, os.rename() succeeds - _os_path_exists.side_effect = [False, False] - _os_getlogin.return_value = 'root' - _os_getuid.return_value = 0 - _pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '') - _os_rename.return_value = None - _os_umask.side_effect = [18, 0] - am.selinux_enabled.return_value = False - _os_chmod.reset_mock() - _os_chown.reset_mock() - am.set_context_if_different.reset_mock() - am.atomic_move('/path/to/src', '/path/to/dest') - _os_rename.assert_called_with(b'/path/to/src', b'/path/to/dest') - self.assertEqual(_os_chmod.call_args_list, [call(b'/path/to/dest', basic.DEFAULT_PERM & ~18)]) - - # same as above, except selinux_enabled - _os_path_exists.side_effect = [False, False] - _os_getlogin.return_value = 'root' - _os_getuid.return_value = 0 - _pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '') - _os_rename.return_value = None - _os_umask.side_effect = [18, 0] - mock_context = MagicMock() - am.selinux_default_context.return_value = mock_context - am.selinux_enabled.return_value = True - _os_chmod.reset_mock() - _os_chown.reset_mock() - am.set_context_if_different.reset_mock() - am.selinux_default_context.reset_mock() - am.atomic_move('/path/to/src', '/path/to/dest') - _os_rename.assert_called_with(b'/path/to/src', b'/path/to/dest') - self.assertEqual(_os_chmod.call_args_list, [call(b'/path/to/dest', basic.DEFAULT_PERM & ~18)]) - self.assertEqual(am.selinux_default_context.call_args_list, [call('/path/to/dest')]) - self.assertEqual(am.set_context_if_different.call_args_list, [call('/path/to/dest', mock_context, False)]) - - # now with dest present, no selinux, also raise OSError when using - # os.getlogin() to test corner case with no tty - _os_path_exists.side_effect = [True, True] - _os_getlogin.side_effect = OSError() - _os_getuid.return_value = 0 - _pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '') - _os_rename.return_value = None - _os_umask.side_effect = [18, 0] - environ['LOGNAME'] = 'root' - stat1 = MagicMock() - stat1.st_mode = 0o0644 - stat1.st_uid = 0 - stat1.st_gid = 0 - _os_stat.side_effect = [stat1, ] - am.selinux_enabled.return_value = False - _os_chmod.reset_mock() - _os_chown.reset_mock() - am.set_context_if_different.reset_mock() - am.atomic_move('/path/to/src', '/path/to/dest') - _os_rename.assert_called_with(b'/path/to/src', b'/path/to/dest') - - # dest missing, selinux enabled - _os_path_exists.side_effect = [True, True] - _os_getlogin.return_value = 'root' - _os_getuid.return_value = 0 - _pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '') - _os_rename.return_value = None - _os_umask.side_effect = [18, 0] - stat1 = MagicMock() - stat1.st_mode = 0o0644 - stat1.st_uid = 0 - stat1.st_gid = 0 - _os_stat.side_effect = [stat1, ] - mock_context = MagicMock() - am.selinux_context.return_value = mock_context - am.selinux_enabled.return_value = True - _os_chmod.reset_mock() - _os_chown.reset_mock() - am.set_context_if_different.reset_mock() - am.selinux_default_context.reset_mock() - am.atomic_move('/path/to/src', '/path/to/dest') - _os_rename.assert_called_with(b'/path/to/src', b'/path/to/dest') - self.assertEqual(am.selinux_context.call_args_list, [call('/path/to/dest')]) - self.assertEqual(am.set_context_if_different.call_args_list, [call('/path/to/dest', mock_context, False)]) - - # now testing with exceptions raised - # have os.stat raise OSError which is not EPERM - _os_stat.side_effect = OSError() - _os_path_exists.side_effect = [True, True] - _os_getlogin.return_value = 'root' - _os_getuid.return_value = 0 - _pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '') - _os_rename.return_value = None - _os_umask.side_effect = [18, 0] - self.assertRaises(OSError, am.atomic_move, '/path/to/src', '/path/to/dest') - - # and now have os.stat return EPERM, which should not fail - _os_stat.side_effect = OSError(errno.EPERM, 'testing os stat with EPERM') - _os_path_exists.side_effect = [True, True] - _os_getlogin.return_value = 'root' - _os_getuid.return_value = 0 - _pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '') - _os_rename.return_value = None - _os_umask.side_effect = [18, 0] - # FIXME: we don't assert anything here yet - am.atomic_move('/path/to/src', '/path/to/dest') - - # now we test os.rename() raising errors... - # first we test with a bad errno to verify it bombs out - _os_path_exists.side_effect = [False, False] - _os_getlogin.return_value = 'root' - _os_getuid.return_value = 0 - _pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '') - _os_umask.side_effect = [18, 0] - _os_rename.side_effect = OSError(errno.EIO, 'failing with EIO') - self.assertRaises(SystemExit, am.atomic_move, '/path/to/src', '/path/to/dest') - - # next we test with EPERM so it continues to the alternate code for moving - # test with mkstemp raising an error first - _os_path_exists.side_effect = [False, False] - _os_getlogin.return_value = 'root' - _os_getuid.return_value = 0 - _os_close.return_value = None - _pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '') - _os_umask.side_effect = [18, 0] - _os_rename.side_effect = [OSError(errno.EPERM, 'failing with EPERM'), None] - _tempfile_mkstemp.return_value = None - _tempfile_mkstemp.side_effect = OSError() - am.selinux_enabled.return_value = False - self.assertRaises(SystemExit, am.atomic_move, '/path/to/src', '/path/to/dest') - - # then test with it creating a temp file - _os_path_exists.side_effect = [False, False, False] - _os_getlogin.return_value = 'root' - _os_getuid.return_value = 0 - _pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '') - _os_umask.side_effect = [18, 0] - _os_rename.side_effect = [OSError(errno.EPERM, 'failing with EPERM'), None] - mock_stat1 = MagicMock() - mock_stat2 = MagicMock() - mock_stat3 = MagicMock() - _os_stat.return_value = [mock_stat1, mock_stat2, mock_stat3] - _os_stat.side_effect = None - _tempfile_mkstemp.return_value = (None, '/path/to/tempfile') - _tempfile_mkstemp.side_effect = None - am.selinux_enabled.return_value = False - # FIXME: we don't assert anything here yet - am.atomic_move('/path/to/src', '/path/to/dest') - - # same as above, but with selinux enabled - _os_path_exists.side_effect = [False, False, False] - _os_getlogin.return_value = 'root' - _os_getuid.return_value = 0 - _pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '') - _os_umask.side_effect = [18, 0] - _os_rename.side_effect = [OSError(errno.EPERM, 'failing with EPERM'), None] - _tempfile_mkstemp.return_value = (None, None) - mock_context = MagicMock() - am.selinux_default_context.return_value = mock_context - am.selinux_enabled.return_value = True - am.atomic_move('/path/to/src', '/path/to/dest')