Expanding unit tests for module_utils/basic.py

This commit is contained in:
James Cammarata 2016-03-01 13:52:50 -05:00
parent 70e7be0346
commit e011f52557
3 changed files with 348 additions and 38 deletions

View file

@ -185,7 +185,7 @@ except ImportError:
pass pass
try: try:
from ast import literal_eval as _literal_eval from ast import literal_eval
except ImportError: except ImportError:
# a replacement for literal_eval that works with python 2.4. from: # a replacement for literal_eval that works with python 2.4. from:
# https://mail.python.org/pipermail/python-list/2009-September/551880.html # https://mail.python.org/pipermail/python-list/2009-September/551880.html
@ -193,7 +193,7 @@ except ImportError:
# ast.py # ast.py
from compiler import ast, parse from compiler import ast, parse
def _literal_eval(node_or_string): def literal_eval(node_or_string):
""" """
Safely evaluate an expression node or a string containing a Python Safely evaluate an expression node or a string containing a Python
expression. The string or node provided may only consist of the following expression. The string or node provided may only consist of the following
@ -223,6 +223,7 @@ except ImportError:
raise ValueError('malformed string') raise ValueError('malformed string')
return _convert(node_or_string) return _convert(node_or_string)
_literal_eval = literal_eval
FILE_COMMON_ARGUMENTS=dict( FILE_COMMON_ARGUMENTS=dict(
src = dict(), src = dict(),
@ -1254,9 +1255,9 @@ class AnsibleModule(object):
try: try:
result = None result = None
if not locals: if not locals:
result = _literal_eval(str) result = literal_eval(str)
else: else:
result = _literal_eval(str, None, locals) result = literal_eval(str, None, locals)
if include_exceptions: if include_exceptions:
return (result, None) return (result, None)
else: else:
@ -1749,7 +1750,7 @@ class AnsibleModule(object):
prefix=".ansible_tmp", dir=dest_dir, suffix=dest_file) prefix=".ansible_tmp", dir=dest_dir, suffix=dest_file)
except (OSError, IOError): except (OSError, IOError):
e = get_exception() e = get_exception()
self.fail_json(msg='The destination directory (%s) is not writable by the current user.' % dest_dir) self.fail_json(msg='The destination directory (%s) is not writable by the current user. Error was: %s' % (dest_dir, e))
try: # leaves tmp file behind when sudo and not root try: # leaves tmp file behind when sudo and not root
if switched_user and os.getuid() != 0: if switched_user and os.getuid() != 0:

View file

@ -21,12 +21,18 @@ from __future__ import (absolute_import, division)
__metaclass__ = type __metaclass__ = type
import errno import errno
import os
import sys import sys
from six.moves import builtins try:
import builtins
except ImportError:
import __builtin__ as builtins
from ansible.compat.tests import unittest from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, MagicMock, mock_open, Mock from ansible.compat.tests.mock import patch, MagicMock, mock_open, Mock, call
realimport = builtins.__import__
class TestModuleUtilsBasic(unittest.TestCase): class TestModuleUtilsBasic(unittest.TestCase):
@ -36,17 +42,106 @@ class TestModuleUtilsBasic(unittest.TestCase):
def tearDown(self): def tearDown(self):
pass pass
def test_module_utils_basic_imports(self): def clear_modules(self, mods):
realimport = builtins.__import__ for mod in mods:
if mod in sys.modules:
del sys.modules[mod]
@patch.object(builtins, '__import__')
def test_module_utils_basic_import_syslog(self, mock_import):
def _mock_import(name, *args, **kwargs):
if name == 'syslog':
raise ImportError
return realimport(name, *args, **kwargs)
self.clear_modules(['syslog', 'ansible.module_utils.basic'])
mod = builtins.__import__('ansible.module_utils.basic')
self.assertTrue(mod.module_utils.basic.HAS_SYSLOG)
self.clear_modules(['syslog', 'ansible.module_utils.basic'])
mock_import.side_effect = _mock_import
mod = builtins.__import__('ansible.module_utils.basic')
self.assertFalse(mod.module_utils.basic.HAS_SYSLOG)
@patch.object(builtins, '__import__')
def test_module_utils_basic_import_selinux(self, mock_import):
def _mock_import(name, *args, **kwargs):
if name == 'selinux':
raise ImportError
return realimport(name, *args, **kwargs)
try:
self.clear_modules(['selinux', 'ansible.module_utils.basic'])
mod = builtins.__import__('ansible.module_utils.basic')
self.assertTrue(mod.module_utils.basic.HAVE_SELINUX)
except ImportError:
# no selinux on test system, so skip
pass
self.clear_modules(['selinux', 'ansible.module_utils.basic'])
mock_import.side_effect = _mock_import
mod = builtins.__import__('ansible.module_utils.basic')
self.assertFalse(mod.module_utils.basic.HAVE_SELINUX)
@patch.object(builtins, '__import__')
def test_module_utils_basic_import_json(self, mock_import):
def _mock_import(name, *args, **kwargs): def _mock_import(name, *args, **kwargs):
if name == 'json': if name == 'json':
raise ImportError() raise ImportError
realimport(name, *args, **kwargs) return realimport(name, *args, **kwargs)
with patch.object(builtins, '__import__', _mock_import, create=True) as m: self.clear_modules(['json', 'ansible.module_utils.basic'])
m('ansible.module_utils.basic') mod = builtins.__import__('ansible.module_utils.basic')
builtins.__import__('ansible.module_utils.basic')
self.clear_modules(['json', 'ansible.module_utils.basic'])
mock_import.side_effect = _mock_import
mod = builtins.__import__('ansible.module_utils.basic')
@patch.object(builtins, '__import__')
def test_module_utils_basic_import_literal_eval(self, mock_import):
def _mock_import(name, *args, **kwargs):
try:
fromlist = kwargs.get('fromlist', args[2])
except IndexError:
fromlist = []
if name == 'ast' and 'literal_eval' in fromlist:
raise ImportError
return realimport(name, *args, **kwargs)
mock_import.side_effect = _mock_import
del sys.modules['ast']
del sys.modules['ansible.module_utils.basic']
mod = builtins.__import__('ansible.module_utils.basic')
self.assertEqual(mod.module_utils.basic.literal_eval("'1'"), "1")
self.assertEqual(mod.module_utils.basic.literal_eval("1"), 1)
self.assertEqual(mod.module_utils.basic.literal_eval("-1"), -1)
self.assertEqual(mod.module_utils.basic.literal_eval("(1,2,3)"), (1,2,3))
self.assertEqual(mod.module_utils.basic.literal_eval("[1]"), [1])
self.assertEqual(mod.module_utils.basic.literal_eval("True"), True)
self.assertEqual(mod.module_utils.basic.literal_eval("False"), False)
self.assertEqual(mod.module_utils.basic.literal_eval("None"), None)
#self.assertEqual(mod.module_utils.basic.literal_eval('{"a": 1}'), dict(a=1))
self.assertRaises(ValueError, mod.module_utils.basic.literal_eval, "asdfasdfasdf")
@patch.object(builtins, '__import__')
def test_module_utils_basic_import_systemd_journal(self, mock_import):
def _mock_import(name, *args, **kwargs):
try:
fromlist = kwargs.get('fromlist', args[2])
except IndexError:
fromlist = []
if name == 'systemd' and 'journal' in fromlist:
raise ImportError
return realimport(name, *args, **kwargs)
self.clear_modules(['systemd', 'ansible.module_utils.basic'])
mod = builtins.__import__('ansible.module_utils.basic')
self.assertTrue(mod.module_utils.basic.has_journal)
self.clear_modules(['systemd', 'ansible.module_utils.basic'])
mock_import.side_effect = _mock_import
mod = builtins.__import__('ansible.module_utils.basic')
self.assertFalse(mod.module_utils.basic.has_journal)
def test_module_utils_basic_get_platform(self): def test_module_utils_basic_get_platform(self):
with patch('platform.system', return_value='foo'): with patch('platform.system', return_value='foo'):
@ -60,19 +155,19 @@ class TestModuleUtilsBasic(unittest.TestCase):
self.assertEqual(get_distribution(), None) self.assertEqual(get_distribution(), None)
with patch('platform.system', return_value='Linux'): with patch('platform.system', return_value='Linux'):
with patch('platform.linux_distribution', return_value=("foo", "1", "One")): with patch('platform.linux_distribution', return_value=["foo"]):
self.assertEqual(get_distribution(), "Foo") self.assertEqual(get_distribution(), "Foo")
with patch('os.path.isfile', return_value=True): with patch('os.path.isfile', return_value=True):
def _dist(distname='', version='', id='', supported_dists=(), full_distribution_name=1): with patch('platform.linux_distribution', side_effect=[("AmazonFooBar",)]):
if supported_dists != ():
return ("AmazonFooBar", "", "")
else:
return ("", "", "")
with patch('platform.linux_distribution', side_effect=_dist):
self.assertEqual(get_distribution(), "Amazonfoobar") self.assertEqual(get_distribution(), "Amazonfoobar")
with patch('platform.linux_distribution', side_effect=(("",), ("AmazonFooBam",))):
self.assertEqual(get_distribution(), "Amazon")
with patch('platform.linux_distribution', side_effect=[("",),("",)]):
self.assertEqual(get_distribution(), "OtherLinux")
def _dist(distname='', version='', id='', supported_dists=(), full_distribution_name=1): def _dist(distname='', version='', id='', supported_dists=(), full_distribution_name=1):
if supported_dists != (): if supported_dists != ():
return ("Bar", "2", "Two") return ("Bar", "2", "Two")
@ -678,17 +773,230 @@ class TestModuleUtilsBasic(unittest.TestCase):
self.assertEqual(am.set_mode_if_different('/path/to/file', 0o660, False), True) self.assertEqual(am.set_mode_if_different('/path/to/file', 0o660, False), True)
am.check_mode = False am.check_mode = False
# FIXME: this isn't working yet original_hasattr = hasattr
#with patch('os.lstat', side_effect=[mock_stat1, mock_stat2]): def _hasattr(obj, name):
# with patch('os.lchmod', return_value=None) as m_os: if obj == os and name == 'lchmod':
# del m_os.lchmod return False
# with patch('os.path.islink', return_value=False): return original_hasattr(obj, name)
# with patch('os.chmod', return_value=None) as m_chmod:
# self.assertEqual(am.set_mode_if_different('/path/to/file/no_lchmod', 0o660, False), True) # FIXME: this isn't working yet
# m_chmod.assert_called_with('/path/to/file', 0o660) with patch('os.lstat', side_effect=[mock_stat1, mock_stat2]):
# with patch('os.path.islink', return_value=True): with patch.object(builtins, 'hasattr', side_effect=_hasattr):
# with patch('os.chmod', return_value=None) as m_chmod: with patch('os.path.islink', return_value=False):
# with patch('os.stat', return_value=mock_stat2): with patch('os.chmod', return_value=None) as m_chmod:
# self.assertEqual(am.set_mode_if_different('/path/to/file', 0o660, False), True) self.assertEqual(am.set_mode_if_different('/path/to/file/no_lchmod', 0o660, False), True)
# m_chmod.assert_called_with('/path/to/file', 0o660) with patch('os.lstat', side_effect=[mock_stat1, mock_stat2]):
with patch.object(builtins, 'hasattr', side_effect=_hasattr):
with patch('os.path.islink', return_value=True):
with patch('os.chmod', return_value=None) as m_chmod:
with patch('os.stat', return_value=mock_stat2):
self.assertEqual(am.set_mode_if_different('/path/to/file', 0o660, False), True)
@patch('tempfile.NamedTemporaryFile')
@patch('os.umask')
@patch('shutil.copyfileobj')
@patch('shutil.move')
@patch('shutil.copy2')
@patch('os.rename')
@patch('pwd.getpwuid')
@patch('os.getuid')
@patch('os.environ')
@patch('os.getlogin')
@patch('os.chown')
@patch('os.chmod')
@patch('os.stat')
@patch('os.path.exists')
def test_module_utils_basic_ansible_module_atomic_move(
self,
_os_path_exists,
_os_stat,
_os_chmod,
_os_chown,
_os_getlogin,
_os_environ,
_os_getuid,
_pwd_getpwuid,
_os_rename,
_shutil_copy2,
_shutil_move,
_shutil_copyfileobj,
_os_umask,
_tempfile_NamedTemporaryFile,
):
from ansible.module_utils import basic
basic.MODULE_COMPLEX_ARGS = '{}'
am = basic.AnsibleModule(
argument_spec = dict(),
)
environ = dict()
_os_environ.__getitem__ = environ.__getitem__
_os_environ.__setitem__ = environ.__setitem__
am.selinux_enabled = MagicMock()
am.selinux_context = MagicMock()
am.selinux_default_context = MagicMock()
am.set_context_if_different = MagicMock()
# test destination does not exist, no selinux, login name = 'root',
# no environment, os.rename() succeeds
_os_path_exists.side_effect = [False, False]
_os_getlogin.return_value = 'root'
_os_getuid.return_value = 0
_pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '')
_os_rename.return_value = None
_os_umask.side_effect = [18, 0]
am.selinux_enabled.return_value = False
_os_chmod.reset_mock()
_os_chown.reset_mock()
am.set_context_if_different.reset_mock()
am.atomic_move('/path/to/src', '/path/to/dest')
_os_rename.assert_called_with('/path/to/src', '/path/to/dest')
self.assertEqual(_os_chmod.call_args_list, [call('/path/to/dest', basic.DEFAULT_PERM & ~18)])
# same as above, except selinux_enabled
_os_path_exists.side_effect = [False, False]
_os_getlogin.return_value = 'root'
_os_getuid.return_value = 0
_pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '')
_os_rename.return_value = None
_os_umask.side_effect = [18, 0]
mock_context = MagicMock()
am.selinux_default_context.return_value = mock_context
am.selinux_enabled.return_value = True
_os_chmod.reset_mock()
_os_chown.reset_mock()
am.set_context_if_different.reset_mock()
am.selinux_default_context.reset_mock()
am.atomic_move('/path/to/src', '/path/to/dest')
_os_rename.assert_called_with('/path/to/src', '/path/to/dest')
self.assertEqual(_os_chmod.call_args_list, [call('/path/to/dest', basic.DEFAULT_PERM & ~18)])
self.assertEqual(am.selinux_default_context.call_args_list, [call('/path/to/dest')])
self.assertEqual(am.set_context_if_different.call_args_list, [call('/path/to/dest', mock_context, False)])
# now with dest present, no selinux, also raise OSError when using
# os.getlogin() to test corner case with no tty
_os_path_exists.side_effect = [True, True]
_os_getlogin.side_effect = OSError()
_os_getuid.return_value = 0
_pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '')
_os_rename.return_value = None
_os_umask.side_effect = [18, 0]
environ['LOGNAME'] = 'root'
stat1 = MagicMock()
stat1.st_mode = 0o0644
stat1.st_uid = 0
stat1.st_gid = 0
_os_stat.side_effect = [stat1,]
am.selinux_enabled.return_value = False
_os_chmod.reset_mock()
_os_chown.reset_mock()
am.set_context_if_different.reset_mock()
am.atomic_move('/path/to/src', '/path/to/dest')
_os_rename.assert_called_with('/path/to/src', '/path/to/dest')
# dest missing, selinux enabled
_os_path_exists.side_effect = [True, True]
_os_getlogin.return_value = 'root'
_os_getuid.return_value = 0
_pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '')
_os_rename.return_value = None
_os_umask.side_effect = [18, 0]
stat1 = MagicMock()
stat1.st_mode = 0o0644
stat1.st_uid = 0
stat1.st_gid = 0
_os_stat.side_effect = [stat1,]
mock_context = MagicMock()
am.selinux_context.return_value = mock_context
am.selinux_enabled.return_value = True
_os_chmod.reset_mock()
_os_chown.reset_mock()
am.set_context_if_different.reset_mock()
am.selinux_default_context.reset_mock()
am.atomic_move('/path/to/src', '/path/to/dest')
_os_rename.assert_called_with('/path/to/src', '/path/to/dest')
self.assertEqual(am.selinux_context.call_args_list, [call('/path/to/dest')])
self.assertEqual(am.set_context_if_different.call_args_list, [call('/path/to/dest', mock_context, False)])
# now testing with exceptions raised
# have os.stat raise OSError which is not EPERM
_os_stat.side_effect = OSError()
_os_path_exists.side_effect = [True, True]
_os_getlogin.return_value = 'root'
_os_getuid.return_value = 0
_pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '')
_os_rename.return_value = None
_os_umask.side_effect = [18, 0]
self.assertRaises(OSError, am.atomic_move, '/path/to/src', '/path/to/dest')
# and now have os.stat return EPERM, which should not fail
_os_stat.side_effect = OSError(errno.EPERM, 'testing os stat with EPERM')
_os_path_exists.side_effect = [True, True]
_os_getlogin.return_value = 'root'
_os_getuid.return_value = 0
_pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '')
_os_rename.return_value = None
_os_umask.side_effect = [18, 0]
# FIXME: we don't assert anything here yet
am.atomic_move('/path/to/src', '/path/to/dest')
# now we test os.rename() raising errors...
# first we test with a bad errno to verify it bombs out
_os_path_exists.side_effect = [False, False]
_os_getlogin.return_value = 'root'
_os_getuid.return_value = 0
_pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '')
_os_umask.side_effect = [18, 0]
_os_rename.side_effect = OSError(errno.EIO, 'failing with EIO')
self.assertRaises(SystemExit, am.atomic_move, '/path/to/src', '/path/to/dest')
# next we test with EPERM so it continues to the alternate code for moving
# test with NamedTemporaryFile raising an error first
_os_path_exists.side_effect = [False, False]
_os_getlogin.return_value = 'root'
_os_getuid.return_value = 0
_pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '')
_os_umask.side_effect = [18, 0]
_os_rename.side_effect = [OSError(errno.EPERM, 'failing with EPERM'), None]
_tempfile_NamedTemporaryFile.return_value = None
_tempfile_NamedTemporaryFile.side_effect = OSError()
am.selinux_enabled.return_value = False
self.assertRaises(SystemExit, am.atomic_move, '/path/to/src', '/path/to/dest')
# then test with it creating a temp file
_os_path_exists.side_effect = [False, False]
_os_getlogin.return_value = 'root'
_os_getuid.return_value = 0
_pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '')
_os_umask.side_effect = [18, 0]
_os_rename.side_effect = [OSError(errno.EPERM, 'failing with EPERM'), None]
mock_stat1 = MagicMock()
mock_stat2 = MagicMock()
mock_stat3 = MagicMock()
_os_stat.return_value = [mock_stat1, mock_stat2, mock_stat3]
_os_stat.side_effect = None
mock_tempfile = MagicMock()
mock_tempfile.name = '/path/to/tempfile'
_tempfile_NamedTemporaryFile.return_value = mock_tempfile
_tempfile_NamedTemporaryFile.side_effect = None
am.selinux_enabled.return_value = False
# FIXME: we don't assert anything here yet
am.atomic_move('/path/to/src', '/path/to/dest')
# same as above, but with selinux enabled
_os_path_exists.side_effect = [False, False]
_os_getlogin.return_value = 'root'
_os_getuid.return_value = 0
_pwd_getpwuid.return_value = ('root', '', 0, 0, '', '', '')
_os_umask.side_effect = [18, 0]
_os_rename.side_effect = [OSError(errno.EPERM, 'failing with EPERM'), None]
mock_tempfile = MagicMock()
_tempfile_NamedTemporaryFile.return_value = mock_tempfile
mock_context = MagicMock()
am.selinux_default_context.return_value = mock_context
am.selinux_enabled.return_value = True
am.atomic_move('/path/to/src', '/path/to/dest')

View file

@ -27,10 +27,11 @@ import pipes
import os import os
from sys import version_info from sys import version_info
if version_info[0] == 2:
import __builtin__ as builtins try:
else:
import builtins import builtins
except ImportError:
import __builtin__ as builtins
from ansible import __version__ as ansible_version from ansible import __version__ as ansible_version
from ansible import constants as C from ansible import constants as C