Rekey on member (#33836)

* Change cast_list_to_dict to more generic rekey_on_member

cast_list_to_dict was taking an arbitrary data format in and returning
an arbitrary data format out.  Rework this to be a more generic function
which creates a dict of dicts based on a member of the dict.

Remove cast_dict_to_list since rekey_on_member handles the use cases we
know about and cast_dict_to_list suffers from the same problems as
cast_list_to_dict.  If this is still needed we could think about filters
we could add to do this in a short jinja2 pipeline.

* Fix bare excepts (bare excepts even catch sys.exit())
This commit is contained in:
Toshio Kuratomi 2017-12-12 19:02:15 -08:00 committed by GitHub
parent 5b6ba8cbfd
commit 155f36bbd8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 99 additions and 129 deletions

View file

@ -1,63 +0,0 @@
# Author Ken Celenza <ken@networktocode.com>
# Author Jason Edelman <jason@networktocode.com>
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.errors import AnsibleError, AnsibleFilterError
def cast_list_to_dict(data, key):
new_obj = {}
if not isinstance(data, list):
raise AnsibleFilterError("Type is not a valid list")
for item in data:
if not isinstance(item, dict):
raise AnsibleFilterError("List item is not a valid dict")
try:
key_elem = item.get(key)
except Exception as e:
raise AnsibleFilterError(str(e))
if new_obj.get(key_elem):
raise AnsibleFilterError("Key {0} is not unique, cannot correctly turn into dict".format(key_elem))
elif not key_elem:
raise AnsibleFilterError("Key {0} was not found".format(key))
else:
new_obj[key_elem] = item
return new_obj
def cast_dict_to_list(data, key_name):
new_obj = []
if not isinstance(data, dict):
raise AnsibleFilterError("Type is not a valid dict")
for key, value in data.items():
if not isinstance(value, dict):
raise AnsibleFilterError("Type of key {0} value {1} is not a valid dict".format(key, value))
if value.get(key_name):
raise AnsibleFilterError("Key name {0} is already in use, cannot correctly turn into dict".format(key_name))
value[key_name] = key
new_obj.append(value)
return new_obj
class FilterModule(object):
'''Convert a list to a dictionary provided a key that exists in all dicts.
If it does not, that dict is omitted
'''
def filters(self):
return {
'cast_list_to_dict': cast_list_to_dict,
'cast_dict_to_list': cast_dict_to_list,
}
if __name__ == "__main__":
list_data = [{"proto": "eigrp", "state": "enabled"}, {"proto": "ospf", "state": "enabled"}]
print(cast_list_to_dict(list_data, 'proto'))
dict_data = {'eigrp': {'state': 'enabled', 'as': '1'}, 'ospf': {'state': 'enabled', 'as': '2'}}
print(cast_dict_to_list(dict_data, 'proto'))

View file

@ -1,4 +1,7 @@
# (c) 2014, Brian Coca <bcoca@ansible.com>
# Copyright 2014, Brian Coca <bcoca@ansible.com>
# Copyright 2017, Ken Celenza <ken@networktocode.com>
# Copyright 2017, Jason Edelman <jason@networktocode.com>
# Copyright 2017, Ansible Project
#
# This file is part of Ansible
#
@ -26,7 +29,9 @@ import math
from ansible import errors
from ansible.module_utils import basic
from ansible.module_utils.six import binary_type, text_type
from ansible.module_utils.six.moves import zip, zip_longest
from ansible.module_utils._text import to_native
def unique(a):
@ -113,7 +118,7 @@ def human_readable(size, isbits=False, unit=None):
''' Return a human readable string '''
try:
return basic.bytes_to_human(size, isbits, unit)
except:
except Exception:
raise errors.AnsibleFilterError("human_readable() can't interpret following string: %s" % size)
@ -121,10 +126,55 @@ def human_to_bytes(size, default_unit=None, isbits=False):
''' Return bytes count from a human readable string '''
try:
return basic.human_to_bytes(size, default_unit, isbits)
except:
except Exception:
raise errors.AnsibleFilterError("human_to_bytes() can't interpret following string: %s" % size)
def rekey_on_member(data, key, duplicates='error'):
"""
Rekey a dict of dicts on another member
May also create a dict from a list of dicts.
duplicates can be one of ``error`` or ``overwrite`` to specify whether to error out if the key
value would be duplicated or to overwrite previous entries if that's the case.
"""
if duplicates not in ('error', 'overwrite'):
raise errors.AnsibleFilterError("duplicates parameter to rekey_on_member has unknown value: {0}".format(duplicates))
new_obj = {}
if isinstance(data, collections.Mapping):
iterate_over = data.values()
elif isinstance(data, collections.Iterable) and not isinstance(data, (text_type, binary_type)):
iterate_over = data
else:
raise errors.AnsibleFilterError("Type is not a valid list, set, or dict")
for item in iterate_over:
if not isinstance(item, collections.Mapping):
raise errors.AnsibleFilterError("List item is not a valid dict")
try:
key_elem = item[key]
except KeyError:
raise errors.AnsibleFilterError("Key {0} was not found".format(key))
except Exception as e:
raise errors.AnsibleFilterError(to_native(e))
# Note: if new_obj[key_elem] exists it will always be a non-empty dict (it will at
# minimun contain {key: key_elem}
if new_obj.get(key_elem, None):
if duplicates == 'error':
raise errors.AnsibleFilterError("Key {0} is not unique, cannot correctly turn into dict".format(key_elem))
elif duplicates == 'overwrite':
new_obj[key_elem] = item
else:
new_obj[key_elem] = item
return new_obj
class FilterModule(object):
''' Ansible math jinja2 filters '''
@ -154,6 +204,7 @@ class FilterModule(object):
# computer theory
'human_readable': human_readable,
'human_to_bytes': human_to_bytes,
'rekey_on_member': rekey_on_member,
# zip
'zip': zip,

View file

@ -1,63 +0,0 @@
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.compat.tests import unittest
from ansible.plugins.filter.cast_type import (cast_list_to_dict, cast_dict_to_list)
from ansible.errors import AnsibleError, AnsibleFilterError
class TestTypeFilter(unittest.TestCase):
def test_cast_list_to_dict(self):
# Good test
list_original = [{"proto": "eigrp", "state": "enabled"}, {"proto": "ospf", "state": "enabled"}]
key = 'proto'
dict_return = {'eigrp': {'state': 'enabled', 'proto': 'eigrp'}, 'ospf': {'state': 'enabled', 'proto': 'ospf'}}
self.assertEqual(cast_list_to_dict(list_original, key), dict_return)
# Fail when key is not found
key = 'key_not_to_be_found'
self.assertRaisesRegexp(AnsibleFilterError, 'was not found', cast_list_to_dict, list_original, key)
# Fail when key is duplicated
list_original = [{"proto": "eigrp", "state": "enabled"}, {"proto": "ospf", "state": "enabled"}, {"proto": "ospf", "state": "enabled"}]
key = 'proto'
self.assertRaisesRegexp(AnsibleFilterError, 'is not unique', cast_list_to_dict, list_original, key)
# Fail when list item is not a dict
list_original = [{"proto": "eigrp", "state": "enabled"}, "ospf"]
key = 'proto'
self.assertRaisesRegexp(AnsibleFilterError, 'List item is not a valid dict', cast_list_to_dict, list_original, key)
# Fail when a non list is sent
list_original = {"proto": "eigrp", "state": "enabled"}
key = 'proto'
self.assertRaisesRegexp(AnsibleFilterError, 'not a valid list', cast_list_to_dict, list_original, key)
def test_cast_dict_to_list(self):
# Good test
dict_original = {'eigrp': {'state': 'enabled', 'as': '1'}, 'ospf': {'state': 'enabled', 'as': '2'}}
key_name = 'proto'
list_return = [{'state': 'enabled', 'proto': 'ospf', 'as': '2'}, {'state': 'enabled', 'proto': 'eigrp', 'as': '1'}]
actual_return = cast_dict_to_list(dict_original, key_name)
try:
_assertItemsEqual = self.assertCountEqual
_assertItemsEqual(actual_return, list_return)
except AttributeError:
self.assertEqual(sorted(actual_return), sorted(list_return))
# Fail when dict key is already used
dict_original = {'eigrp': {'state': 'enabled', 'as': '1', 'proto': 'bgp'}, 'ospf': {'state': 'enabled', 'as': '2'}}
key_name = 'proto'
self.assertRaisesRegexp(AnsibleFilterError, ' already in use, cannot correctly turn into dict', cast_dict_to_list, dict_original, key_name)
# Fail when sending a non-dict
dict_original = [{'eigrp': {'state': 'enabled', 'as': '1'}, 'ospf': {'state': 'enabled', 'as': '2'}}]
key_name = 'proto'
self.assertRaisesRegexp(AnsibleFilterError, 'Type is not a valid dict', cast_dict_to_list, dict_original, key_name)
# Fail when dict value is not a dict
dict_original = {'eigrp': [{'state': 'enabled', 'as': '1'}], 'ospf': {'state': 'enabled', 'as': '2'}}
key_name = 'proto'
self.assertRaisesRegexp(AnsibleFilterError, 'Type of key', cast_dict_to_list, dict_original, key_name)

View file

@ -120,3 +120,48 @@ class TestInversePower:
def test_cube_root(self):
assert ms.inversepower(27, 3) == 3
class TestRekeyOnMember():
# (Input data structure, member to rekey on, expected return)
VALID_ENTRIES = (
([{"proto": "eigrp", "state": "enabled"}, {"proto": "ospf", "state": "enabled"}],
'proto',
{'eigrp': {'state': 'enabled', 'proto': 'eigrp'}, 'ospf': {'state': 'enabled', 'proto': 'ospf'}}),
({'eigrp': {"proto": "eigrp", "state": "enabled"}, 'ospf': {"proto": "ospf", "state": "enabled"}},
'proto',
{'eigrp': {'state': 'enabled', 'proto': 'eigrp'}, 'ospf': {'state': 'enabled', 'proto': 'ospf'}}),
)
# (Input data structure, member to rekey on, expected error message)
INVALID_ENTRIES = (
# Fail when key is not found
([{"proto": "eigrp", "state": "enabled"}], 'invalid_key', "Key invalid_key was not found"),
({"eigrp": {"proto": "eigrp", "state": "enabled"}}, 'invalid_key', "Key invalid_key was not found"),
# Fail when key is duplicated
([{"proto": "eigrp"}, {"proto": "ospf"}, {"proto": "ospf"}],
'proto', 'Key ospf is not unique, cannot correctly turn into dict'),
# Fail when value is not a dict
(["string"], 'proto', "List item is not a valid dict"),
([123], 'proto', "List item is not a valid dict"),
([[{'proto': 1}]], 'proto', "List item is not a valid dict"),
# Fail when we do not send a dict or list
("string", 'proto', "Type is not a valid list, set, or dict"),
(123, 'proto', "Type is not a valid list, set, or dict"),
)
@pytest.mark.parametrize("list_original, key, expected", VALID_ENTRIES)
def test_rekey_on_member_success(self, list_original, key, expected):
assert ms.rekey_on_member(list_original, key) == expected
@pytest.mark.parametrize("list_original, key, expected", INVALID_ENTRIES)
def test_fail_rekey_on_member(self, list_original, key, expected):
with pytest.raises(AnsibleFilterError) as err:
ms.rekey_on_member(list_original, key)
assert err.value.message == expected
def test_duplicate_strategy_overwrite(self):
list_original = ({'proto': 'eigrp', 'id': 1}, {'proto': 'ospf', 'id': 2}, {'proto': 'eigrp', 'id': 3})
expected = {'eigrp': {'proto': 'eigrp', 'id': 3}, 'ospf': {'proto': 'ospf', 'id': 2}}
assert ms.rekey_on_member(list_original, 'proto', duplicates='overwrite') == expected