Unicode and other fixes for vault
This commit is contained in:
parent
6e107d2f22
commit
a3fd4817ef
4 changed files with 125 additions and 105 deletions
|
@ -151,12 +151,15 @@ class DataLoader():
|
||||||
|
|
||||||
show_content = True
|
show_content = True
|
||||||
try:
|
try:
|
||||||
with open(file_name, 'r') as f:
|
with open(file_name, 'rb') as f:
|
||||||
data = f.read()
|
data = f.read()
|
||||||
if self._vault.is_encrypted(data):
|
if self._vault.is_encrypted(data):
|
||||||
data = self._vault.decrypt(data)
|
data = self._vault.decrypt(data)
|
||||||
show_content = False
|
show_content = False
|
||||||
|
|
||||||
|
data = to_unicode(data, errors='strict')
|
||||||
return (data, show_content)
|
return (data, show_content)
|
||||||
|
|
||||||
except (IOError, OSError) as e:
|
except (IOError, OSError) as e:
|
||||||
raise AnsibleParserError("an error occurred while trying to read the file '%s': %s" % (file_name, str(e)))
|
raise AnsibleParserError("an error occurred while trying to read the file '%s': %s" % (file_name, str(e)))
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,6 @@
|
||||||
from __future__ import (absolute_import, division, print_function)
|
from __future__ import (absolute_import, division, print_function)
|
||||||
__metaclass__ = type
|
__metaclass__ = type
|
||||||
|
|
||||||
import sys
|
|
||||||
import os
|
import os
|
||||||
import shlex
|
import shlex
|
||||||
import shutil
|
import shutil
|
||||||
|
@ -33,7 +32,7 @@ from ansible.errors import AnsibleError
|
||||||
from hashlib import sha256
|
from hashlib import sha256
|
||||||
from binascii import hexlify
|
from binascii import hexlify
|
||||||
from binascii import unhexlify
|
from binascii import unhexlify
|
||||||
from six import binary_type, PY3, text_type
|
from six import PY3
|
||||||
|
|
||||||
# Note: Only used for loading obsolete VaultAES files. All files are written
|
# Note: Only used for loading obsolete VaultAES files. All files are written
|
||||||
# using the newer VaultAES256 which does not require md5
|
# using the newer VaultAES256 which does not require md5
|
||||||
|
@ -95,8 +94,9 @@ HAS_ANY_PBKDF2HMAC = HAS_PBKDF2 or HAS_PBKDF2HMAC
|
||||||
|
|
||||||
CRYPTO_UPGRADE = "ansible-vault requires a newer version of pycrypto than the one installed on your platform. You may fix this with OS-specific commands such as: yum install python-devel; rpm -e --nodeps python-crypto; pip install pycrypto"
|
CRYPTO_UPGRADE = "ansible-vault requires a newer version of pycrypto than the one installed on your platform. You may fix this with OS-specific commands such as: yum install python-devel; rpm -e --nodeps python-crypto; pip install pycrypto"
|
||||||
|
|
||||||
HEADER=u'$ANSIBLE_VAULT'
|
b_HEADER = b'$ANSIBLE_VAULT'
|
||||||
CIPHER_WHITELIST=['AES', 'AES256']
|
CIPHER_WHITELIST = frozenset((u'AES', u'AES256'))
|
||||||
|
CIPHER_WRITE_WHITELIST=frozenset((u'AES256',))
|
||||||
|
|
||||||
|
|
||||||
def check_prereqs():
|
def check_prereqs():
|
||||||
|
@ -104,116 +104,132 @@ def check_prereqs():
|
||||||
if not HAS_AES or not HAS_COUNTER or not HAS_ANY_PBKDF2HMAC or not HAS_HASH:
|
if not HAS_AES or not HAS_COUNTER or not HAS_ANY_PBKDF2HMAC or not HAS_HASH:
|
||||||
raise AnsibleError(CRYPTO_UPGRADE)
|
raise AnsibleError(CRYPTO_UPGRADE)
|
||||||
|
|
||||||
class VaultLib(object):
|
class VaultLib:
|
||||||
|
|
||||||
def __init__(self, password):
|
def __init__(self, password):
|
||||||
self.password = password
|
self.b_password = to_bytes(password, errors='strict', encoding='utf-8')
|
||||||
self.cipher_name = None
|
self.cipher_name = None
|
||||||
self.version = '1.1'
|
self.b_version = b'1.1'
|
||||||
|
|
||||||
def is_encrypted(self, data):
|
def is_encrypted(self, data):
|
||||||
data = to_unicode(data)
|
""" Test if this is vault encrypted data
|
||||||
if data.startswith(HEADER):
|
|
||||||
|
:arg data: a byte str or unicode string to test whether it is
|
||||||
|
recognized as vault encrypted data
|
||||||
|
:returns: True if it is recognized. Otherwise, False.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if to_bytes(data, errors='strict', encoding='utf-8').startswith(b_HEADER):
|
||||||
return True
|
return True
|
||||||
else:
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def encrypt(self, data):
|
def encrypt(self, data):
|
||||||
data = to_unicode(data)
|
"""Vault encrypt a piece of data.
|
||||||
|
|
||||||
if self.is_encrypted(data):
|
:arg data: a utf-8 byte str or unicode string to encrypt.
|
||||||
|
:returns: a utf-8 encoded byte str of encrypted data. The string
|
||||||
|
contains a header identifying this as vault encrypted data and
|
||||||
|
formatted to newline terminated lines of 80 characters. This is
|
||||||
|
suitable for dumping as is to a vault file.
|
||||||
|
"""
|
||||||
|
b_data = to_bytes(data, errors='strict', encoding='utf-8')
|
||||||
|
|
||||||
|
if self.is_encrypted(b_data):
|
||||||
raise AnsibleError("data is already encrypted")
|
raise AnsibleError("data is already encrypted")
|
||||||
|
|
||||||
if not self.cipher_name:
|
if not self.cipher_name:
|
||||||
self.cipher_name = "AES256"
|
self.cipher_name = u"AES256"
|
||||||
# raise AnsibleError("the cipher must be set before encrypting data")
|
|
||||||
|
|
||||||
if 'Vault' + self.cipher_name in globals() and self.cipher_name in CIPHER_WHITELIST:
|
cipher_class_name = u'Vault{0}'.format(self.cipher_name)
|
||||||
cipher = globals()['Vault' + self.cipher_name]
|
if cipher_class_name in globals() and self.cipher_name in CIPHER_WHITELIST:
|
||||||
this_cipher = cipher()
|
Cipher = globals()[cipher_class_name]
|
||||||
|
this_cipher = Cipher()
|
||||||
else:
|
else:
|
||||||
raise AnsibleError("{0} cipher could not be found".format(self.cipher_name))
|
raise AnsibleError(u"{0} cipher could not be found".format(self.cipher_name))
|
||||||
|
|
||||||
"""
|
# encrypt data
|
||||||
# combine sha + data
|
b_enc_data = this_cipher.encrypt(b_data, self.b_password)
|
||||||
this_sha = sha256(data).hexdigest()
|
|
||||||
tmp_data = this_sha + "\n" + data
|
|
||||||
"""
|
|
||||||
|
|
||||||
# encrypt sha + data
|
# format the data for output to the file
|
||||||
enc_data = this_cipher.encrypt(data, self.password)
|
b_tmp_data = self._format_output(b_enc_data)
|
||||||
|
return b_tmp_data
|
||||||
# add header
|
|
||||||
tmp_data = self._add_header(enc_data)
|
|
||||||
return tmp_data
|
|
||||||
|
|
||||||
def decrypt(self, data):
|
def decrypt(self, data):
|
||||||
data = to_bytes(data)
|
"""Decrypt a piece of vault encrypted data.
|
||||||
|
|
||||||
if self.password is None:
|
:arg data: a string to decrypt. Since vault encrypted data is an
|
||||||
|
ascii text format this can be either a byte str or unicode string.
|
||||||
|
:returns: a byte string containing the decrypted data
|
||||||
|
"""
|
||||||
|
b_data = to_bytes(data, errors='strict', encoding='utf-8')
|
||||||
|
|
||||||
|
if self.b_password is None:
|
||||||
raise AnsibleError("A vault password must be specified to decrypt data")
|
raise AnsibleError("A vault password must be specified to decrypt data")
|
||||||
|
|
||||||
if not self.is_encrypted(data):
|
if not self.is_encrypted(b_data):
|
||||||
raise AnsibleError("data is not encrypted")
|
raise AnsibleError("data is not encrypted")
|
||||||
|
|
||||||
# clean out header
|
# clean out header
|
||||||
data = self._split_header(data)
|
b_data = self._split_header(b_data)
|
||||||
|
|
||||||
# create the cipher object
|
# create the cipher object
|
||||||
ciphername = to_unicode(self.cipher_name)
|
cipher_class_name = u'Vault{0}'.format(self.cipher_name)
|
||||||
if 'Vault' + ciphername in globals() and ciphername in CIPHER_WHITELIST:
|
if cipher_class_name in globals() and self.cipher_name in CIPHER_WHITELIST:
|
||||||
cipher = globals()['Vault' + ciphername]
|
Cipher = globals()[cipher_class_name]
|
||||||
this_cipher = cipher()
|
this_cipher = Cipher()
|
||||||
else:
|
else:
|
||||||
raise AnsibleError("{0} cipher could not be found".format(ciphername))
|
raise AnsibleError("{0} cipher could not be found".format(self.cipher_name))
|
||||||
|
|
||||||
# try to unencrypt data
|
# try to unencrypt data
|
||||||
data = this_cipher.decrypt(data, self.password)
|
b_data = this_cipher.decrypt(b_data, self.b_password)
|
||||||
if data is None:
|
if b_data is None:
|
||||||
raise AnsibleError("Decryption failed")
|
raise AnsibleError("Decryption failed")
|
||||||
|
|
||||||
return data
|
return b_data
|
||||||
|
|
||||||
def _add_header(self, data):
|
def _format_output(self, b_data):
|
||||||
# combine header and encrypted data in 80 char columns
|
""" Add header and format to 80 columns
|
||||||
|
|
||||||
|
:arg b_data: the encrypted and hexlified data as a byte string
|
||||||
|
:returns: a byte str that should be dumped into a file. It's
|
||||||
|
formatted to 80 char columns and has the header prepended
|
||||||
|
"""
|
||||||
|
|
||||||
#tmpdata = hexlify(data)
|
|
||||||
tmpdata = [to_bytes(data[i:i+80]) for i in range(0, len(data), 80)]
|
|
||||||
if not self.cipher_name:
|
if not self.cipher_name:
|
||||||
raise AnsibleError("the cipher must be set before adding a header")
|
raise AnsibleError("the cipher must be set before adding a header")
|
||||||
|
|
||||||
dirty_data = to_bytes(HEADER + ";" + self.version + ";" + self.cipher_name + "\n")
|
tmpdata = [b'%s\n' % b_data[i:i+80] for i in range(0, len(b_data), 80)]
|
||||||
for l in tmpdata:
|
tmpdata.insert(0, b'%s;%s;%s\n' % (b_HEADER, self.b_version,
|
||||||
dirty_data += l + b'\n'
|
to_bytes(self.cipher_name, errors='strict', encoding='utf-8')))
|
||||||
|
tmpdata = b''.join(tmpdata)
|
||||||
|
|
||||||
return dirty_data
|
return tmpdata
|
||||||
|
|
||||||
|
def _split_header(self, b_data):
|
||||||
|
"""Retrieve information about the Vault and clean the data
|
||||||
|
|
||||||
def _split_header(self, data):
|
When data is saved, it has a header prepended and is formatted into 80
|
||||||
|
character lines. This method extracts the information from the header
|
||||||
|
and then removes the header and the inserted newlines. The string returned
|
||||||
|
is suitable for processing by the Cipher classes.
|
||||||
|
|
||||||
|
:arg b_data: byte str containing the data from a save file
|
||||||
|
:returns: a byte str suitable for passing to a Cipher class's
|
||||||
|
decrypt() function.
|
||||||
|
"""
|
||||||
# used by decrypt
|
# used by decrypt
|
||||||
|
|
||||||
tmpdata = data.split(b'\n')
|
tmpdata = b_data.split(b'\n')
|
||||||
tmpheader = tmpdata[0].strip().split(b';')
|
tmpheader = tmpdata[0].strip().split(b';')
|
||||||
|
|
||||||
self.version = to_unicode(tmpheader[1].strip())
|
self.b_version = tmpheader[1].strip()
|
||||||
self.cipher_name = to_unicode(tmpheader[2].strip())
|
self.cipher_name = to_unicode(tmpheader[2].strip())
|
||||||
clean_data = b'\n'.join(tmpdata[1:])
|
clean_data = b''.join(tmpdata[1:])
|
||||||
|
|
||||||
"""
|
|
||||||
# strip out newline, join, unhex
|
|
||||||
clean_data = [ x.strip() for x in clean_data ]
|
|
||||||
clean_data = unhexlify(''.join(clean_data))
|
|
||||||
"""
|
|
||||||
|
|
||||||
return clean_data
|
return clean_data
|
||||||
|
|
||||||
def __enter__(self):
|
|
||||||
return self
|
|
||||||
|
|
||||||
def __exit__(self, *err):
|
class VaultEditor:
|
||||||
pass
|
|
||||||
|
|
||||||
class VaultEditor(object):
|
|
||||||
# uses helper methods for write_file(self, filename, data)
|
# uses helper methods for write_file(self, filename, data)
|
||||||
# to write a file so that code isn't duplicated for simple
|
# to write a file so that code isn't duplicated for simple
|
||||||
# file I/O, ditto read_file(self, filename) and launch_editor(self, filename)
|
# file I/O, ditto read_file(self, filename) and launch_editor(self, filename)
|
||||||
|
@ -225,7 +241,7 @@ class VaultEditor(object):
|
||||||
self.password = password
|
self.password = password
|
||||||
self.filename = filename
|
self.filename = filename
|
||||||
|
|
||||||
def _edit_file_helper(self, existing_data=None, cipher=None):
|
def _edit_file_helper(self, existing_data=None, cipher=None, force_save=False):
|
||||||
# make sure the umask is set to a sane value
|
# make sure the umask is set to a sane value
|
||||||
old_umask = os.umask(0o077)
|
old_umask = os.umask(0o077)
|
||||||
|
|
||||||
|
@ -240,7 +256,7 @@ class VaultEditor(object):
|
||||||
tmpdata = self.read_data(tmp_path)
|
tmpdata = self.read_data(tmp_path)
|
||||||
|
|
||||||
# Do nothing if the content has not changed
|
# Do nothing if the content has not changed
|
||||||
if existing_data == tmpdata:
|
if existing_data == tmpdata and not force_save:
|
||||||
return
|
return
|
||||||
|
|
||||||
# create new vault
|
# create new vault
|
||||||
|
@ -297,11 +313,11 @@ class VaultEditor(object):
|
||||||
dec_data = this_vault.decrypt(tmpdata)
|
dec_data = this_vault.decrypt(tmpdata)
|
||||||
|
|
||||||
# let the user edit the data and save
|
# let the user edit the data and save
|
||||||
self._edit_file_helper(existing_data=dec_data)
|
if this_vault.cipher_name not in CIPHER_WRITE_WHITELIST:
|
||||||
###we want the cipher to default to AES256 (get rid of files
|
# we want to get rid of files encrypted with the AES cipher
|
||||||
# encrypted with the AES cipher)
|
self._edit_file_helper(existing_data=dec_data, cipher=None, force_save=True)
|
||||||
#self._edit_file_helper(existing_data=dec_data, cipher=this_vault.cipher_name)
|
else:
|
||||||
|
self._edit_file_helper(existing_data=dec_data, cipher=this_vault.cipher_name, force_save=False)
|
||||||
|
|
||||||
def view_file(self):
|
def view_file(self):
|
||||||
|
|
||||||
|
@ -363,7 +379,7 @@ class VaultEditor(object):
|
||||||
if os.path.isfile(filename):
|
if os.path.isfile(filename):
|
||||||
os.remove(filename)
|
os.remove(filename)
|
||||||
f = open(filename, "wb")
|
f = open(filename, "wb")
|
||||||
f.write(to_bytes(data))
|
f.write(to_bytes(data, errors='strict'))
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
def shuffle_files(self, src, dest):
|
def shuffle_files(self, src, dest):
|
||||||
|
@ -401,19 +417,21 @@ class VaultFile(object):
|
||||||
|
|
||||||
_, self.tmpfile = tempfile.mkstemp()
|
_, self.tmpfile = tempfile.mkstemp()
|
||||||
|
|
||||||
|
### FIXME:
|
||||||
|
# __del__ can be problematic in python... For this use case, make
|
||||||
|
# VaultFile a context manager instead (implement __enter__ and __exit__)
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
self.filehandle.close()
|
self.filehandle.close()
|
||||||
os.unlink(self.tmplfile)
|
os.unlink(self.tmplfile)
|
||||||
|
|
||||||
def is_encrypted(self):
|
def is_encrypted(self):
|
||||||
peak = self.filehandler.readline()
|
peak = self.filehandle.readline()
|
||||||
if peak.startswith(HEADER):
|
if peak.startswith(b_HEADER):
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def get_decrypted(self):
|
def get_decrypted(self):
|
||||||
|
|
||||||
check_prereqs()
|
check_prereqs()
|
||||||
|
|
||||||
if self.is_encrypted():
|
if self.is_encrypted():
|
||||||
|
@ -423,7 +441,7 @@ class VaultFile(object):
|
||||||
if dec_data is None:
|
if dec_data is None:
|
||||||
raise AnsibleError("Decryption failed")
|
raise AnsibleError("Decryption failed")
|
||||||
else:
|
else:
|
||||||
self.tempfile.write(dec_data)
|
self.tmpfile.write(dec_data)
|
||||||
return self.tmpfile
|
return self.tmpfile
|
||||||
else:
|
else:
|
||||||
return self.filename
|
return self.filename
|
||||||
|
@ -432,13 +450,15 @@ class VaultFile(object):
|
||||||
# CIPHERS #
|
# CIPHERS #
|
||||||
########################################
|
########################################
|
||||||
|
|
||||||
class VaultAES(object):
|
class VaultAES:
|
||||||
|
|
||||||
# this version has been obsoleted by the VaultAES256 class
|
# this version has been obsoleted by the VaultAES256 class
|
||||||
# which uses encrypt-then-mac (fixing order) and also improving the KDF used
|
# which uses encrypt-then-mac (fixing order) and also improving the KDF used
|
||||||
# code remains for upgrade purposes only
|
# code remains for upgrade purposes only
|
||||||
# http://stackoverflow.com/a/16761459
|
# http://stackoverflow.com/a/16761459
|
||||||
|
|
||||||
|
# Note: strings in this class should be byte strings by default.
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
if not HAS_AES:
|
if not HAS_AES:
|
||||||
raise AnsibleError(CRYPTO_UPGRADE)
|
raise AnsibleError(CRYPTO_UPGRADE)
|
||||||
|
@ -449,8 +469,8 @@ class VaultAES(object):
|
||||||
|
|
||||||
d = d_i = b''
|
d = d_i = b''
|
||||||
while len(d) < key_length + iv_length:
|
while len(d) < key_length + iv_length:
|
||||||
text = "{0}{1}{2}".format(d_i, password, salt)
|
text = b"%s%s%s" % (d_i, password, salt)
|
||||||
d_i = md5(to_bytes(text)).digest()
|
d_i = to_bytes(md5(text).digest(), errors='strict')
|
||||||
d += d_i
|
d += d_i
|
||||||
|
|
||||||
key = d[:key_length]
|
key = d[:key_length]
|
||||||
|
@ -462,12 +482,11 @@ class VaultAES(object):
|
||||||
|
|
||||||
""" Read plaintext data from in_file and write encrypted to out_file """
|
""" Read plaintext data from in_file and write encrypted to out_file """
|
||||||
|
|
||||||
|
|
||||||
# combine sha + data
|
# combine sha + data
|
||||||
this_sha = sha256(to_bytes(data)).hexdigest()
|
this_sha = to_bytes(sha256(data).hexdigest())
|
||||||
tmp_data = this_sha + "\n" + data
|
tmp_data = this_sha + b"\n" + data
|
||||||
|
|
||||||
in_file = BytesIO(to_bytes(tmp_data))
|
in_file = BytesIO(tmp_data)
|
||||||
in_file.seek(0)
|
in_file.seek(0)
|
||||||
out_file = BytesIO()
|
out_file = BytesIO()
|
||||||
|
|
||||||
|
@ -475,7 +494,7 @@ class VaultAES(object):
|
||||||
|
|
||||||
# Get a block of random data. EL does not have Crypto.Random.new()
|
# Get a block of random data. EL does not have Crypto.Random.new()
|
||||||
# so os.urandom is used for cross platform purposes
|
# so os.urandom is used for cross platform purposes
|
||||||
salt = os.urandom(bs - len('Salted__'))
|
salt = os.urandom(bs - len(b'Salted__'))
|
||||||
|
|
||||||
key, iv = self.aes_derive_key_and_iv(password, salt, key_length, bs)
|
key, iv = self.aes_derive_key_and_iv(password, salt, key_length, bs)
|
||||||
cipher = AES.new(key, AES.MODE_CBC, iv)
|
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||||
|
@ -486,7 +505,7 @@ class VaultAES(object):
|
||||||
chunk = in_file.read(1024 * bs)
|
chunk = in_file.read(1024 * bs)
|
||||||
if len(chunk) == 0 or len(chunk) % bs != 0:
|
if len(chunk) == 0 or len(chunk) % bs != 0:
|
||||||
padding_length = (bs - len(chunk) % bs) or bs
|
padding_length = (bs - len(chunk) % bs) or bs
|
||||||
chunk += to_bytes(padding_length * chr(padding_length))
|
chunk += to_bytes(padding_length * chr(padding_length), errors='strict', encoding='ascii')
|
||||||
finished = True
|
finished = True
|
||||||
out_file.write(cipher.encrypt(chunk))
|
out_file.write(cipher.encrypt(chunk))
|
||||||
|
|
||||||
|
@ -503,7 +522,6 @@ class VaultAES(object):
|
||||||
|
|
||||||
# http://stackoverflow.com/a/14989032
|
# http://stackoverflow.com/a/14989032
|
||||||
|
|
||||||
data = b''.join(data.split(b'\n'))
|
|
||||||
data = unhexlify(data)
|
data = unhexlify(data)
|
||||||
|
|
||||||
in_file = BytesIO(data)
|
in_file = BytesIO(data)
|
||||||
|
@ -512,7 +530,7 @@ class VaultAES(object):
|
||||||
|
|
||||||
bs = AES.block_size
|
bs = AES.block_size
|
||||||
tmpsalt = in_file.read(bs)
|
tmpsalt = in_file.read(bs)
|
||||||
salt = tmpsalt[len('Salted__'):]
|
salt = tmpsalt[len(b'Salted__'):]
|
||||||
key, iv = self.aes_derive_key_and_iv(password, salt, key_length, bs)
|
key, iv = self.aes_derive_key_and_iv(password, salt, key_length, bs)
|
||||||
cipher = AES.new(key, AES.MODE_CBC, iv)
|
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||||
next_chunk = b''
|
next_chunk = b''
|
||||||
|
@ -536,13 +554,12 @@ class VaultAES(object):
|
||||||
out_file.seek(0)
|
out_file.seek(0)
|
||||||
out_data = out_file.read()
|
out_data = out_file.read()
|
||||||
out_file.close()
|
out_file.close()
|
||||||
new_data = to_unicode(out_data)
|
|
||||||
|
|
||||||
# split out sha and verify decryption
|
# split out sha and verify decryption
|
||||||
split_data = new_data.split("\n")
|
split_data = out_data.split(b"\n", 1)
|
||||||
this_sha = split_data[0]
|
this_sha = split_data[0]
|
||||||
this_data = '\n'.join(split_data[1:])
|
this_data = split_data[1]
|
||||||
test_sha = sha256(to_bytes(this_data)).hexdigest()
|
test_sha = to_bytes(sha256(this_data).hexdigest())
|
||||||
|
|
||||||
if this_sha != test_sha:
|
if this_sha != test_sha:
|
||||||
raise AnsibleError("Decryption failed")
|
raise AnsibleError("Decryption failed")
|
||||||
|
@ -550,7 +567,7 @@ class VaultAES(object):
|
||||||
return this_data
|
return this_data
|
||||||
|
|
||||||
|
|
||||||
class VaultAES256(object):
|
class VaultAES256:
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Vault implementation using AES-CTR with an HMAC-SHA256 authentication code.
|
Vault implementation using AES-CTR with an HMAC-SHA256 authentication code.
|
||||||
|
@ -559,6 +576,8 @@ class VaultAES256(object):
|
||||||
|
|
||||||
# http://www.daemonology.net/blog/2009-06-11-cryptographic-right-answers.html
|
# http://www.daemonology.net/blog/2009-06-11-cryptographic-right-answers.html
|
||||||
|
|
||||||
|
# Note: strings in this class should be byte strings by default.
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
|
||||||
check_prereqs()
|
check_prereqs()
|
||||||
|
@ -608,7 +627,7 @@ class VaultAES256(object):
|
||||||
# PKCS#7 PAD DATA http://tools.ietf.org/html/rfc5652#section-6.3
|
# PKCS#7 PAD DATA http://tools.ietf.org/html/rfc5652#section-6.3
|
||||||
bs = AES.block_size
|
bs = AES.block_size
|
||||||
padding_length = (bs - len(data) % bs) or bs
|
padding_length = (bs - len(data) % bs) or bs
|
||||||
data += padding_length * chr(padding_length)
|
data += to_bytes(padding_length * chr(padding_length), encoding='ascii', errors='strict')
|
||||||
|
|
||||||
# COUNTER.new PARAMETERS
|
# COUNTER.new PARAMETERS
|
||||||
# 1) nbits (integer) - Length of the counter, in bits.
|
# 1) nbits (integer) - Length of the counter, in bits.
|
||||||
|
@ -628,14 +647,13 @@ class VaultAES256(object):
|
||||||
|
|
||||||
# COMBINE SALT, DIGEST AND DATA
|
# COMBINE SALT, DIGEST AND DATA
|
||||||
hmac = HMAC.new(key2, cryptedData, SHA256)
|
hmac = HMAC.new(key2, cryptedData, SHA256)
|
||||||
message = b''.join([hexlify(salt), b"\n", to_bytes(hmac.hexdigest()), b"\n", hexlify(cryptedData)])
|
message = b'%s\n%s\n%s' % (hexlify(salt), to_bytes(hmac.hexdigest()), hexlify(cryptedData))
|
||||||
message = hexlify(message)
|
message = hexlify(message)
|
||||||
return message
|
return message
|
||||||
|
|
||||||
def decrypt(self, data, password):
|
def decrypt(self, data, password):
|
||||||
|
|
||||||
# SPLIT SALT, DIGEST, AND DATA
|
# SPLIT SALT, DIGEST, AND DATA
|
||||||
data = b''.join(data.split(b"\n"))
|
|
||||||
data = unhexlify(data)
|
data = unhexlify(data)
|
||||||
salt, cryptedHmac, cryptedData = data.split(b"\n", 2)
|
salt, cryptedHmac, cryptedData = data.split(b"\n", 2)
|
||||||
salt = unhexlify(salt)
|
salt = unhexlify(salt)
|
||||||
|
@ -663,7 +681,7 @@ class VaultAES256(object):
|
||||||
|
|
||||||
decryptedData = decryptedData[:-padding_length]
|
decryptedData = decryptedData[:-padding_length]
|
||||||
|
|
||||||
return to_unicode(decryptedData)
|
return decryptedData
|
||||||
|
|
||||||
def is_equal(self, a, b):
|
def is_equal(self, a, b):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -29,7 +29,6 @@ from ansible.plugins.action import ActionBase
|
||||||
from ansible.utils.boolean import boolean
|
from ansible.utils.boolean import boolean
|
||||||
from ansible.utils.hashing import checksum
|
from ansible.utils.hashing import checksum
|
||||||
from ansible.utils.unicode import to_bytes
|
from ansible.utils.unicode import to_bytes
|
||||||
from ansible.parsing.vault import VaultLib
|
|
||||||
|
|
||||||
class ActionModule(ActionBase):
|
class ActionModule(ActionBase):
|
||||||
|
|
||||||
|
|
|
@ -64,7 +64,7 @@ class TestVaultLib(unittest.TestCase):
|
||||||
slots = ['is_encrypted',
|
slots = ['is_encrypted',
|
||||||
'encrypt',
|
'encrypt',
|
||||||
'decrypt',
|
'decrypt',
|
||||||
'_add_header',
|
'_format_output',
|
||||||
'_split_header',]
|
'_split_header',]
|
||||||
for slot in slots:
|
for slot in slots:
|
||||||
assert hasattr(v, slot), "VaultLib is missing the %s method" % slot
|
assert hasattr(v, slot), "VaultLib is missing the %s method" % slot
|
||||||
|
@ -75,11 +75,11 @@ class TestVaultLib(unittest.TestCase):
|
||||||
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
|
||||||
assert v.is_encrypted(data), "encryption check on headered text failed"
|
assert v.is_encrypted(data), "encryption check on headered text failed"
|
||||||
|
|
||||||
def test_add_header(self):
|
def test_format_output(self):
|
||||||
v = VaultLib('ansible')
|
v = VaultLib('ansible')
|
||||||
v.cipher_name = "TEST"
|
v.cipher_name = "TEST"
|
||||||
sensitive_data = "ansible"
|
sensitive_data = "ansible"
|
||||||
data = v._add_header(sensitive_data)
|
data = v._format_output(sensitive_data)
|
||||||
lines = data.split(b'\n')
|
lines = data.split(b'\n')
|
||||||
assert len(lines) > 1, "failed to properly add header"
|
assert len(lines) > 1, "failed to properly add header"
|
||||||
header = to_unicode(lines[0])
|
header = to_unicode(lines[0])
|
||||||
|
@ -87,7 +87,7 @@ class TestVaultLib(unittest.TestCase):
|
||||||
header_parts = header.split(';')
|
header_parts = header.split(';')
|
||||||
assert len(header_parts) == 3, "header has the wrong number of parts"
|
assert len(header_parts) == 3, "header has the wrong number of parts"
|
||||||
assert header_parts[0] == '$ANSIBLE_VAULT', "header does not start with $ANSIBLE_VAULT"
|
assert header_parts[0] == '$ANSIBLE_VAULT', "header does not start with $ANSIBLE_VAULT"
|
||||||
assert header_parts[1] == v.version, "header version is incorrect"
|
assert header_parts[1] == v.b_version, "header version is incorrect"
|
||||||
assert header_parts[2] == 'TEST', "header does end with cipher name"
|
assert header_parts[2] == 'TEST', "header does end with cipher name"
|
||||||
|
|
||||||
def test_split_header(self):
|
def test_split_header(self):
|
||||||
|
@ -97,7 +97,7 @@ class TestVaultLib(unittest.TestCase):
|
||||||
lines = rdata.split(b'\n')
|
lines = rdata.split(b'\n')
|
||||||
assert lines[0] == b"ansible"
|
assert lines[0] == b"ansible"
|
||||||
assert v.cipher_name == 'TEST', "cipher name was not set"
|
assert v.cipher_name == 'TEST', "cipher name was not set"
|
||||||
assert v.version == "9.9"
|
assert v.b_version == "9.9"
|
||||||
|
|
||||||
def test_encrypt_decrypt_aes(self):
|
def test_encrypt_decrypt_aes(self):
|
||||||
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
|
||||||
|
|
Loading…
Reference in a new issue