keep unsafe .. unsafe (#23742)

* keep unsafe .. unsafe

fixes #23734, which was broken in previous fix that allowed non string types to be templated
use new 'is_template' function vs bastardizing others
refactored clean_data to allow for arbitrary data structures to clean
fixed/removed some tests

* deal with complex data for is_template

* typos
This commit is contained in:
Brian Coca 2017-04-21 16:07:38 -04:00 committed by GitHub
parent 3358abcf49
commit 4594bee65a
6 changed files with 87 additions and 64 deletions

View file

@ -139,7 +139,7 @@ class Conditional:
if conditional in all_vars and VALID_VAR_REGEX.match(conditional):
conditional = all_vars[conditional]
if templar._clean_data(conditional) != conditional:
if templar.is_template(conditional):
display.warning('when statements should not include jinja2 '
'templating delimiters such as {{ }} or {%% %%}. '
'Found: %s' % conditional)

View file

@ -289,7 +289,7 @@ def load_list_of_tasks(ds, play, block=None, role=None, task_include=None, use_h
needs_templating = False
for param in ir.args:
if templar._contains_vars(ir.args[param]):
if not templar.templatable(ir.args[param]):
if not templar.is_template(ir.args[param]):
needs_templating = True
break
is_static = C.DEFAULT_TASK_INCLUDES_STATIC or \

View file

@ -49,6 +49,7 @@ from ansible.plugins import filter_loader, lookup_loader, test_loader
from ansible.template.safe_eval import safe_eval
from ansible.template.template import AnsibleJ2Template
from ansible.template.vars import AnsibleJ2Vars
from ansible.vars.unsafe_proxy import UnsafeProxy, wrap_var
try:
from __main__ import display
@ -311,45 +312,64 @@ class Templar:
return jinja_exts
def _clean_data(self, orig_data):
''' remove jinja2 template tags from a string '''
''' remove jinja2 template tags from data '''
if not isinstance(orig_data, string_types) or hasattr(orig_data, '__ENCRYPTED__'):
return orig_data
if hasattr(orig_data, '__ENCRYPTED__'):
ret = orig_data
with contextlib.closing(StringIO(orig_data)) as data:
# these variables keep track of opening block locations, as we only
# want to replace matched pairs of print/block tags
print_openings = []
block_openings = []
for mo in self._clean_regex.finditer(orig_data):
token = mo.group(0)
token_start = mo.start(0)
elif isinstance(orig_data, list):
clean_list = []
for list_item in orig_data:
clean_list.append(self._clean_data(list_item))
ret = clean_list
if token[0] == self.environment.variable_start_string[0]:
if token == self.environment.block_start_string:
block_openings.append(token_start)
elif token == self.environment.variable_start_string:
print_openings.append(token_start)
elif isinstance(orig_data, dict):
clean_dict = {}
for k in orig_data:
clean_dict[self._clean_data(k)] = self._clean_data(orig_data[k])
ret = clean_dict
elif token[1] == self.environment.variable_end_string[1]:
prev_idx = None
if token == self.environment.block_end_string and block_openings:
prev_idx = block_openings.pop()
elif token == self.environment.variable_end_string and print_openings:
prev_idx = print_openings.pop()
elif isinstance(orig_data, string_types):
# This will error with str data (needs unicode), but all strings should already be converted already.
# If you get exception, the problem is at the data origin, do not add to_text here.
with contextlib.closing(StringIO(orig_data)) as data:
# these variables keep track of opening block locations, as we only
# want to replace matched pairs of print/block tags
print_openings = []
block_openings = []
for mo in self._clean_regex.finditer(orig_data):
token = mo.group(0)
token_start = mo.start(0)
if prev_idx is not None:
# replace the opening
data.seek(prev_idx, os.SEEK_SET)
data.write(to_text(self.environment.comment_start_string))
# replace the closing
data.seek(token_start, os.SEEK_SET)
data.write(to_text(self.environment.comment_end_string))
if token[0] == self.environment.variable_start_string[0]:
if token == self.environment.block_start_string:
block_openings.append(token_start)
elif token == self.environment.variable_start_string:
print_openings.append(token_start)
else:
raise AnsibleError("Error while cleaning data for safety: unhandled regex match")
elif token[1] == self.environment.variable_end_string[1]:
prev_idx = None
if token == self.environment.block_end_string and block_openings:
prev_idx = block_openings.pop()
elif token == self.environment.variable_end_string and print_openings:
prev_idx = print_openings.pop()
return data.getvalue()
if prev_idx is not None:
# replace the opening
data.seek(prev_idx, os.SEEK_SET)
data.write(to_text(self.environment.comment_start_string))
# replace the closing
data.seek(token_start, os.SEEK_SET)
data.write(to_text(self.environment.comment_end_string))
else:
raise AnsibleError("Error while cleaning data for safety: unhandled regex match")
ret = data.getvalue()
else:
ret = orig_data
return ret
def set_available_variables(self, variables):
'''
@ -371,15 +391,13 @@ class Templar:
before being sent through the template engine.
'''
# Don't template unsafe variables, just return them.
if hasattr(variable, '__UNSAFE__'):
return variable
if fail_on_undefined is None:
fail_on_undefined = self._fail_on_undefined_errors
# Don't template unsafe variables, instead drop them back down to their constituent type.
if hasattr(variable, '__UNSAFE__'):
if isinstance(variable, text_type):
rval = self._clean_data(variable)
return rval
try:
if convert_bare:
variable = self._convert_bare_variable(variable, bare_deprecated=bare_deprecated)
@ -435,7 +453,6 @@ class Templar:
if eval_results[1] is None:
result = eval_results[0]
if unsafe:
from ansible.vars.unsafe_proxy import wrap_var
result = wrap_var(result)
else:
# FIXME: if the safe_eval raised an error, should we do something with it?
@ -482,6 +499,26 @@ class Templar:
else:
return variable
def is_template(self, data):
''' lets us know if data has a template'''
if isinstance(data, string_types):
try:
new = self.do_template(data)
except UndefinedError:
return True
except:
return False
return (new != data)
elif isinstance(data, (list, tuple)):
for v in data:
if self.is_template(v):
return True
elif isinstance(data, dict):
for k in data:
if self.is_template(k) or self.is_template(data[k]):
return True
return False
def templatable(self, data):
'''
returns True if the data can be templated w/o errors
@ -552,7 +589,6 @@ class Templar:
ran = None
if ran:
from ansible.vars.unsafe_proxy import UnsafeProxy, wrap_var
if wantlist:
ran = wrap_var(ran)
else:
@ -593,13 +629,12 @@ class Templar:
key = key.strip()
setattr(myenv, key, ast.literal_eval(val.strip()))
#FIXME: add tests
# Adds Ansible custom filters and tests
myenv.filters.update(self._get_filters())
myenv.tests.update(self._get_tests())
if escape_backslashes:
# Allow users to specify backslashes in playbooks as "\\"
# instead of as "\\\\".
# Allow users to specify backslashes in playbooks as "\\" instead of as "\\\\".
data = _escape_backslashes(data, myenv)
try:
@ -627,7 +662,6 @@ class Templar:
try:
res = j2_concat(rf)
if new_context.unsafe:
from ansible.vars.unsafe_proxy import wrap_var
res = wrap_var(res)
except TypeError as te:
if 'StrictUndefined' in to_native(te):

View file

@ -157,7 +157,7 @@
- name: assert recursive copied directories mode
assert:
that:
- "{{item.stat.mode}} == 0700"
- "item.stat.mode == '0700'"
with_items: "{{dir_stats.results}}"

View file

@ -23,8 +23,8 @@
assert:
that:
- item.0 | changed
- item.0.path_value == "C:\\{{ item.0.item }}Path"
- item.1.stdout_lines[0] == 'C:\\{{ item.0.item }}Path'
- item.0.path_value == "C:\\" + item.0.item + "Path"
- item.1.stdout_lines[0] == 'C:\\' + item.0.item + 'Path'
with_together:
- "{{ pathout.results }}"
- "{{ varout.results }}"

View file

@ -178,12 +178,6 @@ class TestTemplarTemplate(BaseTemplar, unittest.TestCase):
res = self.templar.template(unsafe_obj)
self.assertTrue(self.is_unsafe(res), 'returned value from template.template (%s) is not marked unsafe' % res)
@patch('ansible.template.Templar._clean_data', side_effect=AnsibleError)
def test_template_unsafe_clean_data_exception(self, mock_clean_data):
self.assertRaises(AnsibleError,
self.templar.template,
wrap_var('blip bar'))
# TODO: not sure what template is supposed to do it, but it currently throws attributeError
@patch('ansible.template.Templar._clean_data')
def test_template_unsafe_non_string_clean_data_exception(self, mock_clean_data):
@ -234,16 +228,11 @@ class TestTemplarCleanData(BaseTemplar, unittest.TestCase):
self.assertEqual(res, u'1 2 {#what#} 3 4 {#foo#} 5 6 7')
def test_clean_data_object(self):
obj = {'foo': [1, 2, 3, 'bdasdf', '{what}', '{{foo}}', 5]}
obj = {u'foo': [1, 2, 3, u'bdasdf', u'{what}', u'{{foo}}', 5]}
clean_obj = {u'foo': [1, 2, 3, u'bdasdf', u'{what}', u'{#foo#}', 5]}
res = self.templar._clean_data(obj)
self.assertEqual(res, obj)
def test_clean_data_object_unsafe(self):
rval = [1, 2, 3, wrap_var('bdasdf'), '{what}', wrap_var('{{unsafe_foo}}'), 5]
obj = {'foo': rval}
res = self.templar._clean_data(obj)
self.assertEqual(res, obj)
self.assertTrue(self.is_unsafe(res), 'returned value of _clean_data (%s) is not marked unsafe.' % res)
self.assertNotEqual(res, obj)
self.assertEqual(res, clean_obj)
def test_clean_data_bad_dict(self):
res = self.templar._clean_data(u'{{bad_dict}}')