diff --git a/lib/ansible/module_utils/basic.py b/lib/ansible/module_utils/basic.py index df591444d99..e1eefb9c60b 100644 --- a/lib/ansible/module_utils/basic.py +++ b/lib/ansible/module_utils/basic.py @@ -185,7 +185,7 @@ except ImportError: pass try: - from ast import literal_eval as _literal_eval + from ast import literal_eval except ImportError: # a replacement for literal_eval that works with python 2.4. from: # https://mail.python.org/pipermail/python-list/2009-September/551880.html @@ -193,7 +193,7 @@ except ImportError: # ast.py from compiler import ast, parse - def _literal_eval(node_or_string): + def literal_eval(node_or_string): """ Safely evaluate an expression node or a string containing a Python expression. The string or node provided may only consist of the following @@ -223,6 +223,7 @@ except ImportError: raise ValueError('malformed string') return _convert(node_or_string) +_literal_eval = literal_eval FILE_COMMON_ARGUMENTS=dict( src = dict(), @@ -1254,9 +1255,9 @@ class AnsibleModule(object): try: result = None if not locals: - result = _literal_eval(str) + result = literal_eval(str) else: - result = _literal_eval(str, None, locals) + result = literal_eval(str, None, locals) if include_exceptions: return (result, None) else: @@ -1749,7 +1750,7 @@ class AnsibleModule(object): prefix=".ansible_tmp", dir=dest_dir, suffix=dest_file) except (OSError, IOError): e = get_exception() - self.fail_json(msg='The destination directory (%s) is not writable by the current user.' % dest_dir) + self.fail_json(msg='The destination directory (%s) is not writable by the current user. Error was: %s' % (dest_dir, e)) try: # leaves tmp file behind when sudo and not root if switched_user and os.getuid() != 0: diff --git a/test/units/module_utils/test_basic.py b/test/units/module_utils/test_basic.py index 86473dd2037..04556736b62 100644 --- a/test/units/module_utils/test_basic.py +++ b/test/units/module_utils/test_basic.py @@ -21,12 +21,18 @@ from __future__ import (absolute_import, division) __metaclass__ = type import errno +import os import sys -from six.moves import builtins +try: + import builtins +except ImportError: + import __builtin__ as builtins from ansible.compat.tests import unittest -from ansible.compat.tests.mock import patch, MagicMock, mock_open, Mock +from ansible.compat.tests.mock import patch, MagicMock, mock_open, Mock, call + +realimport = builtins.__import__ class TestModuleUtilsBasic(unittest.TestCase): @@ -36,17 +42,106 @@ class TestModuleUtilsBasic(unittest.TestCase): def tearDown(self): pass - def test_module_utils_basic_imports(self): - realimport = builtins.__import__ + def clear_modules(self, mods): + for mod in mods: + if mod in sys.modules: + del sys.modules[mod] + @patch.object(builtins, '__import__') + def test_module_utils_basic_import_syslog(self, mock_import): + def _mock_import(name, *args, **kwargs): + if name == 'syslog': + raise ImportError + return realimport(name, *args, **kwargs) + + self.clear_modules(['syslog', 'ansible.module_utils.basic']) + mod = builtins.__import__('ansible.module_utils.basic') + self.assertTrue(mod.module_utils.basic.HAS_SYSLOG) + + self.clear_modules(['syslog', 'ansible.module_utils.basic']) + mock_import.side_effect = _mock_import + mod = builtins.__import__('ansible.module_utils.basic') + self.assertFalse(mod.module_utils.basic.HAS_SYSLOG) + + @patch.object(builtins, '__import__') + def test_module_utils_basic_import_selinux(self, mock_import): + def _mock_import(name, *args, **kwargs): + if name == 'selinux': + raise ImportError + return realimport(name, *args, **kwargs) + + try: + self.clear_modules(['selinux', 'ansible.module_utils.basic']) + mod = builtins.__import__('ansible.module_utils.basic') + self.assertTrue(mod.module_utils.basic.HAVE_SELINUX) + except ImportError: + # no selinux on test system, so skip + pass + + self.clear_modules(['selinux', 'ansible.module_utils.basic']) + mock_import.side_effect = _mock_import + mod = builtins.__import__('ansible.module_utils.basic') + self.assertFalse(mod.module_utils.basic.HAVE_SELINUX) + + @patch.object(builtins, '__import__') + def test_module_utils_basic_import_json(self, mock_import): def _mock_import(name, *args, **kwargs): if name == 'json': - raise ImportError() - realimport(name, *args, **kwargs) + raise ImportError + return realimport(name, *args, **kwargs) - with patch.object(builtins, '__import__', _mock_import, create=True) as m: - m('ansible.module_utils.basic') - builtins.__import__('ansible.module_utils.basic') + self.clear_modules(['json', 'ansible.module_utils.basic']) + mod = builtins.__import__('ansible.module_utils.basic') + + self.clear_modules(['json', 'ansible.module_utils.basic']) + mock_import.side_effect = _mock_import + mod = builtins.__import__('ansible.module_utils.basic') + + @patch.object(builtins, '__import__') + def test_module_utils_basic_import_literal_eval(self, mock_import): + def _mock_import(name, *args, **kwargs): + try: + fromlist = kwargs.get('fromlist', args[2]) + except IndexError: + fromlist = [] + if name == 'ast' and 'literal_eval' in fromlist: + raise ImportError + return realimport(name, *args, **kwargs) + + mock_import.side_effect = _mock_import + del sys.modules['ast'] + del sys.modules['ansible.module_utils.basic'] + mod = builtins.__import__('ansible.module_utils.basic') + self.assertEqual(mod.module_utils.basic.literal_eval("'1'"), "1") + self.assertEqual(mod.module_utils.basic.literal_eval("1"), 1) + self.assertEqual(mod.module_utils.basic.literal_eval("-1"), -1) + self.assertEqual(mod.module_utils.basic.literal_eval("(1,2,3)"), (1,2,3)) + self.assertEqual(mod.module_utils.basic.literal_eval("[1]"), [1]) + self.assertEqual(mod.module_utils.basic.literal_eval("True"), True) + self.assertEqual(mod.module_utils.basic.literal_eval("False"), False) + self.assertEqual(mod.module_utils.basic.literal_eval("None"), None) + #self.assertEqual(mod.module_utils.basic.literal_eval('{"a": 1}'), dict(a=1)) + self.assertRaises(ValueError, mod.module_utils.basic.literal_eval, "asdfasdfasdf") + + @patch.object(builtins, '__import__') + def test_module_utils_basic_import_systemd_journal(self, mock_import): + def _mock_import(name, *args, **kwargs): + try: + fromlist = kwargs.get('fromlist', args[2]) + except IndexError: + fromlist = [] + if name == 'systemd' and 'journal' in fromlist: + raise ImportError + return realimport(name, *args, **kwargs) + + self.clear_modules(['systemd', 'ansible.module_utils.basic']) + mod = builtins.__import__('ansible.module_utils.basic') + self.assertTrue(mod.module_utils.basic.has_journal) + + self.clear_modules(['systemd', 'ansible.module_utils.basic']) + mock_import.side_effect = _mock_import + mod = builtins.__import__('ansible.module_utils.basic') + self.assertFalse(mod.module_utils.basic.has_journal) def test_module_utils_basic_get_platform(self): with patch('platform.system', return_value='foo'): @@ -60,19 +155,19 @@ class TestModuleUtilsBasic(unittest.TestCase): self.assertEqual(get_distribution(), None) with patch('platform.system', return_value='Linux'): - with patch('platform.linux_distribution', return_value=("foo", "1", "One")): + with patch('platform.linux_distribution', return_value=["foo"]): self.assertEqual(get_distribution(), "Foo") with patch('os.path.isfile', return_value=True): - def _dist(distname='', version='', id='', supported_dists=(), full_distribution_name=1): - if supported_dists != (): - return ("AmazonFooBar", "", "") - else: - return ("", "", "") - - with patch('platform.linux_distribution', side_effect=_dist): + with patch('platform.linux_distribution', side_effect=[("AmazonFooBar",)]): self.assertEqual(get_distribution(), "Amazonfoobar") + with patch('platform.linux_distribution', side_effect=(("",), ("AmazonFooBam",))): + self.assertEqual(get_distribution(), "Amazon") + + with patch('platform.linux_distribution', side_effect=[("",),("",)]): + self.assertEqual(get_distribution(), "OtherLinux") + def _dist(distname='', version='', id='', supported_dists=(), full_distribution_name=1): if supported_dists != (): return ("Bar", "2", "Two") @@ -678,17 +773,230 @@ class TestModuleUtilsBasic(unittest.TestCase): self.assertEqual(am.set_mode_if_different('/path/to/file', 0o660, False), True) am.check_mode = False - # FIXME: this isn't working yet - #with patch('os.lstat', side_effect=[mock_stat1, mock_stat2]): - # with patch('os.lchmod', return_value=None) as m_os: - # del m_os.lchmod - # with patch('os.path.islink', return_value=False): - # with patch('os.chmod', return_value=None) as m_chmod: - # self.assertEqual(am.set_mode_if_different('/path/to/file/no_lchmod', 0o660, False), True) - # m_chmod.assert_called_with('/path/to/file', 0o660) - # with patch('os.path.islink', return_value=True): - # with patch('os.chmod', return_value=None) as m_chmod: - # with patch('os.stat', return_value=mock_stat2): - # self.assertEqual(am.set_mode_if_different('/path/to/file', 0o660, False), True) - # m_chmod.assert_called_with('/path/to/file', 0o660) + original_hasattr = hasattr + def _hasattr(obj, name): + if obj == os and name == 'lchmod': + return False + return original_hasattr(obj, name) + + # FIXME: this isn't working yet + with patch('os.lstat', side_effect=[mock_stat1, mock_stat2]): + with patch.object(builtins, 'hasattr', side_effect=_hasattr): + with patch('os.path.islink', return_value=False): + with patch('os.chmod', return_value=None) as m_chmod: + self.assertEqual(am.set_mode_if_different('/path/to/file/no_lchmod', 0o660, False), True) + with patch('os.lstat', side_effect=[mock_stat1, mock_stat2]): + with patch.object(builtins, 'hasattr', side_effect=_hasattr): + with patch('os.path.islink', return_value=True): + with patch('os.chmod', return_value=None) as m_chmod: + with patch('os.stat', return_value=mock_stat2): + self.assertEqual(am.set_mode_if_different('/path/to/file', 0o660, False), True) + + @patch('tempfile.NamedTemporaryFile') + @patch('os.umask') + @patch('shutil.copyfileobj') + @patch('shutil.move') + @patch('shutil.copy2') + @patch('os.rename') + @patch('pwd.getpwuid') + @patch('os.getuid') + @patch('os.environ') + @patch('os.getlogin') + @patch('os.chown') + @patch('os.chmod') + @patch('os.stat') + @patch('os.path.exists') + def test_module_utils_basic_ansible_module_atomic_move( + self, + _os_path_exists, + _os_stat, + _os_chmod, + _os_chown, + _os_getlogin, + _os_environ, + _os_getuid, + _pwd_getpwuid, + _os_rename, + _shutil_copy2, + _shutil_move, + _shutil_copyfileobj, + _os_umask, + _tempfile_NamedTemporaryFile, + ): + + from ansible.module_utils import basic + + basic.MODULE_COMPLEX_ARGS = '{}' + am = basic.AnsibleModule( + argument_spec = dict(), + ) + + environ = dict() + _os_environ.__getitem__ = environ.__getitem__ + _os_environ.__setitem__ = environ.__setitem__ + + am.selinux_enabled = MagicMock() + am.selinux_context = MagicMock() + am.selinux_default_context = MagicMock() + am.set_context_if_different = MagicMock() + + # test destination does not exist, no selinux, login name = 'root', + # no environment, os.rename() succeeds + _os_path_exists.side_effect = [False, False] + _os_getlogin.return_value = 'root' + _os_getuid.return_value = 0 + _pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '') + _os_rename.return_value = None + _os_umask.side_effect = [18, 0] + am.selinux_enabled.return_value = False + _os_chmod.reset_mock() + _os_chown.reset_mock() + am.set_context_if_different.reset_mock() + am.atomic_move('/path/to/src', '/path/to/dest') + _os_rename.assert_called_with('/path/to/src', '/path/to/dest') + self.assertEqual(_os_chmod.call_args_list, [call('/path/to/dest', basic.DEFAULT_PERM & ~18)]) + + # same as above, except selinux_enabled + _os_path_exists.side_effect = [False, False] + _os_getlogin.return_value = 'root' + _os_getuid.return_value = 0 + _pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '') + _os_rename.return_value = None + _os_umask.side_effect = [18, 0] + mock_context = MagicMock() + am.selinux_default_context.return_value = mock_context + am.selinux_enabled.return_value = True + _os_chmod.reset_mock() + _os_chown.reset_mock() + am.set_context_if_different.reset_mock() + am.selinux_default_context.reset_mock() + am.atomic_move('/path/to/src', '/path/to/dest') + _os_rename.assert_called_with('/path/to/src', '/path/to/dest') + self.assertEqual(_os_chmod.call_args_list, [call('/path/to/dest', basic.DEFAULT_PERM & ~18)]) + self.assertEqual(am.selinux_default_context.call_args_list, [call('/path/to/dest')]) + self.assertEqual(am.set_context_if_different.call_args_list, [call('/path/to/dest', mock_context, False)]) + + # now with dest present, no selinux, also raise OSError when using + # os.getlogin() to test corner case with no tty + _os_path_exists.side_effect = [True, True] + _os_getlogin.side_effect = OSError() + _os_getuid.return_value = 0 + _pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '') + _os_rename.return_value = None + _os_umask.side_effect = [18, 0] + environ['LOGNAME'] = 'root' + stat1 = MagicMock() + stat1.st_mode = 0o0644 + stat1.st_uid = 0 + stat1.st_gid = 0 + _os_stat.side_effect = [stat1,] + am.selinux_enabled.return_value = False + _os_chmod.reset_mock() + _os_chown.reset_mock() + am.set_context_if_different.reset_mock() + am.atomic_move('/path/to/src', '/path/to/dest') + _os_rename.assert_called_with('/path/to/src', '/path/to/dest') + + # dest missing, selinux enabled + _os_path_exists.side_effect = [True, True] + _os_getlogin.return_value = 'root' + _os_getuid.return_value = 0 + _pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '') + _os_rename.return_value = None + _os_umask.side_effect = [18, 0] + stat1 = MagicMock() + stat1.st_mode = 0o0644 + stat1.st_uid = 0 + stat1.st_gid = 0 + _os_stat.side_effect = [stat1,] + mock_context = MagicMock() + am.selinux_context.return_value = mock_context + am.selinux_enabled.return_value = True + _os_chmod.reset_mock() + _os_chown.reset_mock() + am.set_context_if_different.reset_mock() + am.selinux_default_context.reset_mock() + am.atomic_move('/path/to/src', '/path/to/dest') + _os_rename.assert_called_with('/path/to/src', '/path/to/dest') + self.assertEqual(am.selinux_context.call_args_list, [call('/path/to/dest')]) + self.assertEqual(am.set_context_if_different.call_args_list, [call('/path/to/dest', mock_context, False)]) + + # now testing with exceptions raised + # have os.stat raise OSError which is not EPERM + _os_stat.side_effect = OSError() + _os_path_exists.side_effect = [True, True] + _os_getlogin.return_value = 'root' + _os_getuid.return_value = 0 + _pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '') + _os_rename.return_value = None + _os_umask.side_effect = [18, 0] + self.assertRaises(OSError, am.atomic_move, '/path/to/src', '/path/to/dest') + + # and now have os.stat return EPERM, which should not fail + _os_stat.side_effect = OSError(errno.EPERM, 'testing os stat with EPERM') + _os_path_exists.side_effect = [True, True] + _os_getlogin.return_value = 'root' + _os_getuid.return_value = 0 + _pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '') + _os_rename.return_value = None + _os_umask.side_effect = [18, 0] + # FIXME: we don't assert anything here yet + am.atomic_move('/path/to/src', '/path/to/dest') + + # now we test os.rename() raising errors... + # first we test with a bad errno to verify it bombs out + _os_path_exists.side_effect = [False, False] + _os_getlogin.return_value = 'root' + _os_getuid.return_value = 0 + _pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '') + _os_umask.side_effect = [18, 0] + _os_rename.side_effect = OSError(errno.EIO, 'failing with EIO') + self.assertRaises(SystemExit, am.atomic_move, '/path/to/src', '/path/to/dest') + + # next we test with EPERM so it continues to the alternate code for moving + # test with NamedTemporaryFile raising an error first + _os_path_exists.side_effect = [False, False] + _os_getlogin.return_value = 'root' + _os_getuid.return_value = 0 + _pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '') + _os_umask.side_effect = [18, 0] + _os_rename.side_effect = [OSError(errno.EPERM, 'failing with EPERM'), None] + _tempfile_NamedTemporaryFile.return_value = None + _tempfile_NamedTemporaryFile.side_effect = OSError() + am.selinux_enabled.return_value = False + self.assertRaises(SystemExit, am.atomic_move, '/path/to/src', '/path/to/dest') + + # then test with it creating a temp file + _os_path_exists.side_effect = [False, False] + _os_getlogin.return_value = 'root' + _os_getuid.return_value = 0 + _pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '') + _os_umask.side_effect = [18, 0] + _os_rename.side_effect = [OSError(errno.EPERM, 'failing with EPERM'), None] + mock_stat1 = MagicMock() + mock_stat2 = MagicMock() + mock_stat3 = MagicMock() + _os_stat.return_value = [mock_stat1, mock_stat2, mock_stat3] + _os_stat.side_effect = None + mock_tempfile = MagicMock() + mock_tempfile.name = '/path/to/tempfile' + _tempfile_NamedTemporaryFile.return_value = mock_tempfile + _tempfile_NamedTemporaryFile.side_effect = None + am.selinux_enabled.return_value = False + # FIXME: we don't assert anything here yet + am.atomic_move('/path/to/src', '/path/to/dest') + + # same as above, but with selinux enabled + _os_path_exists.side_effect = [False, False] + _os_getlogin.return_value = 'root' + _os_getuid.return_value = 0 + _pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '') + _os_umask.side_effect = [18, 0] + _os_rename.side_effect = [OSError(errno.EPERM, 'failing with EPERM'), None] + mock_tempfile = MagicMock() + _tempfile_NamedTemporaryFile.return_value = mock_tempfile + mock_context = MagicMock() + am.selinux_default_context.return_value = mock_context + am.selinux_enabled.return_value = True + am.atomic_move('/path/to/src', '/path/to/dest') diff --git a/test/units/plugins/action/test_action.py b/test/units/plugins/action/test_action.py index 85ac29e4ca6..ea44e315642 100644 --- a/test/units/plugins/action/test_action.py +++ b/test/units/plugins/action/test_action.py @@ -27,10 +27,11 @@ import pipes import os from sys import version_info -if version_info[0] == 2: - import __builtin__ as builtins -else: + +try: import builtins +except ImportError: + import __builtin__ as builtins from ansible import __version__ as ansible_version from ansible import constants as C