Make AnsibleVaultEncryptedUnicode work more like a string (#67823)

* Make AnsibleVaultEncryptedUnicode work more like a string. Fixes #24425

* Remove debugging

* Wrap some things

* Reduce diff

* data should always result in text

* add tests

* Don't just copy and paste, kids

* Add eq and ne back

* Go full UserString copy/paste

* Various version related fixes

* Remove trailing newline

* py2v3

* Add a test that can evaluate whether a variable is vault encrypted

* map was introduces in jinja2 2.7

* moar jinja

* type fix

Co-Authored-By: Sam Doran <sdoran@redhat.com>

* Remove duplicate __hash__

* Fix typo

* Add changelog fragment

* ci_complete

Co-authored-by: Sam Doran <sdoran@redhat.com>
This commit is contained in:
Matt Martz 2020-06-08 16:30:14 -05:00 committed by GitHub
parent 8dd0356719
commit 9667f221a5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 407 additions and 14 deletions

View file

@ -0,0 +1,6 @@
bugfixes:
- Vault - Make the single vaulted value ``AnsibleVaultEncryptedUnicode`` class
work more like a string by replicating the behavior of ``collections.UserString``
from Python. These changes don't allow it to be considered a string, but
most common python string actions will now work as expected.
(https://github.com/ansible/ansible/pull/67823)

View file

@ -61,6 +61,30 @@ To match strings against a substring or a regular expression, use the ``match``,
``match`` succeeds if it finds the pattern at the beginning of the string, while ``search`` succeeds if it finds the pattern anywhere within string. By default, ``regex`` works like ``search``, but ``regex`` can be configured to perform other tests as well.
.. _testing_vault:
Vault
=====
.. versionadded:: 2.10
You can test whether a variable is an inline single vault encrypted value using the ``vault_encrypted`` test.
.. code-block:: yaml
vars:
variable: !vault |
$ANSIBLE_VAULT;1.2;AES256;dev
61323931353866666336306139373937316366366138656131323863373866376666353364373761
3539633234313836346435323766306164626134376564330a373530313635343535343133316133
36643666306434616266376434363239346433643238336464643566386135356334303736353136
6565633133366366360a326566323363363936613664616364623437336130623133343530333739
3039
tasks:
- debug:
msg: '{{ (variable is vault_encrypted) | ternary("Vault encrypted", "Not vault encrypted") }}'
.. _testing_truthiness:
Testing truthiness

View file

@ -599,6 +599,10 @@ class VaultLib:
self.cipher_name = None
self.b_version = b'1.2'
@staticmethod
def is_encrypted(vaulttext):
return is_encrypted(vaulttext)
def encrypt(self, plaintext, secret=None, vault_id=None):
"""Vault encrypt a piece of data.

View file

@ -19,9 +19,13 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import string
import sys as _sys
import sys
import yaml
from ansible.module_utils.common._collections_compat import Sequence
from ansible.module_utils.six import text_type
from ansible.module_utils._text import to_bytes, to_text, to_native
@ -79,9 +83,8 @@ class AnsibleSequence(AnsibleBaseYAMLObject, list):
pass
# Unicode like object that is not evaluated (decrypted) until it needs to be
# TODO: is there a reason these objects are subclasses for YAMLObject?
class AnsibleVaultEncryptedUnicode(yaml.YAMLObject, AnsibleBaseYAMLObject):
class AnsibleVaultEncryptedUnicode(Sequence, AnsibleBaseYAMLObject):
'''Unicode like object that is not evaluated (decrypted) until it needs to be'''
__UNSAFE__ = True
__ENCRYPTED__ = True
yaml_tag = u'!vault'
@ -113,30 +116,31 @@ class AnsibleVaultEncryptedUnicode(yaml.YAMLObject, AnsibleBaseYAMLObject):
@property
def data(self):
if not self.vault:
return self._ciphertext
return to_text(self._ciphertext)
return to_text(self.vault.decrypt(self._ciphertext))
@data.setter
def data(self, value):
self._ciphertext = value
self._ciphertext = to_bytes(value)
def __repr__(self):
return repr(self.data)
def is_encrypted(self):
return self.vault and self.vault.is_encrypted(self._ciphertext)
# Compare a regular str/text_type with the decrypted hypertext
def __eq__(self, other):
if self.vault:
return other == self.data
return False
def __hash__(self):
return id(self)
def __ne__(self, other):
if self.vault:
return other != self.data
return True
def __reversed__(self):
# This gets inerhited from ``collections.Sequence`` which returns a generator
# make this act more like the string implementation
return to_text(self[::-1], errors='surrogate_or_strict')
def __str__(self):
return to_native(self.data, errors='surrogate_or_strict')
@ -144,4 +148,232 @@ class AnsibleVaultEncryptedUnicode(yaml.YAMLObject, AnsibleBaseYAMLObject):
return to_text(self.data, errors='surrogate_or_strict')
def encode(self, encoding=None, errors=None):
return self.data.encode(encoding, errors)
return to_bytes(self.data, encoding=encoding, errors=errors)
# Methods below are a copy from ``collections.UserString``
# Some are copied as is, where others are modified to not
# auto wrap with ``self.__class__``
def __repr__(self):
return repr(self.data)
def __int__(self, base=10):
return int(self.data, base=base)
def __float__(self):
return float(self.data)
def __complex__(self):
return complex(self.data)
def __hash__(self):
return hash(self.data)
# This breaks vault, do not define it, we cannot satisfy this
# def __getnewargs__(self):
# return (self.data[:],)
def __lt__(self, string):
if isinstance(string, AnsibleVaultEncryptedUnicode):
return self.data < string.data
return self.data < string
def __le__(self, string):
if isinstance(string, AnsibleVaultEncryptedUnicode):
return self.data <= string.data
return self.data <= string
def __gt__(self, string):
if isinstance(string, AnsibleVaultEncryptedUnicode):
return self.data > string.data
return self.data > string
def __ge__(self, string):
if isinstance(string, AnsibleVaultEncryptedUnicode):
return self.data >= string.data
return self.data >= string
def __contains__(self, char):
if isinstance(char, AnsibleVaultEncryptedUnicode):
char = char.data
return char in self.data
def __len__(self):
return len(self.data)
def __getitem__(self, index):
return self.data[index]
def __getslice__(self, start, end):
start = max(start, 0)
end = max(end, 0)
return self.data[start:end]
def __add__(self, other):
if isinstance(other, AnsibleVaultEncryptedUnicode):
return self.data + other.data
elif isinstance(other, text_type):
return self.data + other
return self.data + to_text(other)
def __radd__(self, other):
if isinstance(other, text_type):
return other + self.data
return to_text(other) + self.data
def __mul__(self, n):
return self.data * n
__rmul__ = __mul__
def __mod__(self, args):
return self.data % args
def __rmod__(self, template):
return to_text(template) % self
# the following methods are defined in alphabetical order:
def capitalize(self):
return self.data.capitalize()
def casefold(self):
return self.data.casefold()
def center(self, width, *args):
return self.data.center(width, *args)
def count(self, sub, start=0, end=_sys.maxsize):
if isinstance(sub, AnsibleVaultEncryptedUnicode):
sub = sub.data
return self.data.count(sub, start, end)
def endswith(self, suffix, start=0, end=_sys.maxsize):
return self.data.endswith(suffix, start, end)
def expandtabs(self, tabsize=8):
return self.data.expandtabs(tabsize)
def find(self, sub, start=0, end=_sys.maxsize):
if isinstance(sub, AnsibleVaultEncryptedUnicode):
sub = sub.data
return self.data.find(sub, start, end)
def format(self, *args, **kwds):
return self.data.format(*args, **kwds)
def format_map(self, mapping):
return self.data.format_map(mapping)
def index(self, sub, start=0, end=_sys.maxsize):
return self.data.index(sub, start, end)
def isalpha(self):
return self.data.isalpha()
def isalnum(self):
return self.data.isalnum()
def isascii(self):
return self.data.isascii()
def isdecimal(self):
return self.data.isdecimal()
def isdigit(self):
return self.data.isdigit()
def isidentifier(self):
return self.data.isidentifier()
def islower(self):
return self.data.islower()
def isnumeric(self):
return self.data.isnumeric()
def isprintable(self):
return self.data.isprintable()
def isspace(self):
return self.data.isspace()
def istitle(self):
return self.data.istitle()
def isupper(self):
return self.data.isupper()
def join(self, seq):
return self.data.join(seq)
def ljust(self, width, *args):
return self.data.ljust(width, *args)
def lower(self):
return self.data.lower()
def lstrip(self, chars=None):
return self.data.lstrip(chars)
try:
# PY3
maketrans = str.maketrans
except AttributeError:
# PY2
maketrans = string.maketrans
def partition(self, sep):
return self.data.partition(sep)
def replace(self, old, new, maxsplit=-1):
if isinstance(old, AnsibleVaultEncryptedUnicode):
old = old.data
if isinstance(new, AnsibleVaultEncryptedUnicode):
new = new.data
return self.data.replace(old, new, maxsplit)
def rfind(self, sub, start=0, end=_sys.maxsize):
if isinstance(sub, AnsibleVaultEncryptedUnicode):
sub = sub.data
return self.data.rfind(sub, start, end)
def rindex(self, sub, start=0, end=_sys.maxsize):
return self.data.rindex(sub, start, end)
def rjust(self, width, *args):
return self.data.rjust(width, *args)
def rpartition(self, sep):
return self.data.rpartition(sep)
def rstrip(self, chars=None):
return self.data.rstrip(chars)
def split(self, sep=None, maxsplit=-1):
return self.data.split(sep, maxsplit)
def rsplit(self, sep=None, maxsplit=-1):
return self.data.rsplit(sep, maxsplit)
def splitlines(self, keepends=False):
return self.data.splitlines(keepends)
def startswith(self, prefix, start=0, end=_sys.maxsize):
return self.data.startswith(prefix, start, end)
def strip(self, chars=None):
return self.data.strip(chars)
def swapcase(self):
return self.data.swapcase()
def title(self):
return self.data.title()
def translate(self, *args):
return self.data.translate(*args)
def upper(self):
return self.data.upper()
def zfill(self, width):
return self.data.zfill(width)

View file

@ -134,6 +134,9 @@ def regex_replace(value='', pattern='', replacement='', ignorecase=False, multil
def regex_findall(value, regex, multiline=False, ignorecase=False):
''' Perform re.findall and return the list of matches '''
value = to_text(value, errors='surrogate_or_strict', nonstring='simplerepr')
flags = 0
if ignorecase:
flags |= re.I
@ -145,6 +148,8 @@ def regex_findall(value, regex, multiline=False, ignorecase=False):
def regex_search(value, regex, *args, **kwargs):
''' Perform re.search and return the list of matches or a backref '''
value = to_text(value, errors='surrogate_or_strict', nonstring='simplerepr')
groups = list()
for arg in args:
if arg.startswith('\\g'):
@ -184,6 +189,7 @@ def ternary(value, true_val, false_val, none_val=None):
def regex_escape(string, re_type='python'):
string = to_text(string, errors='surrogate_or_strict', nonstring='simplerepr')
'''Escape all regular expressions special characters from STRING.'''
if re_type == 'python':
return re.escape(string)

View file

@ -128,6 +128,14 @@ def regex(value='', pattern='', ignorecase=False, multiline=False, match_type='s
return bool(getattr(_re, match_type, 'search')(value))
def vault_encrypted(value):
"""Evaulate whether a variable is a single vault encrypted value
.. versionadded:: 2.10
"""
return getattr(value, '__ENCRYPTED__', False) and value.is_encrypted()
def match(value, pattern='', ignorecase=False, multiline=False):
''' Perform a `re.match` returning a boolean '''
return regex(value, pattern, ignorecase, multiline, 'match')
@ -236,4 +244,7 @@ class TestModule(object):
# truthiness
'truthy': truthy,
'falsy': falsy,
# vault
'vault_encrypted': vault_encrypted,
}

View file

@ -65,5 +65,5 @@ regex_replace = bar
#bar
#bart
regex_search = 0001
regex_findall = "['car', 'tar', 'bar']"
regex_findall = ["car", "tar", "bar"]
regex_escape = \^f\.\*o\(\.\*\)\$

View file

@ -58,5 +58,5 @@ regex_replace = {{ 'foo' | regex_replace('^foo', 'bar') }}
# Check regex_replace with multiline
{{ '#foo\n#foot' | regex_replace('^#foo', '#bar', multiline=True) }}
regex_search = {{ 'test_value_0001' | regex_search('([0-9]+)$')}}
regex_findall = "{{ 'car\ntar\nfoo\nbar\n' | regex_findall('^.ar$', multiline=True) }}"
regex_findall = {{ 'car\ntar\nfoo\nbar\n' | regex_findall('^.ar$', multiline=True)|to_json }}
regex_escape = {{ '^f.*o(.*)$' | regex_escape() }}

View file

@ -521,3 +521,4 @@ ansible-playbook -i ../../inventory -v "$@" --vault-password-file vault-password
# Ensure we don't leave unencrypted temp files dangling
ansible-playbook -v "$@" --vault-password-file vault-password test_dangling_temp.yml
ansible-playbook "$@" --vault-password-file vault-password single_vault_as_string.yml

View file

@ -0,0 +1,109 @@
- hosts: localhost
vars:
vaulted_value: !vault |
$ANSIBLE_VAULT;1.1;AES256
35323961353038346165643738646465376139363061353835303739663538343266303232326635
3365353662646236356665323135633630656238316530640a663362363763633436373439663031
33663433383037396438656464636433653837376361313638366362333037323961316364363363
3835616438623261650a636164376534376661393134326662326362323131373964313961623365
3833
tasks:
- debug:
msg: "{{ vaulted_value }}"
- debug:
msg: "{{ vaulted_value|type_debug }}"
- assert:
that:
- vaulted_value is vault_encrypted
- vaulted_value == 'foo bar'
- vaulted_value|string == 'foo bar'
- vaulted_value|quote == "'foo bar'"
- vaulted_value|capitalize == 'Foo bar'
- vaulted_value|center(width=9) == ' foo bar '
- vaulted_value|default('monkey') == 'foo bar'
- vaulted_value|escape == 'foo bar'
- vaulted_value|forceescape == 'foo bar'
- vaulted_value|first == 'f'
- "'%s'|format(vaulted_value) == 'foo bar'"
- vaulted_value|indent(indentfirst=True) == ' foo bar'
- vaulted_value.split() == ['foo', 'bar']
- vaulted_value|join('-') == 'f-o-o- -b-a-r'
- vaulted_value|last == 'r'
- vaulted_value|length == 7
- vaulted_value|list == ['f', 'o', 'o', ' ', 'b', 'a', 'r']
- vaulted_value|lower == 'foo bar'
- vaulted_value|replace('foo', 'baz') == 'baz bar'
- vaulted_value|reverse|string == 'rab oof'
- vaulted_value|safe == 'foo bar'
- vaulted_value|slice(2)|list == [['f', 'o', 'o', ' '], ['b', 'a', 'r']]
- vaulted_value|sort|list == [" ", "a", "b", "f", "o", "o", "r"]
- vaulted_value|trim == 'foo bar'
- vaulted_value|upper == 'FOO BAR'
# jinja2.filters.do_urlencode uses an isinstance against string_types
# - vaulted_value|urlencode == 'foo%20bar'
- vaulted_value|urlize == 'foo bar'
- vaulted_value is not callable
- vaulted_value is iterable
- vaulted_value is lower
- vaulted_value is not none
# This is not exactly a string, and UserString doesn't fulfill this
# - vaulted_value is string
- vaulted_value is not upper
- vaulted_value|b64encode == 'Zm9vIGJhcg=='
- vaulted_value|to_uuid == '0271fe51-bb26-560f-b118-5d6513850860'
- vaulted_value|string|to_json == '"foo bar"'
- vaulted_value|md5 == '327b6f07435811239bc47e1544353273'
- vaulted_value|sha1 == '3773dea65156909838fa6c22825cafe090ff8030'
- vaulted_value|hash == '3773dea65156909838fa6c22825cafe090ff8030'
- vaulted_value|regex_replace('foo', 'baz') == 'baz bar'
- vaulted_value|regex_escape == 'foo\ bar'
- vaulted_value|regex_search('foo') == 'foo'
- vaulted_value|regex_findall('foo') == ['foo']
- vaulted_value|comment == '#\n# foo bar\n#'
- assert:
that:
- vaulted_value|random(seed='foo') == ' '
- vaulted_value|shuffle(seed='foo') == ["o", "f", "r", "b", "o", "a", " "]
- vaulted_value|pprint == "'foo bar'"
when: ansible_python.version.major == 3
- assert:
that:
- vaulted_value|random(seed='foo') == 'r'
- vaulted_value|shuffle(seed='foo') == ["b", "o", "a", " ", "o", "f", "r"]
- vaulted_value|pprint == "u'foo bar'"
when: ansible_python.version.major == 2
- assert:
that:
- vaulted_value|map('upper')|list == ['F', 'O', 'O', ' ', 'B', 'A', 'R']
when: lookup('pipe', ansible_python.executable ~ ' -c "import jinja2; print(jinja2.__version__)"') is version('2.7', '>=')
- assert:
that:
- vaulted_value.split()|first|int(base=36) == 20328
- vaulted_value|select('equalto', 'o')|list == ['o', 'o']
- vaulted_value|title == 'Foo Bar'
- vaulted_value is equalto('foo bar')
when: lookup('pipe', ansible_python.executable ~ ' -c "import jinja2; print(jinja2.__version__)"') is version('2.8', '>=')
- assert:
that:
- vaulted_value|string|tojson == '"foo bar"'
- vaulted_value|truncate(4) == 'foo bar'
when: lookup('pipe', ansible_python.executable ~ ' -c "import jinja2; print(jinja2.__version__)"') is version('2.9', '>=')
- assert:
that:
- vaulted_value|wordwrap(4) == 'foo\nbar'
when: lookup('pipe', ansible_python.executable ~ ' -c "import jinja2; print(jinja2.__version__)"') is version('2.11', '>=')
- assert:
that:
- vaulted_value|wordcount == 2
when: lookup('pipe', ansible_python.executable ~ ' -c "import jinja2; print(jinja2.__version__)"') is version('2.11.2', '>=')