# -*- coding: utf-8 -*-

import traceback
import unittest
import os
import os.path
import re
import tempfile
import yaml
import passlib.hash
import string
import StringIO
import copy
import tempfile
import shutil

from nose.plugins.skip import SkipTest
from mock import patch

import ansible.utils
import ansible.errors
import ansible.constants as C
import ansible.utils.template as template2
from ansible.module_utils.splitter import split_args

from ansible import __version__

import sys
reload(sys)
sys.setdefaultencoding("utf8") 

class TestUtils(unittest.TestCase):

    def _is_fips(self):
        try:
            data = open('/proc/sys/crypto/fips_enabled').read().strip()
        except:
            return False
        if data != '1':
            return False
        return True

    def test_before_comment(self):
        ''' see if we can detect the part of a string before a comment.  Used by INI parser in inventory '''

        input    = "before # comment"
        expected = "before "
        actual   = ansible.utils.before_comment(input)
        self.assertEqual(expected, actual)

        input    = "before \# not a comment"
        expected = "before # not a comment"
        actual  =  ansible.utils.before_comment(input)
        self.assertEqual(expected, actual)

        input = ""
        expected = ""
        actual = ansible.utils.before_comment(input)
        self.assertEqual(expected, actual)

        input = "#"
        expected = ""
        actual = ansible.utils.before_comment(input)
        self.assertEqual(expected, actual)

    #####################################
    ### check_conditional tests

    def test_check_conditional_jinja2_literals(self):
        # see http://jinja.pocoo.org/docs/templates/#literals

        # none
        self.assertEqual(ansible.utils.check_conditional(
            None, '/', {}), True)
        self.assertEqual(ansible.utils.check_conditional(
            '', '/', {}), True)

        # list
        self.assertEqual(ansible.utils.check_conditional(
            ['true'], '/', {}), True)
        self.assertEqual(ansible.utils.check_conditional(
            ['false'], '/', {}), False)

        # non basestring or list
        self.assertEqual(ansible.utils.check_conditional(
            {}, '/', {}), {})

        # boolean
        self.assertEqual(ansible.utils.check_conditional(
            'true', '/', {}), True)
        self.assertEqual(ansible.utils.check_conditional(
            'false', '/', {}), False)
        self.assertEqual(ansible.utils.check_conditional(
            'True', '/', {}), True)
        self.assertEqual(ansible.utils.check_conditional(
            'False', '/', {}), False)

        # integer
        self.assertEqual(ansible.utils.check_conditional(
            '1', '/', {}), True)
        self.assertEqual(ansible.utils.check_conditional(
            '0', '/', {}), False)

        # string, beware, a string is truthy unless empty
        self.assertEqual(ansible.utils.check_conditional(
            '"yes"', '/', {}), True)
        self.assertEqual(ansible.utils.check_conditional(
            '"no"', '/', {}), True)
        self.assertEqual(ansible.utils.check_conditional(
            '""', '/', {}), False)


    def test_check_conditional_jinja2_variable_literals(self):
        # see http://jinja.pocoo.org/docs/templates/#literals

        # boolean
        self.assertEqual(ansible.utils.check_conditional(
            'var', '/', {'var': 'True'}), True)
        self.assertEqual(ansible.utils.check_conditional(
            'var', '/', {'var': 'true'}), True)
        self.assertEqual(ansible.utils.check_conditional(
            'var', '/', {'var': 'False'}), False)
        self.assertEqual(ansible.utils.check_conditional(
            'var', '/', {'var': 'false'}), False)

        # integer
        self.assertEqual(ansible.utils.check_conditional(
            'var', '/', {'var': '1'}), True)
        self.assertEqual(ansible.utils.check_conditional(
            'var', '/', {'var': 1}), True)
        self.assertEqual(ansible.utils.check_conditional(
            'var', '/', {'var': '0'}), False)
        self.assertEqual(ansible.utils.check_conditional(
            'var', '/', {'var': 0}), False)

        # string, beware, a string is truthy unless empty
        self.assertEqual(ansible.utils.check_conditional(
            'var', '/', {'var': '"yes"'}), True)
        self.assertEqual(ansible.utils.check_conditional(
            'var', '/', {'var': '"no"'}), True)
        self.assertEqual(ansible.utils.check_conditional(
            'var', '/', {'var': '""'}), False)

        # Python boolean in Jinja2 expression
        self.assertEqual(ansible.utils.check_conditional(
            'var', '/', {'var': True}), True)
        self.assertEqual(ansible.utils.check_conditional(
            'var', '/', {'var': False}), False)


    def test_check_conditional_jinja2_expression(self):
        self.assertEqual(ansible.utils.check_conditional(
            '1 == 1', '/', {}), True)
        self.assertEqual(ansible.utils.check_conditional(
            'bar == 42', '/', {'bar': 42}), True)
        self.assertEqual(ansible.utils.check_conditional(
            'bar != 42', '/', {'bar': 42}), False)


    def test_check_conditional_jinja2_expression_in_variable(self):
        self.assertEqual(ansible.utils.check_conditional(
            'var', '/', {'var': '1 == 1'}), True)
        self.assertEqual(ansible.utils.check_conditional(
            'var', '/', {'var': 'bar == 42', 'bar': 42}), True)
        self.assertEqual(ansible.utils.check_conditional(
            'var', '/', {'var': 'bar != 42', 'bar': 42}), False)

    def test_check_conditional_jinja2_unicode(self):
        self.assertEqual(ansible.utils.check_conditional(
            u'"\u00df"', '/', {}), True)
        self.assertEqual(ansible.utils.check_conditional(
            u'var == "\u00df"', '/', {'var': u'\u00df'}), True)


    #####################################
    ### key-value parsing

    def test_parse_kv_basic(self):
        self.assertEqual(ansible.utils.parse_kv('a=simple b="with space" c="this=that"'),
                {'a': 'simple', 'b': 'with space', 'c': 'this=that'})
        self.assertEqual(ansible.utils.parse_kv('msg=АБВГД'),
                {'msg': 'АБВГД'})


    def test_jsonify(self):
        self.assertEqual(ansible.utils.jsonify(None), '{}')
        self.assertEqual(ansible.utils.jsonify(dict(foo='bar', baz=['qux'])), '{"baz": ["qux"], "foo": "bar"}')
        expected = u'{"baz":["qux"],"foo":"bar"}'
        self.assertEqual("".join(ansible.utils.jsonify(dict(foo='bar', baz=['qux']), format=True).split()), expected)

    def test_is_failed(self):
        self.assertEqual(ansible.utils.is_failed(dict(rc=0)), False)
        self.assertEqual(ansible.utils.is_failed(dict(rc=1)), True)
        self.assertEqual(ansible.utils.is_failed(dict()), False)
        self.assertEqual(ansible.utils.is_failed(dict(failed=False)), False)
        self.assertEqual(ansible.utils.is_failed(dict(failed=True)), True)
        self.assertEqual(ansible.utils.is_failed(dict(failed='True')), True)
        self.assertEqual(ansible.utils.is_failed(dict(failed='true')), True)

    def test_is_changed(self):
        self.assertEqual(ansible.utils.is_changed(dict()), False)
        self.assertEqual(ansible.utils.is_changed(dict(changed=False)), False)
        self.assertEqual(ansible.utils.is_changed(dict(changed=True)), True)
        self.assertEqual(ansible.utils.is_changed(dict(changed='True')), True)
        self.assertEqual(ansible.utils.is_changed(dict(changed='true')), True)

    def test_path_dwim(self):
        self.assertEqual(ansible.utils.path_dwim(None, __file__),
               __file__)
        self.assertEqual(ansible.utils.path_dwim(None, '~'),
               os.path.expanduser('~'))
        self.assertEqual(ansible.utils.path_dwim(None, 'TestUtils.py'),
               __file__.rstrip('c'))

    def test_path_dwim_relative(self):
        self.assertEqual(ansible.utils.path_dwim_relative(__file__, 'units', 'TestUtils.py',
                                                          os.path.dirname(os.path.dirname(__file__))),
               __file__.rstrip('c'))

    def test_json_loads(self):
        self.assertEqual(ansible.utils.json_loads('{"foo": "bar"}'), dict(foo='bar'))

    def test_parse_json(self):
        # leading junk
        self.assertEqual(ansible.utils.parse_json('ansible\n{"foo": "bar"}'), dict(foo="bar"))

        # No closing quotation
        try:
            rc = ansible.utils.parse_json('foo=bar "')
            print rc
        except ValueError:
            pass
        else:
            traceback.print_exc()
            raise AssertionError('Incorrect exception, expected ValueError')

        # Failed to parse
        try:
            ansible.utils.parse_json('{')
        except ValueError:
            pass
        else:
            raise AssertionError('Incorrect exception, expected ValueError')

    def test_parse_yaml(self):
        #json
        self.assertEqual(ansible.utils.parse_yaml('{"foo": "bar"}'), dict(foo='bar'))

        # broken json
        try:
            ansible.utils.parse_yaml('{')
        except ansible.errors.AnsibleError:
            pass
        else:
            raise AssertionError

        # broken json with path_hint
        try:
            ansible.utils.parse_yaml('{', path_hint='foo')
        except ansible.errors.AnsibleError:
            pass
        else:
            raise AssertionError

        # yaml with front-matter
        self.assertEqual(ansible.utils.parse_yaml("---\nfoo: bar"), dict(foo='bar'))
        # yaml no front-matter
        self.assertEqual(ansible.utils.parse_yaml('foo: bar'), dict(foo='bar'))
        # yaml indented first line (See #6348)
        self.assertEqual(ansible.utils.parse_yaml(' - foo: bar\n   baz: qux'), [dict(foo='bar', baz='qux')])

    def test_process_common_errors(self):
        # no quote
        self.assertTrue('YAML thought it' in ansible.utils.process_common_errors('', 'foo: {{bar}}', 6))

        # extra colon
        self.assertTrue('an extra unquoted colon' in ansible.utils.process_common_errors('', 'foo: bar:', 8))

        # match
        self.assertTrue('same kind of quote' in ansible.utils.process_common_errors('', 'foo: "{{bar}}"baz', 6))
        self.assertTrue('same kind of quote' in ansible.utils.process_common_errors('', "foo: '{{bar}}'baz", 6))

        # unbalanced
        self.assertTrue('We could be wrong' in ansible.utils.process_common_errors('', 'foo: "bad" "wolf"', 6))
        self.assertTrue('We could be wrong' in ansible.utils.process_common_errors('', "foo: 'bad' 'wolf'", 6))


    def test_process_yaml_error(self):
        data = 'foo: bar\n baz: qux'
        try:
            ansible.utils.parse_yaml(data)
        except yaml.YAMLError, exc:
            try:
                ansible.utils.process_yaml_error(exc, data, __file__)
            except ansible.errors.AnsibleYAMLValidationFailed, e:
                self.assertTrue('Syntax Error while loading' in str(e))
            else:
                raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed')

        data = 'foo: bar\n baz: {{qux}}'
        try:
            ansible.utils.parse_yaml(data)
        except yaml.YAMLError, exc:
            try:
                ansible.utils.process_yaml_error(exc, data, __file__)
            except ansible.errors.AnsibleYAMLValidationFailed, e:
                self.assertTrue('Syntax Error while loading' in str(e))
            else:
                raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed')

        data = '\xFF'
        try:
            ansible.utils.parse_yaml(data)
        except yaml.YAMLError, exc:
            try:
                ansible.utils.process_yaml_error(exc, data, __file__)
            except ansible.errors.AnsibleYAMLValidationFailed, e:
                self.assertTrue('Check over' in str(e))
            else:
                raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed')

        data = '\xFF'
        try:
            ansible.utils.parse_yaml(data)
        except yaml.YAMLError, exc:
            try:
                ansible.utils.process_yaml_error(exc, data, None)
            except ansible.errors.AnsibleYAMLValidationFailed, e:
                self.assertTrue('Could not parse YAML.' in str(e))
            else:
                raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed')

    def test_parse_yaml_from_file(self):
        test = os.path.join(os.path.dirname(__file__), 'inventory_test_data',
                            'common_vars.yml')
        encrypted = os.path.join(os.path.dirname(__file__), 'inventory_test_data',
                                 'encrypted.yml')
        broken = os.path.join(os.path.dirname(__file__), 'inventory_test_data',
                              'broken.yml')

        try:
            ansible.utils.parse_yaml_from_file(os.path.dirname(__file__))
        except ansible.errors.AnsibleError:
            pass
        else:
            raise AssertionError('Incorrect exception, expected AnsibleError')

        self.assertEqual(ansible.utils.parse_yaml_from_file(test), yaml.safe_load(open(test)))

        self.assertEqual(ansible.utils.parse_yaml_from_file(encrypted, 'ansible'), dict(foo='bar'))

        try:
            ansible.utils.parse_yaml_from_file(broken)
        except ansible.errors.AnsibleYAMLValidationFailed, e:
            self.assertTrue('Syntax Error while loading' in str(e))
        else:
            raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed')

    def test_merge_hash(self):
        self.assertEqual(ansible.utils.merge_hash(dict(foo='bar', baz='qux'), dict(foo='baz')),
               dict(foo='baz', baz='qux'))
        self.assertEqual(ansible.utils.merge_hash(dict(foo=dict(bar='baz')), dict(foo=dict(bar='qux'))),
               dict(foo=dict(bar='qux')))

    def test_md5s(self):
        if self._is_fips():
            raise SkipTest('MD5 unavailable on FIPs enabled systems')
        self.assertEqual(ansible.utils.md5s('ansible'), '640c8a5376aa12fa15cf02130ce239a6')
        # Need a test that causes UnicodeEncodeError See 4221

    def test_md5(self):
        if self._is_fips():
            raise SkipTest('MD5 unavailable on FIPs enabled systems')
        self.assertEqual(ansible.utils.md5(os.path.join(os.path.dirname(__file__), 'ansible.cfg')),
                         'fb7b5b90ea63f04bde33e804b6fad42c')
        self.assertEqual(ansible.utils.md5(os.path.join(os.path.dirname(__file__), 'ansible.cf')),
                         None)

    def test_checksum_s(self):
        self.assertEqual(ansible.utils.checksum_s('ansible'), 'bef45157a43c9e5f469d188810814a4a8ab9f2ed')
        # Need a test that causes UnicodeEncodeError See 4221

    def test_checksum(self):
        self.assertEqual(ansible.utils.checksum(os.path.join(os.path.dirname(__file__), 'ansible.cfg')),
                         '658b67c8ac7595adde7048425ff1f9aba270721a')
        self.assertEqual(ansible.utils.checksum(os.path.join(os.path.dirname(__file__), 'ansible.cf')),
                         None)

    def test_default(self):
        self.assertEqual(ansible.utils.default(None, lambda: {}), {})
        self.assertEqual(ansible.utils.default(dict(foo='bar'), lambda: {}), dict(foo='bar'))

    def test__gitinfo(self):
        # this fails if not run from git clone
        # self.assertEqual('last updated' in ansible.utils._gitinfo())
        # missing test for git submodule
        # missing test outside of git clone
        pass

    def test_version(self):
        version = ansible.utils.version('ansible')
        self.assertTrue(version.startswith('ansible %s' % __version__))
        # this fails if not run from git clone
        # self.assertEqual('last updated' in version)

    def test_getch(self):
        # figure out how to test this
        pass

    def test_sanitize_output(self):
        self.assertEqual(ansible.utils.sanitize_output('password=foo'), 'password=VALUE_HIDDEN')
        self.assertEqual(ansible.utils.sanitize_output('foo=user:pass@foo/whatever'),
                         'foo=user:********@foo/whatever')
        self.assertEqual(ansible.utils.sanitize_output('foo=http://username:pass@wherever/foo'),
                         'foo=http://username:********@wherever/foo')
        self.assertEqual(ansible.utils.sanitize_output('foo=http://wherever/foo'),
                         'foo=http://wherever/foo')

    def test_increment_debug(self):
        ansible.utils.VERBOSITY = 0
        ansible.utils.increment_debug(None, None, None, None)
        self.assertEqual(ansible.utils.VERBOSITY, 1)

    def test_base_parser(self):
        output = ansible.utils.base_parser(output_opts=True)
        self.assertTrue(output.has_option('--one-line') and output.has_option('--tree'))

        runas = ansible.utils.base_parser(runas_opts=True)
        for opt in ['--sudo', '--sudo-user', '--user', '--su', '--su-user']:
            self.assertTrue(runas.has_option(opt))

        async = ansible.utils.base_parser(async_opts=True)
        self.assertTrue(async.has_option('--poll') and async.has_option('--background'))

        connect = ansible.utils.base_parser(connect_opts=True)
        self.assertTrue(connect.has_option('--connection'))

        subset = ansible.utils.base_parser(subset_opts=True)
        self.assertTrue(subset.has_option('--limit'))

        check = ansible.utils.base_parser(check_opts=True)
        self.assertTrue(check.has_option('--check'))

        diff = ansible.utils.base_parser(diff_opts=True)
        self.assertTrue(diff.has_option('--diff'))

    def test_do_encrypt(self):
        salt_chars = string.ascii_letters + string.digits + './'
        salt = ansible.utils.random_password(length=8, chars=salt_chars)
        hash = ansible.utils.do_encrypt('ansible', 'sha256_crypt', salt=salt)
        self.assertTrue(passlib.hash.sha256_crypt.verify('ansible', hash))

        hash = ansible.utils.do_encrypt('ansible', 'sha256_crypt')
        self.assertTrue(passlib.hash.sha256_crypt.verify('ansible', hash))

        try:
            ansible.utils.do_encrypt('ansible', 'ansible')
        except ansible.errors.AnsibleError:
            pass
        else:
            raise AssertionError('Incorrect exception, expected AnsibleError')

    def test_do_encrypt_md5(self):
        if self._is_fips():
            raise SkipTest('MD5 unavailable on FIPS systems')
        hash = ansible.utils.do_encrypt('ansible', 'md5_crypt', salt_size=4)
        self.assertTrue(passlib.hash.md5_crypt.verify('ansible', hash))

    def test_last_non_blank_line(self):
        self.assertEqual(ansible.utils.last_non_blank_line('a\n\nb\n\nc'), 'c')
        self.assertEqual(ansible.utils.last_non_blank_line(''), '')

    def test_filter_leading_non_json_lines(self):
        self.assertEqual(ansible.utils.filter_leading_non_json_lines('a\nb\nansible!\n{"foo": "bar"}'),
                         '{"foo": "bar"}\n')
        self.assertEqual(ansible.utils.filter_leading_non_json_lines('a\nb\nansible!\n["foo", "bar"]'),
                         '["foo", "bar"]\n')

    def test_boolean(self):
        self.assertEqual(ansible.utils.boolean("true"), True)
        self.assertEqual(ansible.utils.boolean("True"), True)
        self.assertEqual(ansible.utils.boolean("TRUE"), True)
        self.assertEqual(ansible.utils.boolean("t"), True)
        self.assertEqual(ansible.utils.boolean("T"), True)
        self.assertEqual(ansible.utils.boolean("Y"), True)
        self.assertEqual(ansible.utils.boolean("y"), True)
        self.assertEqual(ansible.utils.boolean("1"), True)
        self.assertEqual(ansible.utils.boolean(1), True)
        self.assertEqual(ansible.utils.boolean("false"), False)
        self.assertEqual(ansible.utils.boolean("False"), False)
        self.assertEqual(ansible.utils.boolean("0"), False)
        self.assertEqual(ansible.utils.boolean(0), False)
        self.assertEqual(ansible.utils.boolean("foo"), False)

    def test_make_sudo_cmd(self):
        cmd = ansible.utils.make_sudo_cmd(C.DEFAULT_SUDO_EXE, 'root', '/bin/sh', '/bin/ls')
        self.assertTrue(isinstance(cmd, tuple))
        self.assertEqual(len(cmd), 3)
        self.assertTrue('-u root' in cmd[0])
        self.assertTrue('-p "[sudo via ansible, key=' in cmd[0] and cmd[1].startswith('[sudo via ansible, key'))
        self.assertTrue('echo BECOME-SUCCESS-' in cmd[0] and cmd[2].startswith('BECOME-SUCCESS-'))
        self.assertTrue('sudo -k' in cmd[0])

    def test_make_su_cmd(self):
        cmd = ansible.utils.make_su_cmd('root', '/bin/sh', '/bin/ls')
        self.assertTrue(isinstance(cmd, tuple))
        self.assertEqual(len(cmd), 3)
        self.assertTrue('root -c "/bin/sh' in cmd[0] or ' root -c /bin/sh' in cmd[0])
        self.assertTrue('echo BECOME-SUCCESS-' in cmd[0] and cmd[2].startswith('BECOME-SUCCESS-'))

    def test_to_unicode(self):
        uni = ansible.utils.unicode.to_unicode(u'ansible')
        self.assertTrue(isinstance(uni, unicode))
        self.assertEqual(uni, u'ansible')

        none = ansible.utils.unicode.to_unicode(None, nonstring='passthru')
        self.assertTrue(isinstance(none, type(None)))
        self.assertTrue(none is None)

        utf8 = ansible.utils.unicode.to_unicode('ansible')
        self.assertTrue(isinstance(utf8, unicode))
        self.assertEqual(utf8, u'ansible')

    def test_is_list_of_strings(self):
        self.assertEqual(ansible.utils.is_list_of_strings(['foo', 'bar', u'baz']), True)
        self.assertEqual(ansible.utils.is_list_of_strings(['foo', 'bar', True]), False)
        self.assertEqual(ansible.utils.is_list_of_strings(['one', 2, 'three']), False)

    def test_contains_vars(self):
        self.assertTrue(ansible.utils.contains_vars('{{foo}}'))
        self.assertTrue(ansible.utils.contains_vars('$foo'))
        self.assertFalse(ansible.utils.contains_vars('foo'))

    def test_safe_eval(self):
        # Not basestring
        self.assertEqual(ansible.utils.safe_eval(len), len)
        self.assertEqual(ansible.utils.safe_eval(1), 1)
        self.assertEqual(ansible.utils.safe_eval(len, include_exceptions=True), (len, None))
        self.assertEqual(ansible.utils.safe_eval(1, include_exceptions=True), (1, None))

        # module
        self.assertEqual(ansible.utils.safe_eval('foo.bar('), 'foo.bar(')
        self.assertEqual(ansible.utils.safe_eval('foo.bar(', include_exceptions=True), ('foo.bar(', None))

        # import
        self.assertEqual(ansible.utils.safe_eval('import foo'), 'import foo')
        self.assertEqual(ansible.utils.safe_eval('import foo', include_exceptions=True), ('import foo', None))

        # valid simple eval
        self.assertEqual(ansible.utils.safe_eval('True'), True)
        self.assertEqual(ansible.utils.safe_eval('True', include_exceptions=True), (True, None))

        # valid eval with lookup
        self.assertEqual(ansible.utils.safe_eval('foo + bar', dict(foo=1, bar=2)), 3)
        self.assertEqual(ansible.utils.safe_eval('foo + bar', dict(foo=1, bar=2), include_exceptions=True), (3, None))

        # invalid eval
        self.assertEqual(ansible.utils.safe_eval('foo'), 'foo')
        nameerror = ansible.utils.safe_eval('foo', include_exceptions=True)
        self.assertTrue(isinstance(nameerror, tuple))
        self.assertEqual(nameerror[0], 'foo')
        self.assertTrue(isinstance(nameerror[1], NameError))

    def test_listify_lookup_plugin_terms(self):
        basedir = os.path.dirname(__file__)
        # Straight lookups
        #self.assertEqual(ansible.utils.listify_lookup_plugin_terms('things', basedir, dict(things=[])), [])
        #self.assertEqual(ansible.utils.listify_lookup_plugin_terms('things', basedir, dict(things=['one', 'two'])), ['one', 'two'])

    def test_deprecated(self):
        sys_stderr = sys.stderr
        sys.stderr = StringIO.StringIO()
        ansible.utils.deprecated('Ack!', '0.0')
        out = sys.stderr.getvalue()
        self.assertTrue('0.0' in out)
        self.assertTrue('[DEPRECATION WARNING]' in out)

        sys.stderr = StringIO.StringIO()
        ansible.utils.deprecated('Ack!', None)
        out = sys.stderr.getvalue()
        self.assertTrue('0.0' not in out)
        self.assertTrue('[DEPRECATION WARNING]' in out)

        sys.stderr = StringIO.StringIO()
        warnings = C.DEPRECATION_WARNINGS
        C.DEPRECATION_WARNINGS = False
        ansible.utils.deprecated('Ack!', None)
        out = sys.stderr.getvalue()
        self.assertTrue(not out)
        C.DEPRECATION_WARNINGS = warnings

        sys.stderr = sys_stderr

        try:
            ansible.utils.deprecated('Ack!', '0.0', True)
        except ansible.errors.AnsibleError, e:
            self.assertTrue('0.0' not in str(e))
            self.assertTrue('[DEPRECATED]' in str(e))
        else:
            raise AssertionError("Incorrect exception, expected AnsibleError")

    def test_warning(self):
        sys_stderr = sys.stderr
        sys.stderr = StringIO.StringIO()
        ansible.utils.warning('ANSIBLE')
        out = sys.stderr.getvalue()
        sys.stderr = sys_stderr
        self.assertTrue('[WARNING]: ANSIBLE' in out)

    def test_combine_vars(self):
        one = {'foo': {'bar': True}, 'baz': {'one': 'qux'}}
        two = {'baz': {'two': 'qux'}}
        replace = {'baz': {'two': 'qux'}, 'foo': {'bar': True}}
        merge = {'baz': {'two': 'qux', 'one': 'qux'}, 'foo': {'bar': True}}

        C.DEFAULT_HASH_BEHAVIOUR = 'replace'
        self.assertEqual(ansible.utils.combine_vars(one, two), replace)

        C.DEFAULT_HASH_BEHAVIOUR = 'merge'
        self.assertEqual(ansible.utils.combine_vars(one, two), merge)

    def test_err(self):
        sys_stderr = sys.stderr
        sys.stderr = StringIO.StringIO()
        ansible.utils.err('ANSIBLE')
        out = sys.stderr.getvalue()
        sys.stderr = sys_stderr
        self.assertEqual(out, 'ANSIBLE\n')

    def test_exit(self):
        sys_stderr = sys.stderr
        sys.stderr = StringIO.StringIO()
        try:
            ansible.utils.exit('ansible')
        except SystemExit, e:
            self.assertEqual(e.code, 1)
            self.assertEqual(sys.stderr.getvalue(), 'ansible\n')
        else:
            raise AssertionError('Incorrect exception, expected SystemExit')
        finally:
            sys.stderr = sys_stderr

    def test_unfrackpath(self):
        os.environ['TEST_ROOT'] = os.path.dirname(os.path.dirname(__file__))
        self.assertEqual(ansible.utils.unfrackpath('$TEST_ROOT/units/../units/TestUtils.py'), __file__.rstrip('c'))

    def test_is_executable(self):
        self.assertEqual(ansible.utils.is_executable(__file__), 0)

        bin_ansible = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
                                   'bin', 'ansible')
        self.assertNotEqual(ansible.utils.is_executable(bin_ansible), 0)

    def test_get_diff(self):
        standard = dict(
            before_header='foo',
            after_header='bar',
            before='fooo',
            after='foo'
        )

        standard_expected = """--- before: foo
+++ after: bar
@@ -1 +1 @@
-fooo+foo"""

        # workaround py26 and py27 difflib differences        
        standard_expected = """-fooo+foo"""
        diff = ansible.utils.get_diff(standard)
        diff = diff.split('\n')
        del diff[0]
        del diff[0]
        del diff[0]
        diff = '\n'.join(diff)
        self.assertEqual(diff, unicode(standard_expected))

    def test_split_args(self):
        # split_args is a smarter shlex.split for the needs of the way ansible uses it

        def _split_info(input, desired, actual):
            print "SENT: ", input
            print "WANT: ", desired 
            print "GOT: ", actual

        def _test_combo(input, desired):
            actual = split_args(input)
            _split_info(input, desired, actual)
            assert actual == desired

        # trivial splitting
        _test_combo('a b=c d=f',                   ['a', 'b=c', 'd=f' ])

        # mixed quotes
        _test_combo('a b=\'c\' d="e" f=\'g\'',     ['a', "b='c'", 'd="e"', "f='g'" ])

        # with spaces
        # FIXME: this fails, commenting out only for now
        # _test_combo('a "\'one two three\'"',     ['a', "'one two three'" ])

        # TODO: ...
        # jinja2 preservation
        _test_combo('a {{ y }} z',                   ['a', '{{ y }}', 'z' ])

        # jinja2 preservation with spaces and filters and other hard things
        _test_combo(
            'a {{ x | filter(\'moo\', \'param\') }} z {{ chicken }} "waffles"', 
            ['a', "{{ x | filter('moo', 'param') }}", 'z', '{{ chicken }}', '"waffles"']
        )

        # invalid quote detection
        self.assertRaises(Exception, split_args, 'hey I started a quote"')
        self.assertRaises(Exception, split_args, 'hey I started a\' quote')

        # jinja2 loop blocks with lots of complexity
        _test_combo(
            # in memory of neighbors cat
            # we preserve line breaks unless a line continuation character precedes them
            'a {% if x %} y {%else %} {{meow}} {% endif %} "cookie\nchip" \\\ndone\nand done',
            ['a', '{% if x %}', 'y', '{%else %}', '{{meow}}', '{% endif %}', '"cookie\nchip"', 'done\n', 'and', 'done']
        )

        # test space preservation within quotes
        _test_combo(
            'content="1 2  3   4    "  foo=bar',
            ['content="1 2  3   4    "', 'foo=bar']
        )

        # invalid jinja2 nesting detection
        # invalid quote nesting detection
    
    def test_clean_data(self):
        # clean data removes jinja2 tags from data
        self.assertEqual(
            ansible.utils._clean_data('this is a normal string', from_remote=True),
            'this is a normal string'
        )
        self.assertEqual(
            ansible.utils._clean_data('this string has a {{variable}}', from_remote=True),
            'this string has a {#variable#}'
        )
        self.assertEqual(
            ansible.utils._clean_data('this string {{has}} two {{variables}} in it', from_remote=True),
            'this string {#has#} two {#variables#} in it'
        )
        self.assertEqual(
            ansible.utils._clean_data('this string has a {{variable with a\nnewline}}', from_remote=True),
            'this string has a {#variable with a\nnewline#}'
        )
        self.assertEqual(
            ansible.utils._clean_data('this string is from inventory {{variable}}', from_inventory=True),
            'this string is from inventory {{variable}}'
        )
        self.assertEqual(
            ansible.utils._clean_data('this string is from inventory too but uses lookup {{lookup("foo","bar")}}', from_inventory=True),
            'this string is from inventory too but uses lookup {#lookup("foo","bar")#}'
        )
        self.assertEqual(
            ansible.utils._clean_data('this string has JSON in it: {"foo":{"bar":{"baz":"oops"}}}', from_remote=True),
            'this string has JSON in it: {"foo":{"bar":{"baz":"oops"}}}'
        )
        self.assertEqual(
            ansible.utils._clean_data('this string contains unicode: ¢ £ ¤ ¥', from_remote=True),
            'this string contains unicode: ¢ £ ¤ ¥'
        )


    def test_censor_unlogged_data(self):
        ''' used by the no_log attribute '''
        input = dict(
             password='sekrit',
             rc=12,
             failed=True,
             changed=False,
             skipped=True,
             msg='moo',
        )
        data = ansible.utils.censor_unlogged_data(input)
        assert 'password' not in data
        assert 'rc' in data
        assert 'failed' in data
        assert 'changed' in data
        assert 'skipped' in data
        assert 'msg' not in data
        assert data['censored'] == 'results hidden due to no_log parameter'

    def test_repo_url_to_role_name(self):
        tests = [("http://git.example.com/repos/repo.git", "repo"),
                 ("ssh://git@git.example.com:repos/role-name", "role-name"),
                 ("ssh://git@git.example.com:repos/role-name,v0.1", "role-name"),
                 ("directory/role/is/installed/in", "directory/role/is/installed/in")]
        for (url, result) in tests:
            self.assertEqual(ansible.utils.repo_url_to_role_name(url), result)

    def test_role_spec_parse(self):
        tests = [
            (
                "git+http://git.example.com/repos/repo.git,v1.0", 
                {
                    'scm': 'git', 
                    'src': 'http://git.example.com/repos/repo.git', 
                    'version': 'v1.0', 
                    'name': 'repo'
                }
            ),
            (
                "http://repo.example.com/download/tarfile.tar.gz", 
                {
                    'scm': None, 
                    'src': 'http://repo.example.com/download/tarfile.tar.gz', 
                    'version': '', 
                    'name': 'tarfile'
                }
            ),
            (
                "http://repo.example.com/download/tarfile.tar.gz,,nicename", 
                {
                    'scm': None, 
                    'src': 'http://repo.example.com/download/tarfile.tar.gz', 
                    'version': '', 
                    'name': 'nicename'
                }
            ),
            (
                "git+http://git.example.com/repos/repo.git,v1.0,awesome", 
                {
                    'scm': 'git', 
                    'src': 'http://git.example.com/repos/repo.git', 
                    'version': 'v1.0', 
                    'name': 'awesome'
                }
            ),
            (
                # test that http://github URLs are assumed git+http:// unless they end in .tar.gz
                "http://github.com/ansible/fakerole/fake",
                {
                    'scm' : 'git',
                    'src' : 'http://github.com/ansible/fakerole/fake',
                    'version' : 'master', 
                    'name' : 'fake'
                }
            ),
            (
                # test that http://github URLs are assumed git+http:// unless they end in .tar.gz
                "http://github.com/ansible/fakerole/fake/archive/master.tar.gz",
                {
                    'scm' : None,
                    'src' : 'http://github.com/ansible/fakerole/fake/archive/master.tar.gz',
                    'version' : '', 
                    'name' : 'master'
                }
            )
            ]
        for (spec, result) in tests:
            self.assertEqual(ansible.utils.role_spec_parse(spec), result)

    def test_role_yaml_parse(self):
        tests = (
                (
                    # Old style
                    {
                        'role': 'debops.elasticsearch',
                        'name': 'elks'
                    },
                    {
                        'role': 'debops.elasticsearch',
                        'name': 'elks',
                        'scm': None,
                        'src': 'debops.elasticsearch',
                        'version': '',
                    }
                ),
                (
                    {
                        'role': 'debops.elasticsearch,1.0,elks',
                        'my_param': 'foo'
                    },
                    {
                        'role': 'debops.elasticsearch,1.0,elks',
                        'name': 'elks',
                        'scm': None,
                        'src': 'debops.elasticsearch',
                        'version': '1.0',
                        'my_param': 'foo',
                    }
                ),
                (
                    {
                        'role': 'debops.elasticsearch,1.0',
                        'my_param': 'foo'
                    },
                    {
                        'role': 'debops.elasticsearch,1.0',
                        'name': 'debops.elasticsearch',
                        'scm': None,
                        'src': 'debops.elasticsearch',
                        'version': '1.0',
                        'my_param': 'foo',
                    }
                ),
                # New style
                (
                    {
                        'src': 'debops.elasticsearch',
                        'name': 'elks',
                        'my_param': 'foo'
                    },
                    {
                        'name': 'elks',
                        'scm': None,
                        'src': 'debops.elasticsearch',
                        'version': '',
                        'my_param': 'foo'
                    }
                ),
            )

        for (role, result) in tests:
            self.assertEqual(ansible.utils.role_yaml_parse(role), result)

    @patch('ansible.utils.plugins.module_finder._get_paths')
    def test_find_plugin(self, mock_get_paths):

        tmp_path = tempfile.mkdtemp()
        mock_get_paths.return_value = [tmp_path,]
        right_module_1 = 'module.py'
        right_module_2 = 'module_without_extension'
        wrong_module_1 = 'folder'
        wrong_module_2 = 'inexistent'
        path_right_module_1 = os.path.join(tmp_path, right_module_1)
        path_right_module_2 = os.path.join(tmp_path, right_module_2)
        path_wrong_module_1 = os.path.join(tmp_path, wrong_module_1)
        open(path_right_module_1, 'w').close()
        open(path_right_module_2, 'w').close()
        os.mkdir(path_wrong_module_1)

        self.assertEqual(ansible.utils.plugins.module_finder.find_plugin(right_module_1),
                         path_right_module_1)
        self.assertEqual(ansible.utils.plugins.module_finder.find_plugin(right_module_2),
                         path_right_module_2)
        self.assertEqual(ansible.utils.plugins.module_finder.find_plugin(wrong_module_1),
                         None)
        self.assertEqual(ansible.utils.plugins.module_finder.find_plugin(wrong_module_2),
                         None)

        shutil.rmtree(tmp_path)