372a29744b
Python's Exception constructor already takes a `message` as a parameter, which you can then get at by doing str(e) (e.message was deprecated). The reason I bothered to make this change was because I was debugging with pdb and I noticed that AnsibleErrors don't give useful information in pdb (probably because they don't have a __repr__ method that prints the `msg` attribute). (Pdb) c > /Users/marca/dev/git-repos/ansible/lib/ansible/runner/__init__.py(599)_executor() -> msg = str(ae) (Pdb) ae AnsibleError()
922 lines
36 KiB
Python
922 lines
36 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import traceback
|
|
import unittest
|
|
import os
|
|
import os.path
|
|
import re
|
|
import tempfile
|
|
import yaml
|
|
import passlib.hash
|
|
import string
|
|
import StringIO
|
|
import copy
|
|
|
|
from nose.plugins.skip import SkipTest
|
|
|
|
import ansible.utils
|
|
import ansible.errors
|
|
import ansible.constants as C
|
|
import ansible.utils.template as template2
|
|
from ansible.module_utils.splitter import split_args
|
|
|
|
from ansible import __version__
|
|
|
|
import sys
|
|
reload(sys)
|
|
sys.setdefaultencoding("utf8")
|
|
|
|
class TestUtils(unittest.TestCase):
|
|
|
|
def _is_fips(self):
|
|
try:
|
|
data = open('/proc/sys/crypto/fips_enabled').read().strip()
|
|
except:
|
|
return False
|
|
if data != '1':
|
|
return False
|
|
return True
|
|
|
|
def test_before_comment(self):
|
|
''' see if we can detect the part of a string before a comment. Used by INI parser in inventory '''
|
|
|
|
input = "before # comment"
|
|
expected = "before "
|
|
actual = ansible.utils.before_comment(input)
|
|
self.assertEqual(expected, actual)
|
|
|
|
input = "before \# not a comment"
|
|
expected = "before # not a comment"
|
|
actual = ansible.utils.before_comment(input)
|
|
self.assertEqual(expected, actual)
|
|
|
|
input = ""
|
|
expected = ""
|
|
actual = ansible.utils.before_comment(input)
|
|
self.assertEqual(expected, actual)
|
|
|
|
input = "#"
|
|
expected = ""
|
|
actual = ansible.utils.before_comment(input)
|
|
self.assertEqual(expected, actual)
|
|
|
|
#####################################
|
|
### check_conditional tests
|
|
|
|
def test_check_conditional_jinja2_literals(self):
|
|
# see http://jinja.pocoo.org/docs/templates/#literals
|
|
|
|
# none
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
None, '/', {}), True)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'', '/', {}), True)
|
|
|
|
# list
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
['true'], '/', {}), True)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
['false'], '/', {}), False)
|
|
|
|
# non basestring or list
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
{}, '/', {}), {})
|
|
|
|
# boolean
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'true', '/', {}), True)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'false', '/', {}), False)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'True', '/', {}), True)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'False', '/', {}), False)
|
|
|
|
# integer
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'1', '/', {}), True)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'0', '/', {}), False)
|
|
|
|
# string, beware, a string is truthy unless empty
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'"yes"', '/', {}), True)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'"no"', '/', {}), True)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'""', '/', {}), False)
|
|
|
|
|
|
def test_check_conditional_jinja2_variable_literals(self):
|
|
# see http://jinja.pocoo.org/docs/templates/#literals
|
|
|
|
# boolean
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'var', '/', {'var': 'True'}), True)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'var', '/', {'var': 'true'}), True)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'var', '/', {'var': 'False'}), False)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'var', '/', {'var': 'false'}), False)
|
|
|
|
# integer
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'var', '/', {'var': '1'}), True)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'var', '/', {'var': 1}), True)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'var', '/', {'var': '0'}), False)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'var', '/', {'var': 0}), False)
|
|
|
|
# string, beware, a string is truthy unless empty
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'var', '/', {'var': '"yes"'}), True)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'var', '/', {'var': '"no"'}), True)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'var', '/', {'var': '""'}), False)
|
|
|
|
# Python boolean in Jinja2 expression
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'var', '/', {'var': True}), True)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'var', '/', {'var': False}), False)
|
|
|
|
|
|
def test_check_conditional_jinja2_expression(self):
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'1 == 1', '/', {}), True)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'bar == 42', '/', {'bar': 42}), True)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'bar != 42', '/', {'bar': 42}), False)
|
|
|
|
|
|
def test_check_conditional_jinja2_expression_in_variable(self):
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'var', '/', {'var': '1 == 1'}), True)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'var', '/', {'var': 'bar == 42', 'bar': 42}), True)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
'var', '/', {'var': 'bar != 42', 'bar': 42}), False)
|
|
|
|
def test_check_conditional_jinja2_unicode(self):
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
u'"\u00df"', '/', {}), True)
|
|
self.assertEqual(ansible.utils.check_conditional(
|
|
u'var == "\u00df"', '/', {'var': u'\u00df'}), True)
|
|
|
|
|
|
#####################################
|
|
### key-value parsing
|
|
|
|
def test_parse_kv_basic(self):
|
|
self.assertEqual(ansible.utils.parse_kv('a=simple b="with space" c="this=that"'),
|
|
{'a': 'simple', 'b': 'with space', 'c': 'this=that'})
|
|
self.assertEqual(ansible.utils.parse_kv('msg=АБВГД'),
|
|
{'msg': 'АБВГД'})
|
|
|
|
|
|
def test_jsonify(self):
|
|
self.assertEqual(ansible.utils.jsonify(None), '{}')
|
|
self.assertEqual(ansible.utils.jsonify(dict(foo='bar', baz=['qux'])),
|
|
'{"baz": ["qux"], "foo": "bar"}')
|
|
expected = '''{
|
|
"baz": [
|
|
"qux"
|
|
],
|
|
"foo": "bar"
|
|
}'''
|
|
self.assertEqual(ansible.utils.jsonify(dict(foo='bar', baz=['qux']), format=True), expected)
|
|
|
|
def test_is_failed(self):
|
|
self.assertEqual(ansible.utils.is_failed(dict(rc=0)), False)
|
|
self.assertEqual(ansible.utils.is_failed(dict(rc=1)), True)
|
|
self.assertEqual(ansible.utils.is_failed(dict()), False)
|
|
self.assertEqual(ansible.utils.is_failed(dict(failed=False)), False)
|
|
self.assertEqual(ansible.utils.is_failed(dict(failed=True)), True)
|
|
self.assertEqual(ansible.utils.is_failed(dict(failed='True')), True)
|
|
self.assertEqual(ansible.utils.is_failed(dict(failed='true')), True)
|
|
|
|
def test_is_changed(self):
|
|
self.assertEqual(ansible.utils.is_changed(dict()), False)
|
|
self.assertEqual(ansible.utils.is_changed(dict(changed=False)), False)
|
|
self.assertEqual(ansible.utils.is_changed(dict(changed=True)), True)
|
|
self.assertEqual(ansible.utils.is_changed(dict(changed='True')), True)
|
|
self.assertEqual(ansible.utils.is_changed(dict(changed='true')), True)
|
|
|
|
def test_path_dwim(self):
|
|
self.assertEqual(ansible.utils.path_dwim(None, __file__),
|
|
__file__)
|
|
self.assertEqual(ansible.utils.path_dwim(None, '~'),
|
|
os.path.expanduser('~'))
|
|
self.assertEqual(ansible.utils.path_dwim(None, 'TestUtils.py'),
|
|
__file__.rstrip('c'))
|
|
|
|
def test_path_dwim_relative(self):
|
|
self.assertEqual(ansible.utils.path_dwim_relative(__file__, 'units', 'TestUtils.py',
|
|
os.path.dirname(os.path.dirname(__file__))),
|
|
__file__.rstrip('c'))
|
|
|
|
def test_json_loads(self):
|
|
self.assertEqual(ansible.utils.json_loads('{"foo": "bar"}'), dict(foo='bar'))
|
|
|
|
def test_parse_json(self):
|
|
# leading junk
|
|
self.assertEqual(ansible.utils.parse_json('ansible\n{"foo": "bar"}'), dict(foo="bar"))
|
|
|
|
# No closing quotation
|
|
try:
|
|
rc = ansible.utils.parse_json('foo=bar "')
|
|
print rc
|
|
except ValueError:
|
|
pass
|
|
else:
|
|
traceback.print_exc()
|
|
raise AssertionError('Incorrect exception, expected ValueError')
|
|
|
|
# Failed to parse
|
|
try:
|
|
ansible.utils.parse_json('{')
|
|
except ValueError:
|
|
pass
|
|
else:
|
|
raise AssertionError('Incorrect exception, expected ValueError')
|
|
|
|
def test_parse_yaml(self):
|
|
#json
|
|
self.assertEqual(ansible.utils.parse_yaml('{"foo": "bar"}'), dict(foo='bar'))
|
|
|
|
# broken json
|
|
try:
|
|
ansible.utils.parse_yaml('{')
|
|
except ansible.errors.AnsibleError:
|
|
pass
|
|
else:
|
|
raise AssertionError
|
|
|
|
# broken json with path_hint
|
|
try:
|
|
ansible.utils.parse_yaml('{', path_hint='foo')
|
|
except ansible.errors.AnsibleError:
|
|
pass
|
|
else:
|
|
raise AssertionError
|
|
|
|
# yaml with front-matter
|
|
self.assertEqual(ansible.utils.parse_yaml("---\nfoo: bar"), dict(foo='bar'))
|
|
# yaml no front-matter
|
|
self.assertEqual(ansible.utils.parse_yaml('foo: bar'), dict(foo='bar'))
|
|
# yaml indented first line (See #6348)
|
|
self.assertEqual(ansible.utils.parse_yaml(' - foo: bar\n baz: qux'), [dict(foo='bar', baz='qux')])
|
|
|
|
def test_process_common_errors(self):
|
|
# no quote
|
|
self.assertTrue('YAML thought it' in ansible.utils.process_common_errors('', 'foo: {{bar}}', 6))
|
|
|
|
# extra colon
|
|
self.assertTrue('an extra unquoted colon' in ansible.utils.process_common_errors('', 'foo: bar:', 8))
|
|
|
|
# match
|
|
self.assertTrue('same kind of quote' in ansible.utils.process_common_errors('', 'foo: "{{bar}}"baz', 6))
|
|
self.assertTrue('same kind of quote' in ansible.utils.process_common_errors('', "foo: '{{bar}}'baz", 6))
|
|
|
|
# unbalanced
|
|
self.assertTrue('We could be wrong' in ansible.utils.process_common_errors('', 'foo: "bad" "wolf"', 6))
|
|
self.assertTrue('We could be wrong' in ansible.utils.process_common_errors('', "foo: 'bad' 'wolf'", 6))
|
|
|
|
|
|
def test_process_yaml_error(self):
|
|
data = 'foo: bar\n baz: qux'
|
|
try:
|
|
ansible.utils.parse_yaml(data)
|
|
except yaml.YAMLError, exc:
|
|
try:
|
|
ansible.utils.process_yaml_error(exc, data, __file__)
|
|
except ansible.errors.AnsibleYAMLValidationFailed, e:
|
|
self.assertTrue('Syntax Error while loading' in str(e))
|
|
else:
|
|
raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed')
|
|
|
|
data = 'foo: bar\n baz: {{qux}}'
|
|
try:
|
|
ansible.utils.parse_yaml(data)
|
|
except yaml.YAMLError, exc:
|
|
try:
|
|
ansible.utils.process_yaml_error(exc, data, __file__)
|
|
except ansible.errors.AnsibleYAMLValidationFailed, e:
|
|
self.assertTrue('Syntax Error while loading' in str(e))
|
|
else:
|
|
raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed')
|
|
|
|
data = '\xFF'
|
|
try:
|
|
ansible.utils.parse_yaml(data)
|
|
except yaml.YAMLError, exc:
|
|
try:
|
|
ansible.utils.process_yaml_error(exc, data, __file__)
|
|
except ansible.errors.AnsibleYAMLValidationFailed, e:
|
|
self.assertTrue('Check over' in str(e))
|
|
else:
|
|
raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed')
|
|
|
|
data = '\xFF'
|
|
try:
|
|
ansible.utils.parse_yaml(data)
|
|
except yaml.YAMLError, exc:
|
|
try:
|
|
ansible.utils.process_yaml_error(exc, data, None)
|
|
except ansible.errors.AnsibleYAMLValidationFailed, e:
|
|
self.assertTrue('Could not parse YAML.' in str(e))
|
|
else:
|
|
raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed')
|
|
|
|
def test_parse_yaml_from_file(self):
|
|
test = os.path.join(os.path.dirname(__file__), 'inventory_test_data',
|
|
'common_vars.yml')
|
|
encrypted = os.path.join(os.path.dirname(__file__), 'inventory_test_data',
|
|
'encrypted.yml')
|
|
broken = os.path.join(os.path.dirname(__file__), 'inventory_test_data',
|
|
'broken.yml')
|
|
|
|
try:
|
|
ansible.utils.parse_yaml_from_file(os.path.dirname(__file__))
|
|
except ansible.errors.AnsibleError:
|
|
pass
|
|
else:
|
|
raise AssertionError('Incorrect exception, expected AnsibleError')
|
|
|
|
self.assertEqual(ansible.utils.parse_yaml_from_file(test), yaml.safe_load(open(test)))
|
|
|
|
self.assertEqual(ansible.utils.parse_yaml_from_file(encrypted, 'ansible'), dict(foo='bar'))
|
|
|
|
try:
|
|
ansible.utils.parse_yaml_from_file(broken)
|
|
except ansible.errors.AnsibleYAMLValidationFailed, e:
|
|
self.assertTrue('Syntax Error while loading' in str(e))
|
|
else:
|
|
raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed')
|
|
|
|
def test_merge_hash(self):
|
|
self.assertEqual(ansible.utils.merge_hash(dict(foo='bar', baz='qux'), dict(foo='baz')),
|
|
dict(foo='baz', baz='qux'))
|
|
self.assertEqual(ansible.utils.merge_hash(dict(foo=dict(bar='baz')), dict(foo=dict(bar='qux'))),
|
|
dict(foo=dict(bar='qux')))
|
|
|
|
def test_md5s(self):
|
|
if self._is_fips():
|
|
raise SkipTest('MD5 unavailable on FIPs enabled systems')
|
|
self.assertEqual(ansible.utils.md5s('ansible'), '640c8a5376aa12fa15cf02130ce239a6')
|
|
# Need a test that causes UnicodeEncodeError See 4221
|
|
|
|
def test_md5(self):
|
|
if self._is_fips():
|
|
raise SkipTest('MD5 unavailable on FIPs enabled systems')
|
|
self.assertEqual(ansible.utils.md5(os.path.join(os.path.dirname(__file__), 'ansible.cfg')),
|
|
'fb7b5b90ea63f04bde33e804b6fad42c')
|
|
self.assertEqual(ansible.utils.md5(os.path.join(os.path.dirname(__file__), 'ansible.cf')),
|
|
None)
|
|
|
|
def test_checksum_s(self):
|
|
self.assertEqual(ansible.utils.checksum_s('ansible'), 'bef45157a43c9e5f469d188810814a4a8ab9f2ed')
|
|
# Need a test that causes UnicodeEncodeError See 4221
|
|
|
|
def test_checksum(self):
|
|
self.assertEqual(ansible.utils.checksum(os.path.join(os.path.dirname(__file__), 'ansible.cfg')),
|
|
'658b67c8ac7595adde7048425ff1f9aba270721a')
|
|
self.assertEqual(ansible.utils.checksum(os.path.join(os.path.dirname(__file__), 'ansible.cf')),
|
|
None)
|
|
|
|
def test_default(self):
|
|
self.assertEqual(ansible.utils.default(None, lambda: {}), {})
|
|
self.assertEqual(ansible.utils.default(dict(foo='bar'), lambda: {}), dict(foo='bar'))
|
|
|
|
def test__gitinfo(self):
|
|
# this fails if not run from git clone
|
|
# self.assertEqual('last updated' in ansible.utils._gitinfo())
|
|
# missing test for git submodule
|
|
# missing test outside of git clone
|
|
pass
|
|
|
|
def test_version(self):
|
|
version = ansible.utils.version('ansible')
|
|
self.assertTrue(version.startswith('ansible %s' % __version__))
|
|
# this fails if not run from git clone
|
|
# self.assertEqual('last updated' in version)
|
|
|
|
def test_getch(self):
|
|
# figure out how to test this
|
|
pass
|
|
|
|
def test_sanitize_output(self):
|
|
self.assertEqual(ansible.utils.sanitize_output('password=foo'), 'password=VALUE_HIDDEN')
|
|
self.assertEqual(ansible.utils.sanitize_output('foo=user:pass@foo/whatever'),
|
|
'foo=user:********@foo/whatever')
|
|
self.assertEqual(ansible.utils.sanitize_output('foo=http://username:pass@wherever/foo'),
|
|
'foo=http://username:********@wherever/foo')
|
|
self.assertEqual(ansible.utils.sanitize_output('foo=http://wherever/foo'),
|
|
'foo=http://wherever/foo')
|
|
|
|
def test_increment_debug(self):
|
|
ansible.utils.VERBOSITY = 0
|
|
ansible.utils.increment_debug(None, None, None, None)
|
|
self.assertEqual(ansible.utils.VERBOSITY, 1)
|
|
|
|
def test_base_parser(self):
|
|
output = ansible.utils.base_parser(output_opts=True)
|
|
self.assertTrue(output.has_option('--one-line') and output.has_option('--tree'))
|
|
|
|
runas = ansible.utils.base_parser(runas_opts=True)
|
|
for opt in ['--sudo', '--sudo-user', '--user', '--su', '--su-user']:
|
|
self.assertTrue(runas.has_option(opt))
|
|
|
|
async = ansible.utils.base_parser(async_opts=True)
|
|
self.assertTrue(async.has_option('--poll') and async.has_option('--background'))
|
|
|
|
connect = ansible.utils.base_parser(connect_opts=True)
|
|
self.assertTrue(connect.has_option('--connection'))
|
|
|
|
subset = ansible.utils.base_parser(subset_opts=True)
|
|
self.assertTrue(subset.has_option('--limit'))
|
|
|
|
check = ansible.utils.base_parser(check_opts=True)
|
|
self.assertTrue(check.has_option('--check'))
|
|
|
|
diff = ansible.utils.base_parser(diff_opts=True)
|
|
self.assertTrue(diff.has_option('--diff'))
|
|
|
|
def test_do_encrypt(self):
|
|
salt_chars = string.ascii_letters + string.digits + './'
|
|
salt = ansible.utils.random_password(length=8, chars=salt_chars)
|
|
hash = ansible.utils.do_encrypt('ansible', 'sha256_crypt', salt=salt)
|
|
self.assertTrue(passlib.hash.sha256_crypt.verify('ansible', hash))
|
|
|
|
hash = ansible.utils.do_encrypt('ansible', 'sha256_crypt')
|
|
self.assertTrue(passlib.hash.sha256_crypt.verify('ansible', hash))
|
|
|
|
try:
|
|
ansible.utils.do_encrypt('ansible', 'ansible')
|
|
except ansible.errors.AnsibleError:
|
|
pass
|
|
else:
|
|
raise AssertionError('Incorrect exception, expected AnsibleError')
|
|
|
|
def test_do_encrypt_md5(self):
|
|
if self._is_fips():
|
|
raise SkipTest('MD5 unavailable on FIPS systems')
|
|
hash = ansible.utils.do_encrypt('ansible', 'md5_crypt', salt_size=4)
|
|
self.assertTrue(passlib.hash.md5_crypt.verify('ansible', hash))
|
|
|
|
def test_last_non_blank_line(self):
|
|
self.assertEqual(ansible.utils.last_non_blank_line('a\n\nb\n\nc'), 'c')
|
|
self.assertEqual(ansible.utils.last_non_blank_line(''), '')
|
|
|
|
def test_filter_leading_non_json_lines(self):
|
|
self.assertEqual(ansible.utils.filter_leading_non_json_lines('a\nb\nansible!\n{"foo": "bar"}'),
|
|
'{"foo": "bar"}\n')
|
|
self.assertEqual(ansible.utils.filter_leading_non_json_lines('a\nb\nansible!\n["foo", "bar"]'),
|
|
'["foo", "bar"]\n')
|
|
|
|
def test_boolean(self):
|
|
self.assertEqual(ansible.utils.boolean("true"), True)
|
|
self.assertEqual(ansible.utils.boolean("True"), True)
|
|
self.assertEqual(ansible.utils.boolean("TRUE"), True)
|
|
self.assertEqual(ansible.utils.boolean("t"), True)
|
|
self.assertEqual(ansible.utils.boolean("T"), True)
|
|
self.assertEqual(ansible.utils.boolean("Y"), True)
|
|
self.assertEqual(ansible.utils.boolean("y"), True)
|
|
self.assertEqual(ansible.utils.boolean("1"), True)
|
|
self.assertEqual(ansible.utils.boolean(1), True)
|
|
self.assertEqual(ansible.utils.boolean("false"), False)
|
|
self.assertEqual(ansible.utils.boolean("False"), False)
|
|
self.assertEqual(ansible.utils.boolean("0"), False)
|
|
self.assertEqual(ansible.utils.boolean(0), False)
|
|
self.assertEqual(ansible.utils.boolean("foo"), False)
|
|
|
|
def test_make_sudo_cmd(self):
|
|
cmd = ansible.utils.make_sudo_cmd(C.DEFAULT_SUDO_EXE, 'root', '/bin/sh', '/bin/ls')
|
|
self.assertTrue(isinstance(cmd, tuple))
|
|
self.assertEqual(len(cmd), 3)
|
|
self.assertTrue('-u root' in cmd[0])
|
|
self.assertTrue('-p "[sudo via ansible, key=' in cmd[0] and cmd[1].startswith('[sudo via ansible, key'))
|
|
self.assertTrue('echo SUDO-SUCCESS-' in cmd[0] and cmd[2].startswith('SUDO-SUCCESS-'))
|
|
self.assertTrue('sudo -k' in cmd[0])
|
|
|
|
def test_make_su_cmd(self):
|
|
cmd = ansible.utils.make_su_cmd('root', '/bin/sh', '/bin/ls')
|
|
self.assertTrue(isinstance(cmd, tuple))
|
|
self.assertEqual(len(cmd), 3)
|
|
self.assertTrue('root -c "/bin/sh' in cmd[0] or ' root -c /bin/sh' in cmd[0])
|
|
self.assertTrue('echo SUDO-SUCCESS-' in cmd[0] and cmd[2].startswith('SUDO-SUCCESS-'))
|
|
|
|
def test_to_unicode(self):
|
|
uni = ansible.utils.to_unicode(u'ansible')
|
|
self.assertTrue(isinstance(uni, unicode))
|
|
self.assertEqual(uni, u'ansible')
|
|
|
|
none = ansible.utils.to_unicode(None)
|
|
self.assertTrue(isinstance(none, type(None)))
|
|
self.assertTrue(none is None)
|
|
|
|
utf8 = ansible.utils.to_unicode('ansible')
|
|
self.assertTrue(isinstance(utf8, unicode))
|
|
self.assertEqual(utf8, u'ansible')
|
|
|
|
def test_is_list_of_strings(self):
|
|
self.assertEqual(ansible.utils.is_list_of_strings(['foo', 'bar', u'baz']), True)
|
|
self.assertEqual(ansible.utils.is_list_of_strings(['foo', 'bar', True]), False)
|
|
self.assertEqual(ansible.utils.is_list_of_strings(['one', 2, 'three']), False)
|
|
|
|
def test_contains_vars(self):
|
|
self.assertTrue(ansible.utils.contains_vars('{{foo}}'))
|
|
self.assertTrue(ansible.utils.contains_vars('$foo'))
|
|
self.assertFalse(ansible.utils.contains_vars('foo'))
|
|
|
|
def test_safe_eval(self):
|
|
# Not basestring
|
|
self.assertEqual(ansible.utils.safe_eval(len), len)
|
|
self.assertEqual(ansible.utils.safe_eval(1), 1)
|
|
self.assertEqual(ansible.utils.safe_eval(len, include_exceptions=True), (len, None))
|
|
self.assertEqual(ansible.utils.safe_eval(1, include_exceptions=True), (1, None))
|
|
|
|
# module
|
|
self.assertEqual(ansible.utils.safe_eval('foo.bar('), 'foo.bar(')
|
|
self.assertEqual(ansible.utils.safe_eval('foo.bar(', include_exceptions=True), ('foo.bar(', None))
|
|
|
|
# import
|
|
self.assertEqual(ansible.utils.safe_eval('import foo'), 'import foo')
|
|
self.assertEqual(ansible.utils.safe_eval('import foo', include_exceptions=True), ('import foo', None))
|
|
|
|
# valid simple eval
|
|
self.assertEqual(ansible.utils.safe_eval('True'), True)
|
|
self.assertEqual(ansible.utils.safe_eval('True', include_exceptions=True), (True, None))
|
|
|
|
# valid eval with lookup
|
|
self.assertEqual(ansible.utils.safe_eval('foo + bar', dict(foo=1, bar=2)), 3)
|
|
self.assertEqual(ansible.utils.safe_eval('foo + bar', dict(foo=1, bar=2), include_exceptions=True), (3, None))
|
|
|
|
# invalid eval
|
|
self.assertEqual(ansible.utils.safe_eval('foo'), 'foo')
|
|
nameerror = ansible.utils.safe_eval('foo', include_exceptions=True)
|
|
self.assertTrue(isinstance(nameerror, tuple))
|
|
self.assertEqual(nameerror[0], 'foo')
|
|
self.assertTrue(isinstance(nameerror[1], NameError))
|
|
|
|
def test_listify_lookup_plugin_terms(self):
|
|
basedir = os.path.dirname(__file__)
|
|
# Straight lookups
|
|
#self.assertEqual(ansible.utils.listify_lookup_plugin_terms('things', basedir, dict(things=[])), [])
|
|
#self.assertEqual(ansible.utils.listify_lookup_plugin_terms('things', basedir, dict(things=['one', 'two'])), ['one', 'two'])
|
|
|
|
def test_deprecated(self):
|
|
sys_stderr = sys.stderr
|
|
sys.stderr = StringIO.StringIO()
|
|
ansible.utils.deprecated('Ack!', '0.0')
|
|
out = sys.stderr.getvalue()
|
|
self.assertTrue('0.0' in out)
|
|
self.assertTrue('[DEPRECATION WARNING]' in out)
|
|
|
|
sys.stderr = StringIO.StringIO()
|
|
ansible.utils.deprecated('Ack!', None)
|
|
out = sys.stderr.getvalue()
|
|
self.assertTrue('0.0' not in out)
|
|
self.assertTrue('[DEPRECATION WARNING]' in out)
|
|
|
|
sys.stderr = StringIO.StringIO()
|
|
warnings = C.DEPRECATION_WARNINGS
|
|
C.DEPRECATION_WARNINGS = False
|
|
ansible.utils.deprecated('Ack!', None)
|
|
out = sys.stderr.getvalue()
|
|
self.assertTrue(not out)
|
|
C.DEPRECATION_WARNINGS = warnings
|
|
|
|
sys.stderr = sys_stderr
|
|
|
|
try:
|
|
ansible.utils.deprecated('Ack!', '0.0', True)
|
|
except ansible.errors.AnsibleError, e:
|
|
self.assertTrue('0.0' not in str(e))
|
|
self.assertTrue('[DEPRECATED]' in str(e))
|
|
else:
|
|
raise AssertionError("Incorrect exception, expected AnsibleError")
|
|
|
|
def test_warning(self):
|
|
sys_stderr = sys.stderr
|
|
sys.stderr = StringIO.StringIO()
|
|
ansible.utils.warning('ANSIBLE')
|
|
out = sys.stderr.getvalue()
|
|
sys.stderr = sys_stderr
|
|
self.assertTrue('[WARNING]: ANSIBLE' in out)
|
|
|
|
def test_combine_vars(self):
|
|
one = {'foo': {'bar': True}, 'baz': {'one': 'qux'}}
|
|
two = {'baz': {'two': 'qux'}}
|
|
replace = {'baz': {'two': 'qux'}, 'foo': {'bar': True}}
|
|
merge = {'baz': {'two': 'qux', 'one': 'qux'}, 'foo': {'bar': True}}
|
|
|
|
C.DEFAULT_HASH_BEHAVIOUR = 'replace'
|
|
self.assertEqual(ansible.utils.combine_vars(one, two), replace)
|
|
|
|
C.DEFAULT_HASH_BEHAVIOUR = 'merge'
|
|
self.assertEqual(ansible.utils.combine_vars(one, two), merge)
|
|
|
|
def test_err(self):
|
|
sys_stderr = sys.stderr
|
|
sys.stderr = StringIO.StringIO()
|
|
ansible.utils.err('ANSIBLE')
|
|
out = sys.stderr.getvalue()
|
|
sys.stderr = sys_stderr
|
|
self.assertEqual(out, 'ANSIBLE\n')
|
|
|
|
def test_exit(self):
|
|
sys_stderr = sys.stderr
|
|
sys.stderr = StringIO.StringIO()
|
|
try:
|
|
ansible.utils.exit('ansible')
|
|
except SystemExit, e:
|
|
self.assertEqual(e.code, 1)
|
|
self.assertEqual(sys.stderr.getvalue(), 'ansible\n')
|
|
else:
|
|
raise AssertionError('Incorrect exception, expected SystemExit')
|
|
finally:
|
|
sys.stderr = sys_stderr
|
|
|
|
def test_unfrackpath(self):
|
|
os.environ['TEST_ROOT'] = os.path.dirname(os.path.dirname(__file__))
|
|
self.assertEqual(ansible.utils.unfrackpath('$TEST_ROOT/units/../units/TestUtils.py'), __file__.rstrip('c'))
|
|
|
|
def test_is_executable(self):
|
|
self.assertEqual(ansible.utils.is_executable(__file__), 0)
|
|
|
|
bin_ansible = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
|
|
'bin', 'ansible')
|
|
self.assertNotEqual(ansible.utils.is_executable(bin_ansible), 0)
|
|
|
|
def test_get_diff(self):
|
|
standard = dict(
|
|
before_header='foo',
|
|
after_header='bar',
|
|
before='fooo',
|
|
after='foo'
|
|
)
|
|
|
|
standard_expected = """--- before: foo
|
|
+++ after: bar
|
|
@@ -1 +1 @@
|
|
-fooo+foo"""
|
|
|
|
# workaround py26 and py27 difflib differences
|
|
standard_expected = """-fooo+foo"""
|
|
diff = ansible.utils.get_diff(standard)
|
|
diff = diff.split('\n')
|
|
del diff[0]
|
|
del diff[0]
|
|
del diff[0]
|
|
diff = '\n'.join(diff)
|
|
self.assertEqual(diff, unicode(standard_expected))
|
|
|
|
def test_split_args(self):
|
|
# split_args is a smarter shlex.split for the needs of the way ansible uses it
|
|
|
|
def _split_info(input, desired, actual):
|
|
print "SENT: ", input
|
|
print "WANT: ", desired
|
|
print "GOT: ", actual
|
|
|
|
def _test_combo(input, desired):
|
|
actual = split_args(input)
|
|
_split_info(input, desired, actual)
|
|
assert actual == desired
|
|
|
|
# trivial splitting
|
|
_test_combo('a b=c d=f', ['a', 'b=c', 'd=f' ])
|
|
|
|
# mixed quotes
|
|
_test_combo('a b=\'c\' d="e" f=\'g\'', ['a', "b='c'", 'd="e"', "f='g'" ])
|
|
|
|
# with spaces
|
|
# FIXME: this fails, commenting out only for now
|
|
# _test_combo('a "\'one two three\'"', ['a', "'one two three'" ])
|
|
|
|
# TODO: ...
|
|
# jinja2 preservation
|
|
_test_combo('a {{ y }} z', ['a', '{{ y }}', 'z' ])
|
|
|
|
# jinja2 preservation with spaces and filters and other hard things
|
|
_test_combo(
|
|
'a {{ x | filter(\'moo\', \'param\') }} z {{ chicken }} "waffles"',
|
|
['a', "{{ x | filter('moo', 'param') }}", 'z', '{{ chicken }}', '"waffles"']
|
|
)
|
|
|
|
# invalid quote detection
|
|
self.assertRaises(Exception, split_args, 'hey I started a quote"')
|
|
self.assertRaises(Exception, split_args, 'hey I started a\' quote')
|
|
|
|
# jinja2 loop blocks with lots of complexity
|
|
_test_combo(
|
|
# in memory of neighbors cat
|
|
# we preserve line breaks unless a line continuation character precedes them
|
|
'a {% if x %} y {%else %} {{meow}} {% endif %} "cookie\nchip" \\\ndone\nand done',
|
|
['a', '{% if x %}', 'y', '{%else %}', '{{meow}}', '{% endif %}', '"cookie\nchip"', 'done\n', 'and', 'done']
|
|
)
|
|
|
|
# test space preservation within quotes
|
|
_test_combo(
|
|
'content="1 2 3 4 " foo=bar',
|
|
['content="1 2 3 4 "', 'foo=bar']
|
|
)
|
|
|
|
# invalid jinja2 nesting detection
|
|
# invalid quote nesting detection
|
|
|
|
def test_clean_data(self):
|
|
# clean data removes jinja2 tags from data
|
|
self.assertEqual(
|
|
ansible.utils._clean_data('this is a normal string', from_remote=True),
|
|
'this is a normal string'
|
|
)
|
|
self.assertEqual(
|
|
ansible.utils._clean_data('this string has a {{variable}}', from_remote=True),
|
|
'this string has a {#variable#}'
|
|
)
|
|
self.assertEqual(
|
|
ansible.utils._clean_data('this string {{has}} two {{variables}} in it', from_remote=True),
|
|
'this string {#has#} two {#variables#} in it'
|
|
)
|
|
self.assertEqual(
|
|
ansible.utils._clean_data('this string has a {{variable with a\nnewline}}', from_remote=True),
|
|
'this string has a {#variable with a\nnewline#}'
|
|
)
|
|
self.assertEqual(
|
|
ansible.utils._clean_data('this string is from inventory {{variable}}', from_inventory=True),
|
|
'this string is from inventory {{variable}}'
|
|
)
|
|
self.assertEqual(
|
|
ansible.utils._clean_data('this string is from inventory too but uses lookup {{lookup("foo","bar")}}', from_inventory=True),
|
|
'this string is from inventory too but uses lookup {#lookup("foo","bar")#}'
|
|
)
|
|
self.assertEqual(
|
|
ansible.utils._clean_data('this string has JSON in it: {"foo":{"bar":{"baz":"oops"}}}', from_remote=True),
|
|
'this string has JSON in it: {"foo":{"bar":{"baz":"oops"}}}'
|
|
)
|
|
self.assertEqual(
|
|
ansible.utils._clean_data('this string contains unicode: ¢ £ ¤ ¥', from_remote=True),
|
|
'this string contains unicode: ¢ £ ¤ ¥'
|
|
)
|
|
|
|
|
|
def test_censor_unlogged_data(self):
|
|
''' used by the no_log attribute '''
|
|
input = dict(
|
|
password='sekrit',
|
|
rc=12,
|
|
failed=True,
|
|
changed=False,
|
|
skipped=True,
|
|
msg='moo',
|
|
)
|
|
data = ansible.utils.censor_unlogged_data(input)
|
|
assert 'password' not in data
|
|
assert 'rc' in data
|
|
assert 'failed' in data
|
|
assert 'changed' in data
|
|
assert 'skipped' in data
|
|
assert 'msg' not in data
|
|
assert data['censored'] == 'results hidden due to no_log parameter'
|
|
|
|
def test_repo_url_to_role_name(self):
|
|
tests = [("http://git.example.com/repos/repo.git", "repo"),
|
|
("ssh://git@git.example.com:repos/role-name", "role-name"),
|
|
("ssh://git@git.example.com:repos/role-name,v0.1", "role-name"),
|
|
("directory/role/is/installed/in", "directory/role/is/installed/in")]
|
|
for (url, result) in tests:
|
|
self.assertEqual(ansible.utils.repo_url_to_role_name(url), result)
|
|
|
|
def test_role_spec_parse(self):
|
|
tests = [
|
|
(
|
|
"git+http://git.example.com/repos/repo.git,v1.0",
|
|
{
|
|
'scm': 'git',
|
|
'src': 'http://git.example.com/repos/repo.git',
|
|
'version': 'v1.0',
|
|
'name': 'repo'
|
|
}
|
|
),
|
|
(
|
|
"http://repo.example.com/download/tarfile.tar.gz",
|
|
{
|
|
'scm': None,
|
|
'src': 'http://repo.example.com/download/tarfile.tar.gz',
|
|
'version': '',
|
|
'name': 'tarfile'
|
|
}
|
|
),
|
|
(
|
|
"http://repo.example.com/download/tarfile.tar.gz,,nicename",
|
|
{
|
|
'scm': None,
|
|
'src': 'http://repo.example.com/download/tarfile.tar.gz',
|
|
'version': '',
|
|
'name': 'nicename'
|
|
}
|
|
),
|
|
(
|
|
"git+http://git.example.com/repos/repo.git,v1.0,awesome",
|
|
{
|
|
'scm': 'git',
|
|
'src': 'http://git.example.com/repos/repo.git',
|
|
'version': 'v1.0',
|
|
'name': 'awesome'
|
|
}
|
|
),
|
|
(
|
|
# test that http://github URLs are assumed git+http:// unless they end in .tar.gz
|
|
"http://github.com/ansible/fakerole/fake",
|
|
{
|
|
'scm' : 'git',
|
|
'src' : 'http://github.com/ansible/fakerole/fake',
|
|
'version' : 'master',
|
|
'name' : 'fake'
|
|
}
|
|
),
|
|
(
|
|
# test that http://github URLs are assumed git+http:// unless they end in .tar.gz
|
|
"http://github.com/ansible/fakerole/fake/archive/master.tar.gz",
|
|
{
|
|
'scm' : None,
|
|
'src' : 'http://github.com/ansible/fakerole/fake/archive/master.tar.gz',
|
|
'version' : '',
|
|
'name' : 'master'
|
|
}
|
|
)
|
|
]
|
|
for (spec, result) in tests:
|
|
self.assertEqual(ansible.utils.role_spec_parse(spec), result)
|
|
|
|
def test_role_yaml_parse(self):
|
|
tests = (
|
|
(
|
|
# Old style
|
|
{
|
|
'role': 'debops.elasticsearch',
|
|
'name': 'elks'
|
|
},
|
|
{
|
|
'role': 'debops.elasticsearch',
|
|
'name': 'elks',
|
|
'scm': None,
|
|
'src': 'debops.elasticsearch',
|
|
'version': '',
|
|
}
|
|
),
|
|
(
|
|
{
|
|
'role': 'debops.elasticsearch,1.0,elks',
|
|
'my_param': 'foo'
|
|
},
|
|
{
|
|
'role': 'debops.elasticsearch,1.0,elks',
|
|
'name': 'elks',
|
|
'scm': None,
|
|
'src': 'debops.elasticsearch',
|
|
'version': '1.0',
|
|
'my_param': 'foo',
|
|
}
|
|
),
|
|
(
|
|
{
|
|
'role': 'debops.elasticsearch,1.0',
|
|
'my_param': 'foo'
|
|
},
|
|
{
|
|
'role': 'debops.elasticsearch,1.0',
|
|
'name': 'debops.elasticsearch',
|
|
'scm': None,
|
|
'src': 'debops.elasticsearch',
|
|
'version': '1.0',
|
|
'my_param': 'foo',
|
|
}
|
|
),
|
|
# New style
|
|
(
|
|
{
|
|
'src': 'debops.elasticsearch',
|
|
'name': 'elks',
|
|
'my_param': 'foo'
|
|
},
|
|
{
|
|
'name': 'elks',
|
|
'scm': None,
|
|
'src': 'debops.elasticsearch',
|
|
'version': '',
|
|
'my_param': 'foo'
|
|
}
|
|
),
|
|
)
|
|
|
|
for (role, result) in tests:
|
|
self.assertEqual(ansible.utils.role_yaml_parse(role), result)
|
|
|