From dbcfb3d0fe0bc3d6c4c29019fc78ffcf8c40922f Mon Sep 17 00:00:00 2001 From: Chris Archibald Date: Mon, 11 Mar 2019 06:56:30 -0700 Subject: [PATCH] Update to na_ontap_firewall_policy (#51976) * Revert "changes to clusteR" This reverts commit 33ee1b71e4bc8435fb315762a871f8c4cb6c5f80. * Fix issue and unit tests * update to firewall * fix import issues * Revert "Revert "changes to clusteR"" This reverts commit 2713c75f31cbf81ef1785d6ab9ea5d0d4db9af60. * fix docs * stop pylint on unicode line, line can only be run in python2 * Review comment * add pylint skip * add pylint skip --- .../netapp/na_ontap_firewall_policy.py | 340 ++++++++++-------- .../netapp/test_na_ontap_firewall_policy.py | 286 +++++++++++++++ 2 files changed, 470 insertions(+), 156 deletions(-) create mode 100644 test/units/modules/storage/netapp/test_na_ontap_firewall_policy.py diff --git a/lib/ansible/modules/storage/netapp/na_ontap_firewall_policy.py b/lib/ansible/modules/storage/netapp/na_ontap_firewall_policy.py index 8dfa3e70721..92ccc8a9b85 100644 --- a/lib/ansible/modules/storage/netapp/na_ontap_firewall_policy.py +++ b/lib/ansible/modules/storage/netapp/na_ontap_firewall_policy.py @@ -1,6 +1,6 @@ #!/usr/bin/python -# (c) 2018, NetApp, Inc +# (c) 2018-2019, NetApp, Inc # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) from __future__ import absolute_import, division, print_function @@ -16,71 +16,66 @@ short_description: NetApp ONTAP Manage a firewall policy version_added: '2.7' author: NetApp Ansible Team (@carchi8py) description: - - Manage a firewall policy for an Ontap Cluster + - Configure firewall on an ONTAP node and manage firewall policy for an ONTAP SVM extends_documentation_fragment: - netapp.na_ontap +requirements: + - Python package ipaddress. Install using 'pip install ipaddress' options: state: description: - - Whether to set up a fire policy or not + - Whether to set up a firewall policy or not choices: ['present', 'absent'] default: present allow_list: description: - - A list of IPs and masks to use + - A list of IPs and masks to use. + - The host bits of the IP addresses used in this list must be set to 0. policy: description: - A policy name for the firewall policy - required: true service: description: - The service to apply the policy to choices: ['dns', 'http', 'https', 'ndmp', 'ndmps', 'ntp', 'rsh', 'snmp', 'ssh', 'telnet'] - required: true vserver: description: - The Vserver to apply the policy to. - required: true enable: description: - - enabled firewall + - enable firewall on a node choices: ['enable', 'disable'] - default: enable logging: description: - - enable logging + - enable logging for firewall on a node choices: ['enable', 'disable'] - default: disable node: description: - The node to run the firewall configuration on - required: True ''' EXAMPLES = """ - name: create firewall Policy na_ontap_firewall_policy: state: present - allow_list: [1.2.3.4/24,1.3.3.4/24] + allow_list: [1.2.3.0/24,1.3.0.0/16] policy: pizza service: http vserver: ci_dev hostname: "{{ netapp hostname }}" username: "{{ netapp username }}" password: "{{ netapp password }}" - node: laurentn-vsim1 - name: Modify firewall Policy na_ontap_firewall_policy: state: present - allow_list: [1.2.3.4/24,1.3.3.4/24] + allow_list: [1.5.3.0/24] policy: pizza service: http vserver: ci_dev hostname: "{{ netapp hostname }}" username: "{{ netapp username }}" password: "{{ netapp password }}" - node: laurentn-vsim1 - name: Destory firewall Policy na_ontap_firewall_policy: @@ -91,7 +86,16 @@ EXAMPLES = """ hostname: "{{ netapp hostname }}" username: "{{ netapp username }}" password: "{{ netapp password }}" - node: laurentn-vsim1 + + - name: Enable firewall and logging on a node + na_ontap_firewall_policy: + node: test-vsim1 + enable: enable + logging: enable + hostname: "{{ netapp hostname }}" + username: "{{ netapp username }}" + password: "{{ netapp password }}" + """ RETURN = """ @@ -103,6 +107,13 @@ from ansible.module_utils.basic import AnsibleModule from ansible.module_utils._text import to_native import ansible.module_utils.netapp as netapp_utils from ansible.module_utils.netapp_module import NetAppModule +try: + import ipaddress + HAS_IPADDRESS_LIB = True +except ImportError: + HAS_IPADDRESS_LIB = False + +import sys HAS_NETAPP_LIB = netapp_utils.has_netapp_lib() @@ -113,16 +124,20 @@ class NetAppONTAPFirewallPolicy(object): self.argument_spec.update(dict( state=dict(required=False, choices=['present', 'absent'], default='present'), allow_list=dict(required=False, type="list"), - policy=dict(required=True, type='str'), - service=dict(required=True, type='str', choices=['dns', 'http', 'https', 'ndmp', 'ndmps', 'ntp', 'rsh', 'snmp', 'ssh', 'telnet']), - vserver=dict(required=True, type="str"), - enable=dict(required=False, type="str", choices=['enable', 'disable'], default='enable'), - logging=dict(required=False, type="str", choices=["enable", 'disable'], default='disable'), - node=dict(required=True, type="str") + policy=dict(required=False, type='str'), + service=dict(required=False, type='str', choices=['dns', 'http', 'https', 'ndmp', + 'ndmps', 'ntp', 'rsh', 'snmp', 'ssh', 'telnet']), + vserver=dict(required=False, type="str"), + enable=dict(required=False, type="str", choices=['enable', 'disable']), + logging=dict(required=False, type="str", choices=['enable', 'disable']), + node=dict(required=False, type="str") )) self.module = AnsibleModule( argument_spec=self.argument_spec, + required_together=(['policy', 'service', 'vserver'], + ['enable', 'node'] + ), supports_check_mode=True ) @@ -133,33 +148,32 @@ class NetAppONTAPFirewallPolicy(object): self.module.fail_json(msg="the python NetApp-Lib module is required") else: self.server = netapp_utils.setup_na_ontap_zapi(module=self.module) + + if HAS_IPADDRESS_LIB is False: + self.module.fail_json(msg="the python ipaddress lib is required for this module") return - def create_firewall_policy(self): - """ - Create a firewall policy - :return: Nothing - """ - net_firewall_policy_obj = netapp_utils.zapi.NaElement("net-firewall-policy-create") - net_firewall_policy_obj = self.create_modify_policy(net_firewall_policy_obj) - try: - self.server.invoke_successfully(net_firewall_policy_obj, enable_tunneling=True) - except netapp_utils.zapi.NaApiError as error: - self.module.fail_json(msg="Error creating Firewall Policy: %s" % (to_native(error)), exception=traceback.format_exc()) - - def destroy_firewall_policy(self): - """ - Destroy a Firewall Policy - :return: None - """ - net_firewall_policy_obj = netapp_utils.zapi.NaElement("net-firewall-policy-destroy") - net_firewall_policy_obj.add_new_child('policy', self.parameters['policy']) - net_firewall_policy_obj.add_new_child('service', self.parameters['service']) - net_firewall_policy_obj.add_new_child('vserver', self.parameters['vserver']) - try: - self.server.invoke_successfully(net_firewall_policy_obj, enable_tunneling=True) - except netapp_utils.zapi.NaApiError as error: - self.module.fail_json(msg="Error destroying Firewall Policy: %s" % (to_native(error)), exception=traceback.format_exc()) + def validate_ip_addresses(self): + ''' + Validate if the given IP address is a network address (i.e. it's host bits are set to 0) + ONTAP doesn't validate if the host bits are set, + and hence doesn't add a new address unless the IP is from a different network. + So this validation allows the module to be idempotent. + :return: None + ''' + for ip in self.parameters['allow_list']: + # create an IPv4 object for current IP address + if sys.version_info[0] >= 3: + ip_addr = str(ip) + else: + ip_addr = unicode(ip) # pylint: disable=undefined-variable + # get network address from netmask, throw exception if address is not a network address + try: + ipaddress.ip_network(ip_addr) + except ValueError as exc: + self.module.fail_json(msg='Error: Invalid IP address value for allow_list parameter.' + 'Please specify a network address without host bits set: %s' + % (to_native(exc))) def get_firewall_policy(self): """ @@ -167,147 +181,161 @@ class NetAppONTAPFirewallPolicy(object): :return: returns a firewall policy object, or returns False if there are none """ net_firewall_policy_obj = netapp_utils.zapi.NaElement("net-firewall-policy-get-iter") - net_firewall_policy_info = netapp_utils.zapi.NaElement("net-firewall-policy-info") - query = netapp_utils.zapi.NaElement('query') - net_firewall_policy_info.add_new_child('policy', self.parameters['policy']) - query.add_child_elem(net_firewall_policy_info) - net_firewall_policy_obj.add_child_elem(query) - result = self.server.invoke_successfully(net_firewall_policy_obj, True) - if result.get_child_by_name('num-records') and \ - int(result.get_child_content('num-records')) == 1: - return result - return False + attributes = { + 'query': { + 'net-firewall-policy-info': self.firewall_policy_attributes() + } + } + net_firewall_policy_obj.translate_struct(attributes) - def modify_firewall_policy(self): + try: + result = self.server.invoke_successfully(net_firewall_policy_obj, True) + except netapp_utils.zapi.NaApiError as error: + self.module.fail_json(msg="Error getting firewall policy %s:%s" % (self.parameters['policy'], + to_native(error)), + exception=traceback.format_exc()) + + if result.get_child_by_name('num-records') and int(result.get_child_content('num-records')) >= 1: + attributes_list = result.get_child_by_name('attributes-list') + policy_info = attributes_list.get_child_by_name('net-firewall-policy-info') + ips = self.na_helper.get_value_for_list(from_zapi=True, + zapi_parent=policy_info.get_child_by_name('allow-list')) + return { + 'service': policy_info['service'], + 'allow_list': ips} + return None + + def create_firewall_policy(self): """ - Modify a firewall Policy + Create a firewall policy for given vserver + :return: None + """ + net_firewall_policy_obj = netapp_utils.zapi.NaElement("net-firewall-policy-create") + net_firewall_policy_obj.translate_struct(self.firewall_policy_attributes()) + if self.parameters.get('allow_list'): + self.validate_ip_addresses() + net_firewall_policy_obj.add_child_elem(self.na_helper.get_value_for_list(from_zapi=False, + zapi_parent='allow-list', + zapi_child='ip-and-mask', + data=self.parameters['allow_list']) + ) + try: + self.server.invoke_successfully(net_firewall_policy_obj, enable_tunneling=True) + except netapp_utils.zapi.NaApiError as error: + self.module.fail_json(msg="Error creating Firewall Policy: %s" % (to_native(error)), exception=traceback.format_exc()) + + def destroy_firewall_policy(self): + """ + Destroy a Firewall Policy from a vserver + :return: None + """ + net_firewall_policy_obj = netapp_utils.zapi.NaElement("net-firewall-policy-destroy") + net_firewall_policy_obj.translate_struct(self.firewall_policy_attributes()) + try: + self.server.invoke_successfully(net_firewall_policy_obj, enable_tunneling=True) + except netapp_utils.zapi.NaApiError as error: + self.module.fail_json(msg="Error destroying Firewall Policy: %s" % (to_native(error)), exception=traceback.format_exc()) + + def modify_firewall_policy(self, modify): + """ + Modify a firewall Policy on a vserver :return: none """ + self.validate_ip_addresses() net_firewall_policy_obj = netapp_utils.zapi.NaElement("net-firewall-policy-modify") - net_firewall_policy_obj = self.create_modify_policy(net_firewall_policy_obj) + net_firewall_policy_obj.translate_struct(self.firewall_policy_attributes()) + net_firewall_policy_obj.add_child_elem(self.na_helper.get_value_for_list(from_zapi=False, + zapi_parent='allow-list', + zapi_child='ip-and-mask', + data=modify['allow_list'])) try: self.server.invoke_successfully(net_firewall_policy_obj, enable_tunneling=True) except netapp_utils.zapi.NaApiError as error: self.module.fail_json(msg="Error modifying Firewall Policy: %s" % (to_native(error)), exception=traceback.format_exc()) - def create_modify_policy(self, net_firewall_policy_obj): - """ - Set up the parameters for creating or modifying a policy - :param net_firewall_policy_obj: The Firewall policy to modify - :return: - """ - net_firewall_policy_obj.add_new_child('policy', self.parameters['policy']) - net_firewall_policy_obj.add_new_child('service', self.parameters['service']) - net_firewall_policy_obj.add_new_child('vserver', self.parameters['vserver']) - allow_ip_list = netapp_utils.zapi.NaElement("allow-list") - for each in self.parameters['allow_list']: - net_firewall_policy_ip = netapp_utils.zapi.NaElement("ip-and-mask") - net_firewall_policy_ip.set_content(each) - allow_ip_list.add_child_elem(net_firewall_policy_ip) - net_firewall_policy_obj.add_child_elem(allow_ip_list) - return net_firewall_policy_obj + def firewall_policy_attributes(self): + return { + 'policy': self.parameters['policy'], + 'service': self.parameters['service'], + 'vserver': self.parameters['vserver'], + } - def get_firewall_config(self): + def get_firewall_config_for_node(self): """ - Get a firewall configuration - :return: the firewall configuration + Get firewall configuration on the node + :return: dict() with firewall config details """ + if self.parameters.get('logging'): + if self.parameters.get('node') is None: + self.module.fail_json(msg='Error: Missing parameter \'node\' to modify firewall logging') net_firewall_config_obj = netapp_utils.zapi.NaElement("net-firewall-config-get") net_firewall_config_obj.add_new_child('node-name', self.parameters['node']) try: result = self.server.invoke_successfully(net_firewall_config_obj, True) except netapp_utils.zapi.NaApiError as error: - self.module.fail_json(msg="Error getting Firewall Configuration: %s" % (to_native(error)), exception=traceback.format_exc()) - return result + self.module.fail_json(msg="Error getting Firewall Configuration: %s" % (to_native(error)), + exception=traceback.format_exc()) + if result.get_child_by_name('attributes'): + firewall_info = result['attributes'].get_child_by_name('net-firewall-config-info') + return {'enable': self.change_status_to_bool(firewall_info.get_child_content('is-enabled'), to_zapi=False), + 'logging': self.change_status_to_bool(firewall_info.get_child_content('is-logging'), to_zapi=False)} + return None - def check_policy(self, policy): + def modify_firewall_config(self, modify): """ - Check to see if a policy has been changed or not - :param policy: policy to check - :return: True if the policy has changed, False if there are no changes - """ - changed = False - attributes_list = policy.get_child_by_name('attributes-list') - policy_info = attributes_list.get_child_by_name('net-firewall-policy-info') - allow_list = policy_info.get_child_by_name('allow-list') - for each in allow_list.get_children(): - if each.get_content() not in self.parameters['allow_list']: - changed = True - if self.parameters['service'] != policy_info.get_child_by_name('service').get_content(): - changed = True - if self.parameters['policy'] != policy_info.get_child_by_name('policy').get_content(): - changed = True - return changed - - def modify_firewall_config(self): - """ - Modify the configuration of a firewall - :return: none + Modify the configuration of a firewall on node + :return: None """ net_firewall_config_obj = netapp_utils.zapi.NaElement("net-firewall-config-modify") net_firewall_config_obj.add_new_child('node-name', self.parameters['node']) - net_firewall_config_obj.add_new_child('is-enabled', self.parameters['enable']) - net_firewall_config_obj.add_new_child('is-logging', self.parameters['logging']) + if modify.get('enable'): + net_firewall_config_obj.add_new_child('is-enabled', self.change_status_to_bool(self.parameters['enable'])) + if modify.get('logging'): + net_firewall_config_obj.add_new_child('is-logging', self.change_status_to_bool(self.parameters['logging'])) try: self.server.invoke_successfully(net_firewall_config_obj, enable_tunneling=True) except netapp_utils.zapi.NaApiError as error: - self.module.fail_json(msg="Error modifying Firewall Config: %s" % (to_native(error)), exception=traceback.format_exc()) + self.module.fail_json(msg="Error modifying Firewall Config: %s" % (to_native(error)), + exception=traceback.format_exc()) - def check_config(self, config): - """ - check to see if a firewall configuration has changed or not - :param config: The configuration to check - :return: true if it has changed, false if it has not - """ - changed = False - attributes_list = config.get_child_by_name('attributes') - firewall_info = attributes_list.get_child_by_name('net-firewall-config-info') - enable = firewall_info.get_child_by_name('is-enabled') - logging = firewall_info.get_child_by_name('is-logging') - if self.parameters['enable'] == 'enable': - is_enable = "true" + def change_status_to_bool(self, input, to_zapi=True): + if to_zapi: + return 'true' if input == 'enable' else 'false' else: - is_enable = "false" - if enable != is_enable: - changed = True - if self.parameters['logging'] == 'logging': - is_logging = "true" - else: - is_logging = "false" - if logging != is_logging: - changed = True - return changed + return 'enable' if input == 'true' else 'disable' - def apply(self): + def autosupport_log(self): results = netapp_utils.get_cserver(self.server) cserver = netapp_utils.setup_na_ontap_zapi(module=self.module, vserver=results) netapp_utils.ems_log_event("na_ontap_firewall_policy", cserver) - changed = False - if self.parameters['state'] == 'present': - policy = self.get_firewall_policy() - if not policy: - self.create_firewall_policy() - if not self.check_config(self.get_firewall_config()): - self.modify_firewall_config() - changed = True + + def apply(self): + self.autosupport_log() + cd_action, modify, modify_config = None, None, None + if self.parameters.get('policy'): + current = self.get_firewall_policy() + cd_action = self.na_helper.get_cd_action(current, self.parameters) + if cd_action is None and self.parameters['state'] == 'present': + modify = self.na_helper.get_modified_attributes(current, self.parameters) + if self.parameters.get('node'): + current_config = self.get_firewall_config_for_node() + # firewall config for a node is always present, we cannot create or delete a firewall on a node + modify_config = self.na_helper.get_modified_attributes(current_config, self.parameters) + + if self.na_helper.changed: + if self.module.check_mode: + pass else: - if self.check_policy(policy): - self.modify_firewall_policy() - changed = True - if not self.check_config(self.get_firewall_config()): - self.modify_firewall_config() - changed = True - else: - if self.get_firewall_policy(): - self.destroy_firewall_policy() - if not self.check_config(self.get_firewall_config()): - self.modify_firewall_config() - changed = True - else: - if not self.check_config(self.get_firewall_config()): - self.modify_firewall_config() - changed = True - self.module.exit_json(changed=changed) + if cd_action == 'create': + self.create_firewall_policy() + elif cd_action == 'delete': + self.destroy_firewall_policy() + else: + if modify: + self.modify_firewall_policy(modify) + if modify_config: + self.modify_firewall_config(modify_config) + self.module.exit_json(changed=self.na_helper.changed) def main(): diff --git a/test/units/modules/storage/netapp/test_na_ontap_firewall_policy.py b/test/units/modules/storage/netapp/test_na_ontap_firewall_policy.py new file mode 100644 index 00000000000..9d78c4fabba --- /dev/null +++ b/test/units/modules/storage/netapp/test_na_ontap_firewall_policy.py @@ -0,0 +1,286 @@ +# (c) 2018, NetApp, Inc +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +''' unit test template for ONTAP Ansible module ''' + +from __future__ import print_function +import json +import pytest + +from units.compat import unittest +from units.compat.mock import patch, Mock +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes +import ansible.module_utils.netapp as netapp_utils + +from ansible.modules.storage.netapp.na_ontap_firewall_policy \ + import NetAppONTAPFirewallPolicy as fp_module # module under test + +if not netapp_utils.has_netapp_lib(): + pytestmark = pytest.mark.skip('skipping as missing required netapp_lib') + + +def set_module_args(args): + """prepare arguments so that they will be picked up during module creation""" + args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access + + +class AnsibleExitJson(Exception): + """Exception class to be raised by module.exit_json and caught by the test case""" + pass + + +class AnsibleFailJson(Exception): + """Exception class to be raised by module.fail_json and caught by the test case""" + pass + + +def exit_json(*args, **kwargs): # pylint: disable=unused-argument + """function to patch over exit_json; package return data into an exception""" + if 'changed' not in kwargs: + kwargs['changed'] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): # pylint: disable=unused-argument + """function to patch over fail_json; package return data into an exception""" + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +class MockONTAPConnection(object): + ''' mock server connection to ONTAP host ''' + + def __init__(self, kind=None, data=None): + ''' save arguments ''' + self.kind = kind + self.data = data + self.xml_in = None + self.xml_out = None + + def invoke_successfully(self, xml, enable_tunneling): # pylint: disable=unused-argument + ''' mock invoke_successfully returning xml data ''' + self.xml_in = xml + if self.kind == 'policy': + xml = self.build_policy_info(self.data) + if self.kind == 'config': + xml = self.build_firewall_config_info(self.data) + self.xml_out = xml + return xml + + @staticmethod + def build_policy_info(data): + ''' build xml data for net-firewall-policy-info ''' + xml = netapp_utils.zapi.NaElement('xml') + attributes = { + 'num-records': 1, + 'attributes-list': { + 'net-firewall-policy-info': { + 'policy': data['policy'], + 'service': data['service'], + 'allow-list': [ + {'ip-and-mask': '1.2.3.0/24'} + ] + } + } + } + + xml.translate_struct(attributes) + return xml + + @staticmethod + def build_firewall_config_info(data): + ''' build xml data for net-firewall-config-info ''' + xml = netapp_utils.zapi.NaElement('xml') + attributes = { + 'attributes': { + 'net-firewall-config-info': { + 'is-enabled': 'true', + 'is-logging': 'false' + } + } + } + xml.translate_struct(attributes) + return xml + + +class TestMyModule(unittest.TestCase): + ''' a group of related Unit Tests ''' + + def setUp(self): + self.mock_module_helper = patch.multiple(basic.AnsibleModule, + exit_json=exit_json, + fail_json=fail_json) + self.mock_module_helper.start() + self.addCleanup(self.mock_module_helper.stop) + self.mock_policy = { + 'policy': 'test', + 'service': 'http', + 'vserver': 'my_vserver', + 'allow_list': '1.2.3.0/24' + } + self.mock_config = { + 'node': 'test', + 'enable': 'enable', + 'logging': 'enable' + } + + def mock_policy_args(self): + return { + 'policy': self.mock_policy['policy'], + 'service': self.mock_policy['service'], + 'vserver': self.mock_policy['vserver'], + 'allow_list': [self.mock_policy['allow_list']], + 'hostname': 'test', + 'username': 'test_user', + 'password': 'test_pass!' + } + + def mock_config_args(self): + return { + 'node': self.mock_config['node'], + 'enable': self.mock_config['enable'], + 'logging': self.mock_config['logging'], + 'hostname': 'test', + 'username': 'test_user', + 'password': 'test_pass!' + } + + def get_mock_object(self, kind=None): + """ + Helper method to return an na_ontap_firewall_policy object + :param kind: passes this param to MockONTAPConnection() + :return: na_ontap_firewall_policy object + """ + obj = fp_module() + obj.autosupport_log = Mock(return_value=None) + if kind is None: + obj.server = MockONTAPConnection() + else: + mock_data = self.mock_config if kind == 'config' else self.mock_policy + obj.server = MockONTAPConnection(kind=kind, data=mock_data) + return obj + + def test_module_fail_when_required_args_missing(self): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleFailJson) as exc: + set_module_args({}) + fp_module() + print('Info: %s' % exc.value.args[0]['msg']) + + def test_helper_firewall_policy_attributes(self): + ''' helper returns dictionary with vserver, service and policy details ''' + data = self.mock_policy + set_module_args(self.mock_policy_args()) + result = self.get_mock_object('policy').firewall_policy_attributes() + del data['allow_list'] + assert data == result + + def test_helper_validate_ip_addresses_positive(self): + ''' test if helper validates if IP is a network address ''' + data = self.mock_policy_args() + data['allow_list'] = ['1.2.0.0/16', '1.2.3.0/24'] + set_module_args(data) + result = self.get_mock_object().validate_ip_addresses() + assert result is None + + def test_helper_validate_ip_addresses_negative(self): + ''' test if helper validates if IP is a network address ''' + data = self.mock_policy_args() + data['allow_list'] = ['1.2.0.10/16', '1.2.3.0/24'] + set_module_args(data) + with pytest.raises(AnsibleFailJson) as exc: + self.get_mock_object().validate_ip_addresses() + msg = 'Error: Invalid IP address value for allow_list parameter.' \ + 'Please specify a network address without host bits set: ' \ + '1.2.0.10/16 has host bits set' + assert exc.value.args[0]['msg'] == msg + + def test_get_nonexistent_policy(self): + ''' Test if get_firewall_policy returns None for non-existent policy ''' + set_module_args(self.mock_policy_args()) + result = self.get_mock_object().get_firewall_policy() + assert result is None + + def test_get_existing_policy(self): + ''' Test if get_firewall_policy returns policy details for existing policy ''' + data = self.mock_policy_args() + set_module_args(data) + result = self.get_mock_object('policy').get_firewall_policy() + assert result['service'] == data['service'] + assert result['allow_list'] == ['1.2.3.0/24'] # from build_policy_info() + + def test_successful_create(self): + ''' Test successful create ''' + set_module_args(self.mock_policy_args()) + with pytest.raises(AnsibleExitJson) as exc: + self.get_mock_object().apply() + assert exc.value.args[0]['changed'] + + def test_create_idempotency(self): + ''' Test create idempotency ''' + set_module_args(self.mock_policy_args()) + with pytest.raises(AnsibleExitJson) as exc: + self.get_mock_object('policy').apply() + assert not exc.value.args[0]['changed'] + + def test_successful_delete(self): + ''' Test delete existing job ''' + data = self.mock_policy_args() + data['state'] = 'absent' + set_module_args(data) + with pytest.raises(AnsibleExitJson) as exc: + self.get_mock_object('policy').apply() + assert exc.value.args[0]['changed'] + + def test_delete_idempotency(self): + ''' Test delete idempotency ''' + data = self.mock_policy_args() + data['state'] = 'absent' + set_module_args(data) + with pytest.raises(AnsibleExitJson) as exc: + self.get_mock_object().apply() + assert not exc.value.args[0]['changed'] + + def test_successful_modify(self): + ''' Test successful modify allow_list ''' + data = self.mock_policy_args() + data['allow_list'] = ['1.2.0.0/16'] + set_module_args(data) + with pytest.raises(AnsibleExitJson) as exc: + self.get_mock_object('policy').apply() + assert exc.value.args[0]['changed'] + + def test_successful_modify_mutiple_ips(self): + ''' Test successful modify allow_list ''' + data = self.mock_policy_args() + data['allow_list'] = ['1.2.0.0/16', '1.0.0.0/8'] + set_module_args(data) + with pytest.raises(AnsibleExitJson) as exc: + self.get_mock_object('policy').apply() + assert exc.value.args[0]['changed'] + + def test_get_nonexistent_config(self): + ''' Test if get_firewall_config returns None for non-existent node ''' + set_module_args(self.mock_config_args()) + result = self.get_mock_object().get_firewall_config_for_node() + assert result is None + + def test_get_existing_config(self): + ''' Test if get_firewall_config returns policy details for existing node ''' + data = self.mock_config_args() + set_module_args(data) + result = self.get_mock_object('config').get_firewall_config_for_node() + assert result['enable'] == 'enable' # from build_config_info() + assert result['logging'] == 'disable' # from build_config_info() + + def test_successful_modify_config(self): + ''' Test successful modify allow_list ''' + data = self.mock_config_args() + data['enable'] = 'disable' + data['logging'] = 'enable' + set_module_args(data) + with pytest.raises(AnsibleExitJson) as exc: + self.get_mock_object('config').apply() + assert exc.value.args[0]['changed']