Merge pull request #12075 from ansible/fix-vault-unicode

Unicode and other fixes for vault
This commit is contained in:
Toshio Kuratomi 2015-08-25 14:49:15 -07:00
commit 56ae3a032f
4 changed files with 125 additions and 105 deletions

View file

@ -151,12 +151,15 @@ class DataLoader():
show_content = True
try:
with open(file_name, 'r') as f:
with open(file_name, 'rb') as f:
data = f.read()
if self._vault.is_encrypted(data):
data = self._vault.decrypt(data)
show_content = False
data = to_unicode(data, errors='strict')
return (data, show_content)
except (IOError, OSError) as e:
raise AnsibleParserError("an error occurred while trying to read the file '%s': %s" % (file_name, str(e)))

View file

@ -22,7 +22,6 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import sys
import os
import shlex
import shutil
@ -33,7 +32,7 @@ from ansible.errors import AnsibleError
from hashlib import sha256
from binascii import hexlify
from binascii import unhexlify
from six import binary_type, PY3, text_type
from six import PY3
# Note: Only used for loading obsolete VaultAES files. All files are written
# using the newer VaultAES256 which does not require md5
@ -95,8 +94,9 @@ HAS_ANY_PBKDF2HMAC = HAS_PBKDF2 or HAS_PBKDF2HMAC
CRYPTO_UPGRADE = "ansible-vault requires a newer version of pycrypto than the one installed on your platform. You may fix this with OS-specific commands such as: yum install python-devel; rpm -e --nodeps python-crypto; pip install pycrypto"
HEADER=u'$ANSIBLE_VAULT'
CIPHER_WHITELIST=['AES', 'AES256']
b_HEADER = b'$ANSIBLE_VAULT'
CIPHER_WHITELIST = frozenset((u'AES', u'AES256'))
CIPHER_WRITE_WHITELIST=frozenset((u'AES256',))
def check_prereqs():
@ -104,116 +104,132 @@ def check_prereqs():
if not HAS_AES or not HAS_COUNTER or not HAS_ANY_PBKDF2HMAC or not HAS_HASH:
raise AnsibleError(CRYPTO_UPGRADE)
class VaultLib(object):
class VaultLib:
def __init__(self, password):
self.password = password
self.b_password = to_bytes(password, errors='strict', encoding='utf-8')
self.cipher_name = None
self.version = '1.1'
self.b_version = b'1.1'
def is_encrypted(self, data):
data = to_unicode(data)
if data.startswith(HEADER):
""" Test if this is vault encrypted data
:arg data: a byte str or unicode string to test whether it is
recognized as vault encrypted data
:returns: True if it is recognized. Otherwise, False.
"""
if to_bytes(data, errors='strict', encoding='utf-8').startswith(b_HEADER):
return True
else:
return False
def encrypt(self, data):
data = to_unicode(data)
"""Vault encrypt a piece of data.
if self.is_encrypted(data):
:arg data: a utf-8 byte str or unicode string to encrypt.
:returns: a utf-8 encoded byte str of encrypted data. The string
contains a header identifying this as vault encrypted data and
formatted to newline terminated lines of 80 characters. This is
suitable for dumping as is to a vault file.
"""
b_data = to_bytes(data, errors='strict', encoding='utf-8')
if self.is_encrypted(b_data):
raise AnsibleError("data is already encrypted")
if not self.cipher_name:
self.cipher_name = "AES256"
# raise AnsibleError("the cipher must be set before encrypting data")
self.cipher_name = u"AES256"
if 'Vault' + self.cipher_name in globals() and self.cipher_name in CIPHER_WHITELIST:
cipher = globals()['Vault' + self.cipher_name]
this_cipher = cipher()
cipher_class_name = u'Vault{0}'.format(self.cipher_name)
if cipher_class_name in globals() and self.cipher_name in CIPHER_WHITELIST:
Cipher = globals()[cipher_class_name]
this_cipher = Cipher()
else:
raise AnsibleError("{0} cipher could not be found".format(self.cipher_name))
raise AnsibleError(u"{0} cipher could not be found".format(self.cipher_name))
"""
# combine sha + data
this_sha = sha256(data).hexdigest()
tmp_data = this_sha + "\n" + data
"""
# encrypt data
b_enc_data = this_cipher.encrypt(b_data, self.b_password)
# encrypt sha + data
enc_data = this_cipher.encrypt(data, self.password)
# add header
tmp_data = self._add_header(enc_data)
return tmp_data
# format the data for output to the file
b_tmp_data = self._format_output(b_enc_data)
return b_tmp_data
def decrypt(self, data):
data = to_bytes(data)
"""Decrypt a piece of vault encrypted data.
if self.password is None:
:arg data: a string to decrypt. Since vault encrypted data is an
ascii text format this can be either a byte str or unicode string.
:returns: a byte string containing the decrypted data
"""
b_data = to_bytes(data, errors='strict', encoding='utf-8')
if self.b_password is None:
raise AnsibleError("A vault password must be specified to decrypt data")
if not self.is_encrypted(data):
if not self.is_encrypted(b_data):
raise AnsibleError("data is not encrypted")
# clean out header
data = self._split_header(data)
b_data = self._split_header(b_data)
# create the cipher object
ciphername = to_unicode(self.cipher_name)
if 'Vault' + ciphername in globals() and ciphername in CIPHER_WHITELIST:
cipher = globals()['Vault' + ciphername]
this_cipher = cipher()
cipher_class_name = u'Vault{0}'.format(self.cipher_name)
if cipher_class_name in globals() and self.cipher_name in CIPHER_WHITELIST:
Cipher = globals()[cipher_class_name]
this_cipher = Cipher()
else:
raise AnsibleError("{0} cipher could not be found".format(ciphername))
raise AnsibleError("{0} cipher could not be found".format(self.cipher_name))
# try to unencrypt data
data = this_cipher.decrypt(data, self.password)
if data is None:
b_data = this_cipher.decrypt(b_data, self.b_password)
if b_data is None:
raise AnsibleError("Decryption failed")
return data
return b_data
def _add_header(self, data):
# combine header and encrypted data in 80 char columns
def _format_output(self, b_data):
""" Add header and format to 80 columns
:arg b_data: the encrypted and hexlified data as a byte string
:returns: a byte str that should be dumped into a file. It's
formatted to 80 char columns and has the header prepended
"""
#tmpdata = hexlify(data)
tmpdata = [to_bytes(data[i:i+80]) for i in range(0, len(data), 80)]
if not self.cipher_name:
raise AnsibleError("the cipher must be set before adding a header")
dirty_data = to_bytes(HEADER + ";" + self.version + ";" + self.cipher_name + "\n")
for l in tmpdata:
dirty_data += l + b'\n'
tmpdata = [b'%s\n' % b_data[i:i+80] for i in range(0, len(b_data), 80)]
tmpdata.insert(0, b'%s;%s;%s\n' % (b_HEADER, self.b_version,
to_bytes(self.cipher_name, errors='strict', encoding='utf-8')))
tmpdata = b''.join(tmpdata)
return dirty_data
return tmpdata
def _split_header(self, b_data):
"""Retrieve information about the Vault and clean the data
def _split_header(self, data):
When data is saved, it has a header prepended and is formatted into 80
character lines. This method extracts the information from the header
and then removes the header and the inserted newlines. The string returned
is suitable for processing by the Cipher classes.
:arg b_data: byte str containing the data from a save file
:returns: a byte str suitable for passing to a Cipher class's
decrypt() function.
"""
# used by decrypt
tmpdata = data.split(b'\n')
tmpdata = b_data.split(b'\n')
tmpheader = tmpdata[0].strip().split(b';')
self.version = to_unicode(tmpheader[1].strip())
self.b_version = tmpheader[1].strip()
self.cipher_name = to_unicode(tmpheader[2].strip())
clean_data = b'\n'.join(tmpdata[1:])
"""
# strip out newline, join, unhex
clean_data = [ x.strip() for x in clean_data ]
clean_data = unhexlify(''.join(clean_data))
"""
clean_data = b''.join(tmpdata[1:])
return clean_data
def __enter__(self):
return self
def __exit__(self, *err):
pass
class VaultEditor(object):
class VaultEditor:
# uses helper methods for write_file(self, filename, data)
# to write a file so that code isn't duplicated for simple
# file I/O, ditto read_file(self, filename) and launch_editor(self, filename)
@ -225,7 +241,7 @@ class VaultEditor(object):
self.password = password
self.filename = filename
def _edit_file_helper(self, existing_data=None, cipher=None):
def _edit_file_helper(self, existing_data=None, cipher=None, force_save=False):
# make sure the umask is set to a sane value
old_umask = os.umask(0o077)
@ -240,7 +256,7 @@ class VaultEditor(object):
tmpdata = self.read_data(tmp_path)
# Do nothing if the content has not changed
if existing_data == tmpdata:
if existing_data == tmpdata and not force_save:
return
# create new vault
@ -297,11 +313,11 @@ class VaultEditor(object):
dec_data = this_vault.decrypt(tmpdata)
# let the user edit the data and save
self._edit_file_helper(existing_data=dec_data)
###we want the cipher to default to AES256 (get rid of files
# encrypted with the AES cipher)
#self._edit_file_helper(existing_data=dec_data, cipher=this_vault.cipher_name)
if this_vault.cipher_name not in CIPHER_WRITE_WHITELIST:
# we want to get rid of files encrypted with the AES cipher
self._edit_file_helper(existing_data=dec_data, cipher=None, force_save=True)
else:
self._edit_file_helper(existing_data=dec_data, cipher=this_vault.cipher_name, force_save=False)
def view_file(self):
@ -363,7 +379,7 @@ class VaultEditor(object):
if os.path.isfile(filename):
os.remove(filename)
f = open(filename, "wb")
f.write(to_bytes(data))
f.write(to_bytes(data, errors='strict'))
f.close()
def shuffle_files(self, src, dest):
@ -401,19 +417,21 @@ class VaultFile(object):
_, self.tmpfile = tempfile.mkstemp()
### FIXME:
# __del__ can be problematic in python... For this use case, make
# VaultFile a context manager instead (implement __enter__ and __exit__)
def __del__(self):
self.filehandle.close()
os.unlink(self.tmplfile)
def is_encrypted(self):
peak = self.filehandler.readline()
if peak.startswith(HEADER):
peak = self.filehandle.readline()
if peak.startswith(b_HEADER):
return True
else:
return False
def get_decrypted(self):
check_prereqs()
if self.is_encrypted():
@ -423,7 +441,7 @@ class VaultFile(object):
if dec_data is None:
raise AnsibleError("Decryption failed")
else:
self.tempfile.write(dec_data)
self.tmpfile.write(dec_data)
return self.tmpfile
else:
return self.filename
@ -432,13 +450,15 @@ class VaultFile(object):
# CIPHERS #
########################################
class VaultAES(object):
class VaultAES:
# this version has been obsoleted by the VaultAES256 class
# which uses encrypt-then-mac (fixing order) and also improving the KDF used
# code remains for upgrade purposes only
# http://stackoverflow.com/a/16761459
# Note: strings in this class should be byte strings by default.
def __init__(self):
if not HAS_AES:
raise AnsibleError(CRYPTO_UPGRADE)
@ -449,8 +469,8 @@ class VaultAES(object):
d = d_i = b''
while len(d) < key_length + iv_length:
text = "{0}{1}{2}".format(d_i, password, salt)
d_i = md5(to_bytes(text)).digest()
text = b"%s%s%s" % (d_i, password, salt)
d_i = to_bytes(md5(text).digest(), errors='strict')
d += d_i
key = d[:key_length]
@ -462,12 +482,11 @@ class VaultAES(object):
""" Read plaintext data from in_file and write encrypted to out_file """
# combine sha + data
this_sha = sha256(to_bytes(data)).hexdigest()
tmp_data = this_sha + "\n" + data
this_sha = to_bytes(sha256(data).hexdigest())
tmp_data = this_sha + b"\n" + data
in_file = BytesIO(to_bytes(tmp_data))
in_file = BytesIO(tmp_data)
in_file.seek(0)
out_file = BytesIO()
@ -475,7 +494,7 @@ class VaultAES(object):
# Get a block of random data. EL does not have Crypto.Random.new()
# so os.urandom is used for cross platform purposes
salt = os.urandom(bs - len('Salted__'))
salt = os.urandom(bs - len(b'Salted__'))
key, iv = self.aes_derive_key_and_iv(password, salt, key_length, bs)
cipher = AES.new(key, AES.MODE_CBC, iv)
@ -486,7 +505,7 @@ class VaultAES(object):
chunk = in_file.read(1024 * bs)
if len(chunk) == 0 or len(chunk) % bs != 0:
padding_length = (bs - len(chunk) % bs) or bs
chunk += to_bytes(padding_length * chr(padding_length))
chunk += to_bytes(padding_length * chr(padding_length), errors='strict', encoding='ascii')
finished = True
out_file.write(cipher.encrypt(chunk))
@ -503,7 +522,6 @@ class VaultAES(object):
# http://stackoverflow.com/a/14989032
data = b''.join(data.split(b'\n'))
data = unhexlify(data)
in_file = BytesIO(data)
@ -512,7 +530,7 @@ class VaultAES(object):
bs = AES.block_size
tmpsalt = in_file.read(bs)
salt = tmpsalt[len('Salted__'):]
salt = tmpsalt[len(b'Salted__'):]
key, iv = self.aes_derive_key_and_iv(password, salt, key_length, bs)
cipher = AES.new(key, AES.MODE_CBC, iv)
next_chunk = b''
@ -536,13 +554,12 @@ class VaultAES(object):
out_file.seek(0)
out_data = out_file.read()
out_file.close()
new_data = to_unicode(out_data)
# split out sha and verify decryption
split_data = new_data.split("\n")
split_data = out_data.split(b"\n", 1)
this_sha = split_data[0]
this_data = '\n'.join(split_data[1:])
test_sha = sha256(to_bytes(this_data)).hexdigest()
this_data = split_data[1]
test_sha = to_bytes(sha256(this_data).hexdigest())
if this_sha != test_sha:
raise AnsibleError("Decryption failed")
@ -550,7 +567,7 @@ class VaultAES(object):
return this_data
class VaultAES256(object):
class VaultAES256:
"""
Vault implementation using AES-CTR with an HMAC-SHA256 authentication code.
@ -559,6 +576,8 @@ class VaultAES256(object):
# http://www.daemonology.net/blog/2009-06-11-cryptographic-right-answers.html
# Note: strings in this class should be byte strings by default.
def __init__(self):
check_prereqs()
@ -608,7 +627,7 @@ class VaultAES256(object):
# PKCS#7 PAD DATA http://tools.ietf.org/html/rfc5652#section-6.3
bs = AES.block_size
padding_length = (bs - len(data) % bs) or bs
data += padding_length * chr(padding_length)
data += to_bytes(padding_length * chr(padding_length), encoding='ascii', errors='strict')
# COUNTER.new PARAMETERS
# 1) nbits (integer) - Length of the counter, in bits.
@ -628,14 +647,13 @@ class VaultAES256(object):
# COMBINE SALT, DIGEST AND DATA
hmac = HMAC.new(key2, cryptedData, SHA256)
message = b''.join([hexlify(salt), b"\n", to_bytes(hmac.hexdigest()), b"\n", hexlify(cryptedData)])
message = b'%s\n%s\n%s' % (hexlify(salt), to_bytes(hmac.hexdigest()), hexlify(cryptedData))
message = hexlify(message)
return message
def decrypt(self, data, password):
# SPLIT SALT, DIGEST, AND DATA
data = b''.join(data.split(b"\n"))
data = unhexlify(data)
salt, cryptedHmac, cryptedData = data.split(b"\n", 2)
salt = unhexlify(salt)
@ -663,7 +681,7 @@ class VaultAES256(object):
decryptedData = decryptedData[:-padding_length]
return to_unicode(decryptedData)
return decryptedData
def is_equal(self, a, b):
"""

View file

@ -29,7 +29,6 @@ from ansible.plugins.action import ActionBase
from ansible.utils.boolean import boolean
from ansible.utils.hashing import checksum
from ansible.utils.unicode import to_bytes
from ansible.parsing.vault import VaultLib
class ActionModule(ActionBase):

View file

@ -64,7 +64,7 @@ class TestVaultLib(unittest.TestCase):
slots = ['is_encrypted',
'encrypt',
'decrypt',
'_add_header',
'_format_output',
'_split_header',]
for slot in slots:
assert hasattr(v, slot), "VaultLib is missing the %s method" % slot
@ -75,11 +75,11 @@ class TestVaultLib(unittest.TestCase):
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
assert v.is_encrypted(data), "encryption check on headered text failed"
def test_add_header(self):
def test_format_output(self):
v = VaultLib('ansible')
v.cipher_name = "TEST"
sensitive_data = "ansible"
data = v._add_header(sensitive_data)
data = v._format_output(sensitive_data)
lines = data.split(b'\n')
assert len(lines) > 1, "failed to properly add header"
header = to_unicode(lines[0])
@ -87,7 +87,7 @@ class TestVaultLib(unittest.TestCase):
header_parts = header.split(';')
assert len(header_parts) == 3, "header has the wrong number of parts"
assert header_parts[0] == '$ANSIBLE_VAULT', "header does not start with $ANSIBLE_VAULT"
assert header_parts[1] == v.version, "header version is incorrect"
assert header_parts[1] == v.b_version, "header version is incorrect"
assert header_parts[2] == 'TEST', "header does end with cipher name"
def test_split_header(self):
@ -97,7 +97,7 @@ class TestVaultLib(unittest.TestCase):
lines = rdata.split(b'\n')
assert lines[0] == b"ansible"
assert v.cipher_name == 'TEST', "cipher name was not set"
assert v.version == "9.9"
assert v.b_version == "9.9"
def test_encrypt_decrypt_aes(self):
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2: