pylint fixes for vault related code (#27721)

* rm unneeded parens following assert
* rm unused parse_vaulttext_envelope from yaml.constructor
* No longer need index/enumerate over vault_ids
* rm unnecessary else
* rm unused VaultCli.secrets
* rm unused vault_id arg on VaultAES.decrypt()

pylint: Unused argument 'vault_id'
pylint: Unused parse_vaulttext_envelope imported from ansible.parsing.vault
pylint: Unused variable 'index'
pylint: Unnecessary parens after 'assert' keyword
pylint: Unnecessary "else" after "return" (no-else-return)
pylint: Attribute 'editor' defined outside __init__

* use 'dummy' for unused variables instead of _

Based on pylint unused variable warnings.

Existing code use '_' for this, but that is old
and busted. The hot new thing is 'dummy'. It
is so fetch.

Except for where we get warnings for reusing
the 'dummy' var name inside of a list comprehension.

* Add super().__init__ call to PromptVaultSecret.__init__
pylint: __init__ method from base class 'VaultSecret' is not called (super-init-not-called)

* Make FileVaultSecret.read_file reg method again

The base class read_file() doesnt need self but
the sub classes do.

Rm now unneeded loader arg to read_file()

* Fix err msg string literal that had no effect
pylint: String statement has no effect

The indent on the continuation of the msg_format was wrong
so the second half was dropped.

There was also no need to join() filename (copy/paste from
original with a command list I assume...)

* Use local cipher_name in VaultEditor.edit_file not instance
pylint: Unused variable 'cipher_name'
pylint: Unused variable 'b_ciphertext'

Use the local cipher_name returned from parse_vaulttext_envelope()
instead of the instance self.cipher_name var.

Since there is only one valid cipher_name either way, it was
equilivent, but it will not be with more valid cipher_names

* Rm unused b_salt arg on VaultAES256._encrypt*
pylint: Unused argument 'b_salt'

Previously the methods computed the keys and iv themselves
so needed to be passed in the salt, but now the key/iv
are built before and passed in so b_salt arg is not used
anymore.

* rm redundant import of call from subprocess
pylint: Imports from package subprocess are not grouped

use via subprocess module now instead of direct
import.

* self._bytes is set in super init now, rm dup

* Make FileVaultSecret.read_file() -> _read_file()

_read_file() is details of the implementation of
load(), so now 'private'.
This commit is contained in:
Adrian Likins 2017-08-08 16:10:03 -04:00 committed by GitHub
parent 362f032449
commit c38ff3b8f8
5 changed files with 37 additions and 43 deletions

View file

@ -227,7 +227,7 @@ class CLI(with_metaclass(ABCMeta, object)):
vault_password_files,
ask_vault_pass)
for index, vault_id_slug in enumerate(vault_ids):
for vault_id_slug in vault_ids:
vault_id_name, vault_id_value = CLI.split_vault_id(vault_id_slug)
if vault_id_value in ['prompt', 'prompt_ask_vault_pass']:

View file

@ -211,10 +211,9 @@ class VaultCLI(CLI):
self.new_encrypt_secret = new_encrypt_secret[1]
loader.set_vault_secrets(vault_secrets)
self.secrets = vault_secrets
# FIXME: do we need to create VaultEditor here? its not reused
vault = VaultLib(self.secrets)
vault = VaultLib(vault_secrets)
self.editor = VaultEditor(vault)
self.execute()

View file

@ -32,7 +32,6 @@ from binascii import unhexlify
from hashlib import md5
from hashlib import sha256
from io import BytesIO
from subprocess import call
HAS_CRYPTOGRAPHY = False
HAS_PYCRYPTO = False
@ -75,6 +74,7 @@ except ImportError:
from ansible.errors import AnsibleError
from ansible import constants as C
from ansible.module_utils.six import PY3, binary_type
# Note: on py2, this zip is izip not the list based zip() builtin
from ansible.module_utils.six.moves import zip
from ansible.module_utils._text import to_bytes, to_text
@ -240,7 +240,7 @@ class PromptVaultSecret(VaultSecret):
default_prompt_formats = ["Vault password (%s): "]
def __init__(self, _bytes=None, vault_id=None, prompt_formats=None):
self._bytes = _bytes
super(PromptVaultSecret, self).__init__(_bytes=_bytes)
self.vault_id = vault_id
if prompt_formats is None:
@ -293,8 +293,8 @@ def get_file_vault_secret(filename=None, vault_id_name=None, encoding=None, load
if loader.is_executable(this_path):
# TODO: pass vault_id_name to script via cli
return ScriptVaultSecret(filename=this_path, encoding=encoding, loader=loader)
else:
return FileVaultSecret(filename=this_path, encoding=encoding, loader=loader)
return FileVaultSecret(filename=this_path, encoding=encoding, loader=loader)
# TODO: mv these classes to a seperate file so we don't pollute vault with 'subprocess' etc
@ -319,15 +319,15 @@ class FileVaultSecret(VaultSecret):
return None
def load(self):
self._bytes = self.read_file(self.filename, self.loader)
self._bytes = self._read_file(self.filename)
@staticmethod
def read_file(filename, loader):
def _read_file(self, filename):
"""
Read a vault password from a file or if executable, execute the script and
retrieve password from STDOUT
"""
# TODO: replace with use of self.loader
try:
f = open(filename, "rb")
vault_pass = f.read().strip()
@ -344,23 +344,21 @@ class FileVaultSecret(VaultSecret):
class ScriptVaultSecret(FileVaultSecret):
@staticmethod
def read_file(filename, loader):
if not loader.is_executable(filename):
def _read_file(self, filename):
if not self.loader.is_executable(filename):
raise AnsibleVaultError("The vault password script %s was not executable" % filename)
try:
# STDERR not captured to make it easier for users to prompt for input in their scripts
p = subprocess.Popen(filename, stdout=subprocess.PIPE)
except OSError as e:
msg_format = "Problem running vault password script %s (%s)."
"If this is not a script, remove the executable bit from the file."
msg = msg_format % (' '.join(filename), e)
msg_format = "Problem running vault password script %s (%s)." \
" If this is not a script, remove the executable bit from the file."
msg = msg_format % (filename, e)
raise AnsibleError(msg)
stdout, stderr = p.communicate()
stdout, dummy = p.communicate()
if p.returncode != 0:
raise AnsibleError("Vault password script %s returned non-zero (%s): %s" % (filename, p.returncode, p.stderr))
@ -394,7 +392,7 @@ def match_encrypt_secret(secrets):
'''Find the best/first/only secret in secrets to use for encrypting'''
# ie, consider all of the available secrets as matches
_vault_id_matchers = [_vault_id for _vault_id, _vault_secret in secrets]
_vault_id_matchers = [_vault_id for _vault_id, dummy in secrets]
best_secret = match_best_secret(secrets, _vault_id_matchers)
# can be empty list sans any tuple
return best_secret
@ -441,7 +439,7 @@ class VaultLib:
if secret is None:
if self.secrets:
secret_vault_id, secret = match_encrypt_secret(self.secrets)
dummy, secret = match_encrypt_secret(self.secrets)
else:
raise AnsibleVaultError("A vault password must be specified to encrypt data")
@ -489,7 +487,7 @@ class VaultLib:
msg += "%s is not a vault encrypted file" % filename
raise AnsibleError(msg)
b_vaulttext, b_version, cipher_name, vault_id = parse_vaulttext_envelope(b_vaulttext)
b_vaulttext, dummy, cipher_name, vault_id = parse_vaulttext_envelope(b_vaulttext)
# create the cipher object, note that the cipher used for decrypt can
# be different than the cipher used for encrypt
@ -528,7 +526,7 @@ class VaultLib:
# the known vault secrets.
if not C.DEFAULT_VAULT_ID_MATCH:
# Add all of the known vault_ids as candidates for decrypting a vault.
vault_id_matchers.extend([_vault_id for _vault_id, _secret in self.secrets if _vault_id != vault_id])
vault_id_matchers.extend([_vault_id for _vault_id, _dummy in self.secrets if _vault_id != vault_id])
matched_secrets = match_secrets(self.secrets, vault_id_matchers)
@ -600,7 +598,7 @@ class VaultEditor:
fh.write(data)
fh.write(data[:file_len % chunk_len])
assert(fh.tell() == file_len) # FIXME remove this assert once we have unittests to check its accuracy
assert fh.tell() == file_len # FIXME remove this assert once we have unittests to check its accuracy
os.fsync(fh)
def _shred_file(self, tmp_path):
@ -621,7 +619,7 @@ class VaultEditor:
return
try:
r = call(['shred', tmp_path])
r = subprocess.call(['shred', tmp_path])
except (OSError, ValueError):
# shred is not available on this system, or some other error occurred.
# ValueError caught because OS X El Capitan is raising an
@ -648,7 +646,7 @@ class VaultEditor:
self.write_data(existing_data, tmp_path, shred=False)
# drop the user into an editor on the tmp file
call(self._editor_shell_command(tmp_path))
subprocess.call(self._editor_shell_command(tmp_path))
except:
# whatever happens, destroy the decrypted file
self._shred_file(tmp_path)
@ -737,13 +735,13 @@ class VaultEditor:
# Figure out the vault id from the file, to select the right secret to re-encrypt it
# (duplicates parts of decrypt, but alas...)
b_ciphertext, b_version, cipher_name, vault_id = parse_vaulttext_envelope(b_vaulttext)
dummy, dummy, cipher_name, vault_id = parse_vaulttext_envelope(b_vaulttext)
# if we could decrypt, the vault_id should be in secrets
# though we could have multiple secrets for a given vault_id, pick the first one
secrets = match_secrets(self.vault.secrets, [vault_id])
secret = secrets[0][1]
if self.vault.cipher_name not in CIPHER_WRITE_WHITELIST:
if cipher_name not in CIPHER_WRITE_WHITELIST:
# we want to get rid of files encrypted with the AES cipher
self._edit_file_helper(filename, secret, existing_data=plaintext, force_save=True)
else:
@ -987,7 +985,7 @@ class VaultAES:
return b_plaintext
@classmethod
def decrypt(cls, b_vaulttext, secret, key_length=32, vault_id=None):
def decrypt(cls, b_vaulttext, secret, key_length=32):
""" Decrypt the given data and return it
:arg b_data: A byte string containing the encrypted data
@ -1084,7 +1082,7 @@ class VaultAES256:
return b_key1, b_key2, b_iv
@staticmethod
def _encrypt_cryptography(b_plaintext, b_salt, b_key1, b_key2, b_iv):
def _encrypt_cryptography(b_plaintext, b_key1, b_key2, b_iv):
cipher = C_Cipher(algorithms.AES(b_key1), modes.CTR(b_iv), CRYPTOGRAPHY_BACKEND)
encryptor = cipher.encryptor()
padder = padding.PKCS7(algorithms.AES.block_size).padder()
@ -1099,7 +1097,7 @@ class VaultAES256:
return to_bytes(hexlify(b_hmac), errors='surrogate_or_strict'), hexlify(b_ciphertext)
@staticmethod
def _encrypt_pycrypto(b_plaintext, b_salt, b_key1, b_key2, b_iv):
def _encrypt_pycrypto(b_plaintext, b_key1, b_key2, b_iv):
# PKCS#7 PAD DATA http://tools.ietf.org/html/rfc5652#section-6.3
bs = AES_pycrypto.block_size
padding_length = (bs - len(b_plaintext) % bs) or bs
@ -1135,9 +1133,9 @@ class VaultAES256:
b_key1, b_key2, b_iv = cls._gen_key_initctr(b_password, b_salt)
if HAS_CRYPTOGRAPHY:
b_hmac, b_ciphertext = cls._encrypt_cryptography(b_plaintext, b_salt, b_key1, b_key2, b_iv)
b_hmac, b_ciphertext = cls._encrypt_cryptography(b_plaintext, b_key1, b_key2, b_iv)
elif HAS_PYCRYPTO:
b_hmac, b_ciphertext = cls._encrypt_pycrypto(b_plaintext, b_salt, b_key1, b_key2, b_iv)
b_hmac, b_ciphertext = cls._encrypt_pycrypto(b_plaintext, b_key1, b_key2, b_iv)
else:
raise AnsibleError(NEED_CRYPTO_LIBRARY + '(Detected in encrypt)')

View file

@ -26,8 +26,7 @@ from ansible.module_utils._text import to_bytes
from ansible.parsing.yaml.objects import AnsibleMapping, AnsibleSequence, AnsibleUnicode
from ansible.parsing.yaml.objects import AnsibleVaultEncryptedUnicode
from ansible.utils.unsafe_proxy import wrap_var
from ansible.parsing.vault import VaultLib, parse_vaulttext_envelope
from ansible.parsing.vault import VaultLib
try:
from __main__ import display

View file

@ -104,7 +104,7 @@ class TestVaultEditor(unittest.TestCase):
vault_secrets = self._secrets(self.vault_password)
return VaultEditor(VaultLib(vault_secrets))
@patch('ansible.parsing.vault.call')
@patch('ansible.parsing.vault.subprocess.call')
def test_edit_file_helper_empty_target(self, mock_sp_call):
self._test_dir = self._create_test_dir()
@ -118,7 +118,7 @@ class TestVaultEditor(unittest.TestCase):
self.assertNotEqual(src_contents, b_ciphertext)
@patch('ansible.parsing.vault.call')
@patch('ansible.parsing.vault.subprocess.call')
def test_edit_file_helper_call_exception(self, mock_sp_call):
self._test_dir = self._create_test_dir()
@ -136,7 +136,7 @@ class TestVaultEditor(unittest.TestCase):
src_file_path,
self.vault_secret)
@patch('ansible.parsing.vault.call')
@patch('ansible.parsing.vault.subprocess.call')
def test_edit_file_helper_symlink_target(self, mock_sp_call):
self._test_dir = self._create_test_dir()
@ -170,7 +170,7 @@ class TestVaultEditor(unittest.TestCase):
def _faux_command(self, tmp_path):
pass
@patch('ansible.parsing.vault.call')
@patch('ansible.parsing.vault.subprocess.call')
def test_edit_file_helper_no_change(self, mock_sp_call):
self._test_dir = self._create_test_dir()
@ -308,7 +308,7 @@ class TestVaultEditor(unittest.TestCase):
self._assert_file_is_link(src_file_link_path, src_file_path)
@patch('ansible.parsing.vault.call')
@patch('ansible.parsing.vault.subprocess.call')
def test_edit_file(self, mock_sp_call):
self._test_dir = self._create_test_dir()
src_contents = to_bytes("some info in a file\nyup.")
@ -333,9 +333,7 @@ class TestVaultEditor(unittest.TestCase):
src_file_plaintext = ve.vault.decrypt(new_src_file_contents)
self.assertEqual(src_file_plaintext, new_src_contents)
new_stat = os.stat(src_file_path)
@patch('ansible.parsing.vault.call')
@patch('ansible.parsing.vault.subprocess.call')
def test_edit_file_symlink(self, mock_sp_call):
self._test_dir = self._create_test_dir()
src_contents = to_bytes("some info in a file\nyup.")
@ -371,7 +369,7 @@ class TestVaultEditor(unittest.TestCase):
# self.assertEqual(src_file_plaintext, new_src_contents,
# 'The decrypted plaintext of the editted file is not the expected contents.')
@patch('ansible.parsing.vault.call')
@patch('ansible.parsing.vault.subprocess.call')
def test_edit_file_not_encrypted(self, mock_sp_call):
self._test_dir = self._create_test_dir()
src_contents = to_bytes("some info in a file\nyup.")