7d64aebdd3
Change: - Regression introduced in #70785 - When macOS chmod ACL syntax is used, Solaris-derived chmods return with a status of 5. This is also used for our sshpass handling, because sshpass will return 5 on auth failure. This means on Solaris, we incorrectly assume auth failure when we reach this branch of logic and try to run chmod with macOS syntax. - We now wrap this specific use of chmod in an exception handler that looks for AnsibleAuthenticationFailure and skips over it. This adds another authentication attempt (something we normally avoid to prevent account lockout), but seems better than the regression of not allowing other fallbacks to be used. - Without this patch, if setfacl fails on Solaris (and sshpass is used), we do not try common_remote_group or world-readable tmpdir fallbacks. Test Plan: - New unit Signed-off-by: Rick Elrod <rick@elrod.me>
894 lines
35 KiB
Python
894 lines
35 KiB
Python
# -*- coding: utf-8 -*-
|
|
# (c) 2015, Florian Apolloner <florian@apolloner.eu>
|
|
#
|
|
# This file is part of Ansible
|
|
#
|
|
# Ansible is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# Ansible is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
# Make coding more python3-ish
|
|
from __future__ import (absolute_import, division, print_function)
|
|
__metaclass__ = type
|
|
|
|
import os
|
|
import re
|
|
|
|
from ansible import constants as C
|
|
from units.compat import unittest
|
|
from units.compat.mock import patch, MagicMock, mock_open
|
|
|
|
from ansible.errors import AnsibleError, AnsibleAuthenticationFailure
|
|
from ansible.module_utils.six import text_type
|
|
from ansible.module_utils.six.moves import shlex_quote, builtins
|
|
from ansible.module_utils._text import to_bytes
|
|
from ansible.playbook.play_context import PlayContext
|
|
from ansible.plugins.action import ActionBase
|
|
from ansible.template import Templar
|
|
from ansible.vars.clean import clean_facts
|
|
|
|
from units.mock.loader import DictDataLoader
|
|
|
|
|
|
python_module_replacers = br"""
|
|
#!/usr/bin/python
|
|
|
|
#ANSIBLE_VERSION = "<<ANSIBLE_VERSION>>"
|
|
#MODULE_COMPLEX_ARGS = "<<INCLUDE_ANSIBLE_MODULE_COMPLEX_ARGS>>"
|
|
#SELINUX_SPECIAL_FS="<<SELINUX_SPECIAL_FILESYSTEMS>>"
|
|
|
|
test = u'Toshio \u304f\u3089\u3068\u307f'
|
|
from ansible.module_utils.basic import *
|
|
"""
|
|
|
|
powershell_module_replacers = b"""
|
|
WINDOWS_ARGS = "<<INCLUDE_ANSIBLE_MODULE_JSON_ARGS>>"
|
|
# POWERSHELL_COMMON
|
|
"""
|
|
|
|
|
|
def _action_base():
|
|
fake_loader = DictDataLoader({
|
|
})
|
|
mock_module_loader = MagicMock()
|
|
mock_shared_loader_obj = MagicMock()
|
|
mock_shared_loader_obj.module_loader = mock_module_loader
|
|
mock_connection_loader = MagicMock()
|
|
|
|
mock_shared_loader_obj.connection_loader = mock_connection_loader
|
|
mock_connection = MagicMock()
|
|
|
|
play_context = MagicMock()
|
|
|
|
action_base = DerivedActionBase(task=None,
|
|
connection=mock_connection,
|
|
play_context=play_context,
|
|
loader=fake_loader,
|
|
templar=None,
|
|
shared_loader_obj=mock_shared_loader_obj)
|
|
return action_base
|
|
|
|
|
|
class DerivedActionBase(ActionBase):
|
|
TRANSFERS_FILES = False
|
|
|
|
def run(self, tmp=None, task_vars=None):
|
|
# We're not testing the plugin run() method, just the helper
|
|
# methods ActionBase defines
|
|
return super(DerivedActionBase, self).run(tmp=tmp, task_vars=task_vars)
|
|
|
|
|
|
class TestActionBase(unittest.TestCase):
|
|
|
|
def test_action_base_run(self):
|
|
mock_task = MagicMock()
|
|
mock_task.action = "foo"
|
|
mock_task.args = dict(a=1, b=2, c=3)
|
|
|
|
mock_connection = MagicMock()
|
|
|
|
play_context = PlayContext()
|
|
|
|
mock_task.async_val = None
|
|
action_base = DerivedActionBase(mock_task, mock_connection, play_context, None, None, None)
|
|
results = action_base.run()
|
|
self.assertEqual(results, dict())
|
|
|
|
mock_task.async_val = 0
|
|
action_base = DerivedActionBase(mock_task, mock_connection, play_context, None, None, None)
|
|
results = action_base.run()
|
|
self.assertEqual(results, {})
|
|
|
|
def test_action_base__configure_module(self):
|
|
fake_loader = DictDataLoader({
|
|
})
|
|
|
|
# create our fake task
|
|
mock_task = MagicMock()
|
|
mock_task.action = "copy"
|
|
mock_task.async_val = 0
|
|
mock_task.delegate_to = None
|
|
|
|
# create a mock connection, so we don't actually try and connect to things
|
|
mock_connection = MagicMock()
|
|
|
|
# create a mock shared loader object
|
|
def mock_find_plugin_with_context(name, options, collection_list=None):
|
|
mockctx = MagicMock()
|
|
if name == 'badmodule':
|
|
mockctx.resolved = False
|
|
mockctx.plugin_resolved_path = None
|
|
elif '.ps1' in options:
|
|
mockctx.resolved = True
|
|
mockctx.plugin_resolved_path = '/fake/path/to/%s.ps1' % name
|
|
else:
|
|
mockctx.resolved = True
|
|
mockctx.plugin_resolved_path = '/fake/path/to/%s' % name
|
|
return mockctx
|
|
|
|
mock_module_loader = MagicMock()
|
|
mock_module_loader.find_plugin_with_context.side_effect = mock_find_plugin_with_context
|
|
mock_shared_obj_loader = MagicMock()
|
|
mock_shared_obj_loader.module_loader = mock_module_loader
|
|
|
|
# we're using a real play context here
|
|
play_context = PlayContext()
|
|
|
|
# our test class
|
|
action_base = DerivedActionBase(
|
|
task=mock_task,
|
|
connection=mock_connection,
|
|
play_context=play_context,
|
|
loader=fake_loader,
|
|
templar=Templar(loader=fake_loader),
|
|
shared_loader_obj=mock_shared_obj_loader,
|
|
)
|
|
|
|
# test python module formatting
|
|
with patch.object(builtins, 'open', mock_open(read_data=to_bytes(python_module_replacers.strip(), encoding='utf-8'))):
|
|
with patch.object(os, 'rename'):
|
|
mock_task.args = dict(a=1, foo='fö〩')
|
|
mock_connection.module_implementation_preferences = ('',)
|
|
(style, shebang, data, path) = action_base._configure_module(mock_task.action, mock_task.args,
|
|
task_vars=dict(ansible_python_interpreter='/usr/bin/python'))
|
|
self.assertEqual(style, "new")
|
|
self.assertEqual(shebang, u"#!/usr/bin/python")
|
|
|
|
# test module not found
|
|
self.assertRaises(AnsibleError, action_base._configure_module, 'badmodule', mock_task.args, {})
|
|
|
|
# test powershell module formatting
|
|
with patch.object(builtins, 'open', mock_open(read_data=to_bytes(powershell_module_replacers.strip(), encoding='utf-8'))):
|
|
mock_task.action = 'win_copy'
|
|
mock_task.args = dict(b=2)
|
|
mock_connection.module_implementation_preferences = ('.ps1',)
|
|
(style, shebang, data, path) = action_base._configure_module('stat', mock_task.args, {})
|
|
self.assertEqual(style, "new")
|
|
self.assertEqual(shebang, u'#!powershell')
|
|
|
|
# test module not found
|
|
self.assertRaises(AnsibleError, action_base._configure_module, 'badmodule', mock_task.args, {})
|
|
|
|
def test_action_base__compute_environment_string(self):
|
|
fake_loader = DictDataLoader({
|
|
})
|
|
|
|
# create our fake task
|
|
mock_task = MagicMock()
|
|
mock_task.action = "copy"
|
|
mock_task.args = dict(a=1)
|
|
|
|
# create a mock connection, so we don't actually try and connect to things
|
|
def env_prefix(**args):
|
|
return ' '.join(['%s=%s' % (k, shlex_quote(text_type(v))) for k, v in args.items()])
|
|
mock_connection = MagicMock()
|
|
mock_connection._shell.env_prefix.side_effect = env_prefix
|
|
|
|
# we're using a real play context here
|
|
play_context = PlayContext()
|
|
|
|
# and we're using a real templar here too
|
|
templar = Templar(loader=fake_loader)
|
|
|
|
# our test class
|
|
action_base = DerivedActionBase(
|
|
task=mock_task,
|
|
connection=mock_connection,
|
|
play_context=play_context,
|
|
loader=fake_loader,
|
|
templar=templar,
|
|
shared_loader_obj=None,
|
|
)
|
|
|
|
# test standard environment setup
|
|
mock_task.environment = [dict(FOO='foo'), None]
|
|
env_string = action_base._compute_environment_string()
|
|
self.assertEqual(env_string, "FOO=foo")
|
|
|
|
# test where environment is not a list
|
|
mock_task.environment = dict(FOO='foo')
|
|
env_string = action_base._compute_environment_string()
|
|
self.assertEqual(env_string, "FOO=foo")
|
|
|
|
# test environment with a variable in it
|
|
templar.available_variables = dict(the_var='bar')
|
|
mock_task.environment = [dict(FOO='{{the_var}}')]
|
|
env_string = action_base._compute_environment_string()
|
|
self.assertEqual(env_string, "FOO=bar")
|
|
|
|
# test with a bad environment set
|
|
mock_task.environment = dict(FOO='foo')
|
|
mock_task.environment = ['hi there']
|
|
self.assertRaises(AnsibleError, action_base._compute_environment_string)
|
|
|
|
def test_action_base__early_needs_tmp_path(self):
|
|
# create our fake task
|
|
mock_task = MagicMock()
|
|
|
|
# create a mock connection, so we don't actually try and connect to things
|
|
mock_connection = MagicMock()
|
|
|
|
# we're using a real play context here
|
|
play_context = PlayContext()
|
|
|
|
# our test class
|
|
action_base = DerivedActionBase(
|
|
task=mock_task,
|
|
connection=mock_connection,
|
|
play_context=play_context,
|
|
loader=None,
|
|
templar=None,
|
|
shared_loader_obj=None,
|
|
)
|
|
|
|
self.assertFalse(action_base._early_needs_tmp_path())
|
|
|
|
action_base.TRANSFERS_FILES = True
|
|
self.assertTrue(action_base._early_needs_tmp_path())
|
|
|
|
def test_action_base__make_tmp_path(self):
|
|
# create our fake task
|
|
mock_task = MagicMock()
|
|
|
|
def get_shell_opt(opt):
|
|
|
|
ret = None
|
|
if opt == 'admin_users':
|
|
ret = ['root', 'toor', 'Administrator']
|
|
elif opt == 'remote_tmp':
|
|
ret = '~/.ansible/tmp'
|
|
|
|
return ret
|
|
|
|
# create a mock connection, so we don't actually try and connect to things
|
|
mock_connection = MagicMock()
|
|
mock_connection.transport = 'ssh'
|
|
mock_connection._shell.mkdtemp.return_value = 'mkdir command'
|
|
mock_connection._shell.join_path.side_effect = os.path.join
|
|
mock_connection._shell.get_option = get_shell_opt
|
|
mock_connection._shell.HOMES_RE = re.compile(r'(\'|\")?(~|\$HOME)(.*)')
|
|
|
|
# we're using a real play context here
|
|
play_context = PlayContext()
|
|
play_context.become = True
|
|
play_context.become_user = 'foo'
|
|
|
|
# our test class
|
|
action_base = DerivedActionBase(
|
|
task=mock_task,
|
|
connection=mock_connection,
|
|
play_context=play_context,
|
|
loader=None,
|
|
templar=None,
|
|
shared_loader_obj=None,
|
|
)
|
|
|
|
action_base._low_level_execute_command = MagicMock()
|
|
action_base._low_level_execute_command.return_value = dict(rc=0, stdout='/some/path')
|
|
self.assertEqual(action_base._make_tmp_path('root'), '/some/path/')
|
|
|
|
# empty path fails
|
|
action_base._low_level_execute_command.return_value = dict(rc=0, stdout='')
|
|
self.assertRaises(AnsibleError, action_base._make_tmp_path, 'root')
|
|
|
|
# authentication failure
|
|
action_base._low_level_execute_command.return_value = dict(rc=5, stdout='')
|
|
self.assertRaises(AnsibleError, action_base._make_tmp_path, 'root')
|
|
|
|
# ssh error
|
|
action_base._low_level_execute_command.return_value = dict(rc=255, stdout='', stderr='')
|
|
self.assertRaises(AnsibleError, action_base._make_tmp_path, 'root')
|
|
play_context.verbosity = 5
|
|
self.assertRaises(AnsibleError, action_base._make_tmp_path, 'root')
|
|
|
|
# general error
|
|
action_base._low_level_execute_command.return_value = dict(rc=1, stdout='some stuff here', stderr='')
|
|
self.assertRaises(AnsibleError, action_base._make_tmp_path, 'root')
|
|
action_base._low_level_execute_command.return_value = dict(rc=1, stdout='some stuff here', stderr='No space left on device')
|
|
self.assertRaises(AnsibleError, action_base._make_tmp_path, 'root')
|
|
|
|
def test_action_base__fixup_perms2(self):
|
|
mock_task = MagicMock()
|
|
mock_connection = MagicMock()
|
|
play_context = PlayContext()
|
|
action_base = DerivedActionBase(
|
|
task=mock_task,
|
|
connection=mock_connection,
|
|
play_context=play_context,
|
|
loader=None,
|
|
templar=None,
|
|
shared_loader_obj=None,
|
|
)
|
|
action_base._low_level_execute_command = MagicMock()
|
|
remote_paths = ['/tmp/foo/bar.txt', '/tmp/baz.txt']
|
|
remote_user = 'remoteuser1'
|
|
|
|
# Used for skipping down to common group dir.
|
|
CHMOD_ACL_FLAGS = ('+a', 'A+user:remoteuser2:r:allow')
|
|
|
|
def runWithNoExpectation(execute=False):
|
|
return action_base._fixup_perms2(
|
|
remote_paths,
|
|
remote_user=remote_user,
|
|
execute=execute)
|
|
|
|
def assertSuccess(execute=False):
|
|
self.assertEqual(runWithNoExpectation(execute), remote_paths)
|
|
|
|
def assertThrowRegex(regex, execute=False):
|
|
self.assertRaisesRegexp(
|
|
AnsibleError,
|
|
regex,
|
|
action_base._fixup_perms2,
|
|
remote_paths,
|
|
remote_user=remote_user,
|
|
execute=execute)
|
|
|
|
def get_shell_option_for_arg(args_kv, default):
|
|
'''A helper for get_shell_option. Returns a function that, if
|
|
called with ``option`` that exists in args_kv, will return the
|
|
value, else will return ``default`` for every other given arg'''
|
|
def _helper(option, *args, **kwargs):
|
|
return args_kv.get(option, default)
|
|
return _helper
|
|
|
|
action_base.get_become_option = MagicMock()
|
|
action_base.get_become_option.return_value = 'remoteuser2'
|
|
|
|
# Step 1: On windows, we just return remote_paths
|
|
action_base._connection._shell._IS_WINDOWS = True
|
|
assertSuccess(execute=False)
|
|
assertSuccess(execute=True)
|
|
|
|
# But if we're not on windows....we have more work to do.
|
|
action_base._connection._shell._IS_WINDOWS = False
|
|
|
|
# Step 2: We're /not/ becoming an unprivileged user
|
|
action_base._remote_chmod = MagicMock()
|
|
action_base._is_become_unprivileged = MagicMock()
|
|
action_base._is_become_unprivileged.return_value = False
|
|
# Two subcases:
|
|
# - _remote_chmod rc is 0
|
|
# - _remote-chmod rc is not 0, something failed
|
|
action_base._remote_chmod.return_value = {
|
|
'rc': 0,
|
|
'stdout': 'some stuff here',
|
|
'stderr': '',
|
|
}
|
|
assertSuccess(execute=True)
|
|
|
|
# When execute=False, we just get the list back. But add it here for
|
|
# completion. chmod is never called.
|
|
assertSuccess()
|
|
|
|
action_base._remote_chmod.return_value = {
|
|
'rc': 1,
|
|
'stdout': 'some stuff here',
|
|
'stderr': 'and here',
|
|
}
|
|
assertThrowRegex(
|
|
'Failed to set execute bit on remote files',
|
|
execute=True)
|
|
|
|
# Step 3: we are becoming unprivileged
|
|
action_base._is_become_unprivileged.return_value = True
|
|
|
|
# Step 3a: setfacl
|
|
action_base._remote_set_user_facl = MagicMock()
|
|
action_base._remote_set_user_facl.return_value = {
|
|
'rc': 0,
|
|
'stdout': '',
|
|
'stderr': '',
|
|
}
|
|
assertSuccess()
|
|
|
|
# Step 3b: chmod +x if we need to
|
|
# To get here, setfacl failed, so mock it as such.
|
|
action_base._remote_set_user_facl.return_value = {
|
|
'rc': 1,
|
|
'stdout': '',
|
|
'stderr': '',
|
|
}
|
|
action_base._remote_chmod.return_value = {
|
|
'rc': 1,
|
|
'stdout': 'some stuff here',
|
|
'stderr': '',
|
|
}
|
|
assertThrowRegex(
|
|
'Failed to set file mode on remote temporary file',
|
|
execute=True)
|
|
action_base._remote_chmod.return_value = {
|
|
'rc': 0,
|
|
'stdout': 'some stuff here',
|
|
'stderr': '',
|
|
}
|
|
assertSuccess(execute=True)
|
|
|
|
# Step 3c: chown
|
|
action_base._remote_chown = MagicMock()
|
|
action_base._remote_chown.return_value = {
|
|
'rc': 0,
|
|
'stdout': '',
|
|
'stderr': '',
|
|
}
|
|
assertSuccess()
|
|
action_base._remote_chown.return_value = {
|
|
'rc': 1,
|
|
'stdout': '',
|
|
'stderr': '',
|
|
}
|
|
remote_user = 'root'
|
|
action_base._get_admin_users = MagicMock()
|
|
action_base._get_admin_users.return_value = ['root']
|
|
assertThrowRegex('user would be unable to read the file.')
|
|
remote_user = 'remoteuser1'
|
|
|
|
# Step 3d: chmod +a on osx
|
|
assertSuccess()
|
|
action_base._remote_chmod.assert_called_with(
|
|
['remoteuser2 allow read'] + remote_paths,
|
|
'+a')
|
|
|
|
# This case can cause Solaris chmod to return 5 which the ssh plugin
|
|
# treats as failure. To prevent a regression and ensure we still try the
|
|
# rest of the cases below, we mock the thrown exception here.
|
|
# This function ensures that only the macOS case (+a) throws this.
|
|
def raise_if_plus_a(definitely_not_underscore, mode):
|
|
if mode == '+a':
|
|
raise AnsibleAuthenticationFailure()
|
|
return {'rc': 0, 'stdout': '', 'stderr': ''}
|
|
|
|
action_base._remote_chmod.side_effect = raise_if_plus_a
|
|
assertSuccess()
|
|
|
|
# Step 3e: Common group
|
|
def rc_1_if_chmod_acl(definitely_not_underscore, mode):
|
|
rc = 0
|
|
if mode in CHMOD_ACL_FLAGS:
|
|
rc = 1
|
|
return {'rc': rc, 'stdout': '', 'stderr': ''}
|
|
|
|
action_base._remote_chmod = MagicMock()
|
|
action_base._remote_chmod.side_effect = rc_1_if_chmod_acl
|
|
|
|
get_shell_option = action_base.get_shell_option
|
|
action_base.get_shell_option = MagicMock()
|
|
action_base.get_shell_option.side_effect = get_shell_option_for_arg(
|
|
{
|
|
'common_remote_group': 'commongroup',
|
|
},
|
|
None)
|
|
action_base._remote_chgrp = MagicMock()
|
|
action_base._remote_chgrp.return_value = {
|
|
'rc': 0,
|
|
'stdout': '',
|
|
'stderr': '',
|
|
}
|
|
# TODO: Add test to assert warning is shown if
|
|
# ALLOW_WORLD_READABLE_TMPFILES is set in this case.
|
|
assertSuccess()
|
|
action_base._remote_chgrp.assert_called_once_with(
|
|
remote_paths,
|
|
'commongroup')
|
|
|
|
# Step 4: world-readable tmpdir
|
|
action_base.get_shell_option.side_effect = get_shell_option_for_arg(
|
|
{
|
|
'world_readable_temp': True,
|
|
'common_remote_group': None,
|
|
},
|
|
None)
|
|
action_base._remote_chmod.return_value = {
|
|
'rc': 0,
|
|
'stdout': 'some stuff here',
|
|
'stderr': '',
|
|
}
|
|
assertSuccess()
|
|
action_base._remote_chmod = MagicMock()
|
|
action_base._remote_chmod.return_value = {
|
|
'rc': 1,
|
|
'stdout': 'some stuff here',
|
|
'stderr': '',
|
|
}
|
|
assertThrowRegex('Failed to set file mode on remote files')
|
|
|
|
# Otherwise if we make it here in this state, we hit the catch-all
|
|
action_base.get_shell_option.side_effect = get_shell_option_for_arg(
|
|
{},
|
|
None)
|
|
assertThrowRegex('on the temporary files Ansible needs to create')
|
|
|
|
def test_action_base__remove_tmp_path(self):
|
|
# create our fake task
|
|
mock_task = MagicMock()
|
|
|
|
# create a mock connection, so we don't actually try and connect to things
|
|
mock_connection = MagicMock()
|
|
mock_connection._shell.remove.return_value = 'rm some stuff'
|
|
|
|
# we're using a real play context here
|
|
play_context = PlayContext()
|
|
|
|
# our test class
|
|
action_base = DerivedActionBase(
|
|
task=mock_task,
|
|
connection=mock_connection,
|
|
play_context=play_context,
|
|
loader=None,
|
|
templar=None,
|
|
shared_loader_obj=None,
|
|
)
|
|
|
|
action_base._low_level_execute_command = MagicMock()
|
|
# these don't really return anything or raise errors, so
|
|
# we're pretty much calling these for coverage right now
|
|
action_base._remove_tmp_path('/bad/path/dont/remove')
|
|
action_base._remove_tmp_path('/good/path/to/ansible-tmp-thing')
|
|
|
|
@patch('os.unlink')
|
|
@patch('os.fdopen')
|
|
@patch('tempfile.mkstemp')
|
|
def test_action_base__transfer_data(self, mock_mkstemp, mock_fdopen, mock_unlink):
|
|
# create our fake task
|
|
mock_task = MagicMock()
|
|
|
|
# create a mock connection, so we don't actually try and connect to things
|
|
mock_connection = MagicMock()
|
|
mock_connection.put_file.return_value = None
|
|
|
|
# we're using a real play context here
|
|
play_context = PlayContext()
|
|
|
|
# our test class
|
|
action_base = DerivedActionBase(
|
|
task=mock_task,
|
|
connection=mock_connection,
|
|
play_context=play_context,
|
|
loader=None,
|
|
templar=None,
|
|
shared_loader_obj=None,
|
|
)
|
|
|
|
mock_afd = MagicMock()
|
|
mock_afile = MagicMock()
|
|
mock_mkstemp.return_value = (mock_afd, mock_afile)
|
|
|
|
mock_unlink.return_value = None
|
|
|
|
mock_afo = MagicMock()
|
|
mock_afo.write.return_value = None
|
|
mock_afo.flush.return_value = None
|
|
mock_afo.close.return_value = None
|
|
mock_fdopen.return_value = mock_afo
|
|
|
|
self.assertEqual(action_base._transfer_data('/path/to/remote/file', 'some data'), '/path/to/remote/file')
|
|
self.assertEqual(action_base._transfer_data('/path/to/remote/file', 'some mixed data: fö〩'), '/path/to/remote/file')
|
|
self.assertEqual(action_base._transfer_data('/path/to/remote/file', dict(some_key='some value')), '/path/to/remote/file')
|
|
self.assertEqual(action_base._transfer_data('/path/to/remote/file', dict(some_key='fö〩')), '/path/to/remote/file')
|
|
|
|
mock_afo.write.side_effect = Exception()
|
|
self.assertRaises(AnsibleError, action_base._transfer_data, '/path/to/remote/file', '')
|
|
|
|
def test_action_base__execute_remote_stat(self):
|
|
# create our fake task
|
|
mock_task = MagicMock()
|
|
|
|
# create a mock connection, so we don't actually try and connect to things
|
|
mock_connection = MagicMock()
|
|
|
|
# we're using a real play context here
|
|
play_context = PlayContext()
|
|
|
|
# our test class
|
|
action_base = DerivedActionBase(
|
|
task=mock_task,
|
|
connection=mock_connection,
|
|
play_context=play_context,
|
|
loader=None,
|
|
templar=None,
|
|
shared_loader_obj=None,
|
|
)
|
|
|
|
action_base._execute_module = MagicMock()
|
|
|
|
# test normal case
|
|
action_base._execute_module.return_value = dict(stat=dict(checksum='1111111111111111111111111111111111', exists=True))
|
|
res = action_base._execute_remote_stat(path='/path/to/file', all_vars=dict(), follow=False)
|
|
self.assertEqual(res['checksum'], '1111111111111111111111111111111111')
|
|
|
|
# test does not exist
|
|
action_base._execute_module.return_value = dict(stat=dict(exists=False))
|
|
res = action_base._execute_remote_stat(path='/path/to/file', all_vars=dict(), follow=False)
|
|
self.assertFalse(res['exists'])
|
|
self.assertEqual(res['checksum'], '1')
|
|
|
|
# test no checksum in result from _execute_module
|
|
action_base._execute_module.return_value = dict(stat=dict(exists=True))
|
|
res = action_base._execute_remote_stat(path='/path/to/file', all_vars=dict(), follow=False)
|
|
self.assertTrue(res['exists'])
|
|
self.assertEqual(res['checksum'], '')
|
|
|
|
# test stat call failed
|
|
action_base._execute_module.return_value = dict(failed=True, msg="because I said so")
|
|
self.assertRaises(AnsibleError, action_base._execute_remote_stat, path='/path/to/file', all_vars=dict(), follow=False)
|
|
|
|
def test_action_base__execute_module(self):
|
|
# create our fake task
|
|
mock_task = MagicMock()
|
|
mock_task.action = 'copy'
|
|
mock_task.args = dict(a=1, b=2, c=3)
|
|
|
|
# create a mock connection, so we don't actually try and connect to things
|
|
def build_module_command(env_string, shebang, cmd, arg_path=None):
|
|
to_run = [env_string, cmd]
|
|
if arg_path:
|
|
to_run.append(arg_path)
|
|
return " ".join(to_run)
|
|
|
|
def get_option(option):
|
|
return {'admin_users': ['root', 'toor']}.get(option)
|
|
|
|
mock_connection = MagicMock()
|
|
mock_connection.build_module_command.side_effect = build_module_command
|
|
mock_connection.socket_path = None
|
|
mock_connection._shell.get_remote_filename.return_value = 'copy.py'
|
|
mock_connection._shell.join_path.side_effect = os.path.join
|
|
mock_connection._shell.tmpdir = '/var/tmp/mytempdir'
|
|
mock_connection._shell.get_option = get_option
|
|
|
|
# we're using a real play context here
|
|
play_context = PlayContext()
|
|
|
|
# our test class
|
|
action_base = DerivedActionBase(
|
|
task=mock_task,
|
|
connection=mock_connection,
|
|
play_context=play_context,
|
|
loader=None,
|
|
templar=None,
|
|
shared_loader_obj=None,
|
|
)
|
|
|
|
# fake a lot of methods as we test those elsewhere
|
|
action_base._configure_module = MagicMock()
|
|
action_base._supports_check_mode = MagicMock()
|
|
action_base._is_pipelining_enabled = MagicMock()
|
|
action_base._make_tmp_path = MagicMock()
|
|
action_base._transfer_data = MagicMock()
|
|
action_base._compute_environment_string = MagicMock()
|
|
action_base._low_level_execute_command = MagicMock()
|
|
action_base._fixup_perms2 = MagicMock()
|
|
|
|
action_base._configure_module.return_value = ('new', '#!/usr/bin/python', 'this is the module data', 'path')
|
|
action_base._is_pipelining_enabled.return_value = False
|
|
action_base._compute_environment_string.return_value = ''
|
|
action_base._connection.has_pipelining = False
|
|
action_base._make_tmp_path.return_value = '/the/tmp/path'
|
|
action_base._low_level_execute_command.return_value = dict(stdout='{"rc": 0, "stdout": "ok"}')
|
|
self.assertEqual(action_base._execute_module(module_name=None, module_args=None), dict(_ansible_parsed=True, rc=0, stdout="ok", stdout_lines=['ok']))
|
|
self.assertEqual(
|
|
action_base._execute_module(
|
|
module_name='foo',
|
|
module_args=dict(z=9, y=8, x=7),
|
|
task_vars=dict(a=1)
|
|
),
|
|
dict(
|
|
_ansible_parsed=True,
|
|
rc=0,
|
|
stdout="ok",
|
|
stdout_lines=['ok'],
|
|
)
|
|
)
|
|
|
|
# test with needing/removing a remote tmp path
|
|
action_base._configure_module.return_value = ('old', '#!/usr/bin/python', 'this is the module data', 'path')
|
|
action_base._is_pipelining_enabled.return_value = False
|
|
action_base._make_tmp_path.return_value = '/the/tmp/path'
|
|
self.assertEqual(action_base._execute_module(), dict(_ansible_parsed=True, rc=0, stdout="ok", stdout_lines=['ok']))
|
|
|
|
action_base._configure_module.return_value = ('non_native_want_json', '#!/usr/bin/python', 'this is the module data', 'path')
|
|
self.assertEqual(action_base._execute_module(), dict(_ansible_parsed=True, rc=0, stdout="ok", stdout_lines=['ok']))
|
|
|
|
play_context.become = True
|
|
play_context.become_user = 'foo'
|
|
self.assertEqual(action_base._execute_module(), dict(_ansible_parsed=True, rc=0, stdout="ok", stdout_lines=['ok']))
|
|
|
|
# test an invalid shebang return
|
|
action_base._configure_module.return_value = ('new', '', 'this is the module data', 'path')
|
|
action_base._is_pipelining_enabled.return_value = False
|
|
action_base._make_tmp_path.return_value = '/the/tmp/path'
|
|
self.assertRaises(AnsibleError, action_base._execute_module)
|
|
|
|
# test with check mode enabled, once with support for check
|
|
# mode and once with support disabled to raise an error
|
|
play_context.check_mode = True
|
|
action_base._configure_module.return_value = ('new', '#!/usr/bin/python', 'this is the module data', 'path')
|
|
self.assertEqual(action_base._execute_module(), dict(_ansible_parsed=True, rc=0, stdout="ok", stdout_lines=['ok']))
|
|
action_base._supports_check_mode = False
|
|
self.assertRaises(AnsibleError, action_base._execute_module)
|
|
|
|
def test_action_base_sudo_only_if_user_differs(self):
|
|
fake_loader = MagicMock()
|
|
fake_loader.get_basedir.return_value = os.getcwd()
|
|
play_context = PlayContext()
|
|
|
|
action_base = DerivedActionBase(None, None, play_context, fake_loader, None, None)
|
|
action_base.get_become_option = MagicMock(return_value='root')
|
|
action_base._get_remote_user = MagicMock(return_value='root')
|
|
|
|
action_base._connection = MagicMock(exec_command=MagicMock(return_value=(0, '', '')))
|
|
|
|
action_base._connection._shell = shell = MagicMock(append_command=MagicMock(return_value=('JOINED CMD')))
|
|
|
|
action_base._connection.become = become = MagicMock()
|
|
become.build_become_command.return_value = 'foo'
|
|
|
|
action_base._low_level_execute_command('ECHO', sudoable=True)
|
|
become.build_become_command.assert_not_called()
|
|
|
|
action_base._get_remote_user.return_value = 'apo'
|
|
action_base._low_level_execute_command('ECHO', sudoable=True, executable='/bin/csh')
|
|
become.build_become_command.assert_called_once_with("ECHO", shell)
|
|
|
|
become.build_become_command.reset_mock()
|
|
|
|
with patch.object(C, 'BECOME_ALLOW_SAME_USER', new=True):
|
|
action_base._get_remote_user.return_value = 'root'
|
|
action_base._low_level_execute_command('ECHO SAME', sudoable=True)
|
|
become.build_become_command.assert_called_once_with("ECHO SAME", shell)
|
|
|
|
def test__remote_expand_user_relative_pathing(self):
|
|
action_base = _action_base()
|
|
action_base._play_context.remote_addr = 'bar'
|
|
action_base._low_level_execute_command = MagicMock(return_value={'stdout': b'../home/user'})
|
|
action_base._connection._shell.join_path.return_value = '../home/user/foo'
|
|
with self.assertRaises(AnsibleError) as cm:
|
|
action_base._remote_expand_user('~/foo')
|
|
self.assertEqual(
|
|
cm.exception.message,
|
|
"'bar' returned an invalid relative home directory path containing '..'"
|
|
)
|
|
|
|
|
|
class TestActionBaseCleanReturnedData(unittest.TestCase):
|
|
def test(self):
|
|
|
|
fake_loader = DictDataLoader({
|
|
})
|
|
mock_module_loader = MagicMock()
|
|
mock_shared_loader_obj = MagicMock()
|
|
mock_shared_loader_obj.module_loader = mock_module_loader
|
|
connection_loader_paths = ['/tmp/asdfadf', '/usr/lib64/whatever',
|
|
'dfadfasf',
|
|
'foo.py',
|
|
'.*',
|
|
# FIXME: a path with parans breaks the regex
|
|
# '(.*)',
|
|
'/path/to/ansible/lib/ansible/plugins/connection/custom_connection.py',
|
|
'/path/to/ansible/lib/ansible/plugins/connection/ssh.py']
|
|
|
|
def fake_all(path_only=None):
|
|
for path in connection_loader_paths:
|
|
yield path
|
|
|
|
mock_connection_loader = MagicMock()
|
|
mock_connection_loader.all = fake_all
|
|
|
|
mock_shared_loader_obj.connection_loader = mock_connection_loader
|
|
mock_connection = MagicMock()
|
|
# mock_connection._shell.env_prefix.side_effect = env_prefix
|
|
|
|
# action_base = DerivedActionBase(mock_task, mock_connection, play_context, None, None, None)
|
|
action_base = DerivedActionBase(task=None,
|
|
connection=mock_connection,
|
|
play_context=None,
|
|
loader=fake_loader,
|
|
templar=None,
|
|
shared_loader_obj=mock_shared_loader_obj)
|
|
data = {'ansible_playbook_python': '/usr/bin/python',
|
|
# 'ansible_rsync_path': '/usr/bin/rsync',
|
|
'ansible_python_interpreter': '/usr/bin/python',
|
|
'ansible_ssh_some_var': 'whatever',
|
|
'ansible_ssh_host_key_somehost': 'some key here',
|
|
'some_other_var': 'foo bar'}
|
|
data = clean_facts(data)
|
|
self.assertNotIn('ansible_playbook_python', data)
|
|
self.assertNotIn('ansible_python_interpreter', data)
|
|
self.assertIn('ansible_ssh_host_key_somehost', data)
|
|
self.assertIn('some_other_var', data)
|
|
|
|
|
|
class TestActionBaseParseReturnedData(unittest.TestCase):
|
|
|
|
def test_fail_no_json(self):
|
|
action_base = _action_base()
|
|
rc = 0
|
|
stdout = 'foo\nbar\n'
|
|
err = 'oopsy'
|
|
returned_data = {'rc': rc,
|
|
'stdout': stdout,
|
|
'stdout_lines': stdout.splitlines(),
|
|
'stderr': err}
|
|
res = action_base._parse_returned_data(returned_data)
|
|
self.assertFalse(res['_ansible_parsed'])
|
|
self.assertTrue(res['failed'])
|
|
self.assertEqual(res['module_stderr'], err)
|
|
|
|
def test_json_empty(self):
|
|
action_base = _action_base()
|
|
rc = 0
|
|
stdout = '{}\n'
|
|
err = ''
|
|
returned_data = {'rc': rc,
|
|
'stdout': stdout,
|
|
'stdout_lines': stdout.splitlines(),
|
|
'stderr': err}
|
|
res = action_base._parse_returned_data(returned_data)
|
|
del res['_ansible_parsed'] # we always have _ansible_parsed
|
|
self.assertEqual(len(res), 0)
|
|
self.assertFalse(res)
|
|
|
|
def test_json_facts(self):
|
|
action_base = _action_base()
|
|
rc = 0
|
|
stdout = '{"ansible_facts": {"foo": "bar", "ansible_blip": "blip_value"}}\n'
|
|
err = ''
|
|
|
|
returned_data = {'rc': rc,
|
|
'stdout': stdout,
|
|
'stdout_lines': stdout.splitlines(),
|
|
'stderr': err}
|
|
res = action_base._parse_returned_data(returned_data)
|
|
self.assertTrue(res['ansible_facts'])
|
|
self.assertIn('ansible_blip', res['ansible_facts'])
|
|
# TODO: Should this be an AnsibleUnsafe?
|
|
# self.assertIsInstance(res['ansible_facts'], AnsibleUnsafe)
|
|
|
|
def test_json_facts_add_host(self):
|
|
action_base = _action_base()
|
|
rc = 0
|
|
stdout = '''{"ansible_facts": {"foo": "bar", "ansible_blip": "blip_value"},
|
|
"add_host": {"host_vars": {"some_key": ["whatever the add_host object is"]}
|
|
}
|
|
}\n'''
|
|
err = ''
|
|
|
|
returned_data = {'rc': rc,
|
|
'stdout': stdout,
|
|
'stdout_lines': stdout.splitlines(),
|
|
'stderr': err}
|
|
res = action_base._parse_returned_data(returned_data)
|
|
self.assertTrue(res['ansible_facts'])
|
|
self.assertIn('ansible_blip', res['ansible_facts'])
|
|
self.assertIn('add_host', res)
|
|
# TODO: Should this be an AnsibleUnsafe?
|
|
# self.assertIsInstance(res['ansible_facts'], AnsibleUnsafe)
|