Resource module for IOS ACL (#66629)

* fix ios_acl
This commit is contained in:
Sumit Jaiswal 2020-02-28 22:06:38 +05:30 committed by GitHub
parent 32a76f0aa1
commit 9392912608
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 4674 additions and 5 deletions

View file

@ -0,0 +1,593 @@
#
# -*- coding: utf-8 -*-
# Copyright 2019 Red Hat
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
#############################################
# WARNING #
#############################################
#
# This file is auto generated by the resource
# module builder playbook.
#
# Do not edit this file manually.
#
# Changes to this file will be over written
# by the resource module builder.
#
# Changes should be made in the model used to
# generate this file or in the resource module
# builder template.
#
#############################################
"""
The arg spec for the ios_acls module
"""
from __future__ import absolute_import, division, print_function
__metaclass__ = type
class AclsArgs(object):
"""The arg spec for the ios_acls module
"""
def __init__(self, **kwargs):
pass
argument_spec = {
'config': {
'elements': 'dict',
'options': {
'afi': {
'required': True,
'choices': ['ipv4', 'ipv6'],
'type': 'str'
},
'acls': {
'elements': 'dict',
'type': 'list',
'options': {
'name': {
'required': True,
'type': 'str'
},
'acl_type': {
'choices': ['extended', 'standard'],
'type': 'str'
},
'aces': {
'elements': 'dict',
'type': 'list',
'options': {
'grant': {
'choices': ['permit', 'deny'],
'type': 'str'
},
'sequence': {
'type': 'int'
},
'source': {
'type':
'dict',
'mutually_exclusive':
[['address', 'any', 'host'],
['wildcard_bits', 'any', 'host']],
'options': {
'address': {
'type': 'str'
},
'wildcard_bits': {
'type': 'str'
},
'any': {
'type': 'bool'
},
'host': {
'type': 'str'
},
'port_protocol': {
'type': 'dict',
'options': {
'eq': {
'type': 'str'
},
'gt': {
'type': 'str'
},
'lt': {
'type': 'str'
},
'neq': {
'type': 'str'
},
'range': {
'type': 'dict',
'options': {
'start': {
'type': 'int'
},
'end': {
'type': 'int'
}
}
}
}
}
},
},
'destination': {
'type':
'dict',
'mutually_exclusive':
[['address', 'any', 'host'],
['wildcard_bits', 'any', 'host']],
'options': {
'address': {
'type': 'str'
},
'wildcard_bits': {
'type': 'str'
},
'any': {
'type': 'bool'
},
'host': {
'type': 'str'
},
'port_protocol': {
'type': 'dict',
'options': {
'eq': {
'type': 'str'
},
'gt': {
'type': 'str'
},
'lt': {
'type': 'str'
},
'neq': {
'type': 'str'
},
'range': {
'type': 'dict',
'options': {
'start': {
'type': 'int'
},
'end': {
'type': 'int'
}
}
}
}
}
}
},
'protocol': {
'type': 'str'
},
'protocol_options': {
'type': 'dict',
'options': {
'protocol_number': {
'type': 'int'
},
'ahp': {
'type': 'bool'
},
'eigrp': {
'type': 'bool'
},
'esp': {
'type': 'bool'
},
'gre': {
'type': 'bool'
},
'hbh': {
'type': 'bool'
},
'icmp': {
'type': 'dict',
'options': {
'administratively_prohibited':
{
'type': 'bool'
},
'alternate_address': {
'type': 'bool'
},
'conversion_error': {
'type': 'bool'
},
'dod_host_prohibited': {
'type': 'bool'
},
'dod_net_prohibited': {
'type': 'bool'
},
'echo': {
'type': 'bool'
},
'echo_reply': {
'type': 'bool'
},
'general_parameter_problem': {
'type': 'bool'
},
'host_isolated': {
'type': 'bool'
},
'host_precedence_unreachable':
{
'type': 'bool'
},
'host_redirect': {
'type': 'bool'
},
'host_tos_redirect': {
'type': 'bool'
},
'host_tos_unreachable': {
'type': 'bool'
},
'host_unknown': {
'type': 'bool'
},
'host_unreachable': {
'type': 'bool'
},
'information_reply': {
'type': 'bool'
},
'information_request': {
'type': 'bool'
},
'mask_reply': {
'type': 'bool'
},
'mask_request': {
'type': 'bool'
},
'mobile_redirect': {
'type': 'bool'
},
'net_redirect': {
'type': 'bool'
},
'net_tos_redirect': {
'type': 'bool'
},
'net_tos_unreachable': {
'type': 'bool'
},
'net_unreachable': {
'type': 'bool'
},
'network_unknown': {
'type': 'bool'
},
'no_room_for_option': {
'type': 'bool'
},
'option_missing': {
'type': 'bool'
},
'packet_too_big': {
'type': 'bool'
},
'parameter_problem': {
'type': 'bool'
},
'port_unreachable': {
'type': 'bool'
},
'precedence_unreachable': {
'type': 'bool'
},
'protocol_unreachable': {
'type': 'bool'
},
'reassembly_timeout': {
'type': 'bool'
},
'redirect': {
'type': 'bool'
},
'router_advertisement': {
'type': 'bool'
},
'router_solicitation': {
'type': 'bool'
},
'source_quench': {
'type': 'bool'
},
'source_route_failed': {
'type': 'bool'
},
'time_exceeded': {
'type': 'bool'
},
'timestamp_reply': {
'type': 'bool'
},
'timestamp_request': {
'type': 'bool'
},
'traceroute': {
'type': 'bool'
},
'ttl_exceeded': {
'type': 'bool'
},
'unreachable': {
'type': 'bool'
},
}
},
'igmp': {
'type': 'dict',
'options': {
'dvmrp': {
'type': 'bool'
},
'host_query': {
'type': 'bool'
},
'mtrace_resp': {
'type': 'bool'
},
'mtrace_route': {
'type': 'bool'
},
'pim': {
'type': 'bool'
},
'trace': {
'type': 'bool'
},
'v1host_report': {
'type': 'bool'
},
'v2host_report': {
'type': 'bool'
},
'v2leave_group': {
'type': 'bool'
},
'v3host_report': {
'type': 'bool'
}
}
},
'ip': {
'type': 'bool'
},
'ipv6': {
'type': 'bool'
},
'ipinip': {
'type': 'bool'
},
'nos': {
'type': 'bool'
},
'ospf': {
'type': 'bool'
},
'pcp': {
'type': 'bool'
},
'pim': {
'type': 'bool'
},
'sctp': {
'type': 'bool'
},
'tcp': {
'options': {
'ack': {
'type': 'bool'
},
'established': {
'type': 'bool'
},
'fin': {
'type': 'bool'
},
'psh': {
'type': 'bool'
},
'rst': {
'type': 'bool'
},
'syn': {
'type': 'bool'
},
'urg': {
'type': 'bool'
}
},
'type': 'dict'
},
'udp': {
'type': 'bool'
}
}
},
'dscp': {
'type': 'str'
},
'fragments': {
'type': 'str'
},
'log': {
'type': 'str'
},
'log_input': {
'type': 'str'
},
'option': {
'type': 'dict',
'options': {
'add_ext': {
'type': 'bool'
},
'any_options': {
'type': 'bool'
},
'com_security': {
'type': 'bool'
},
'dps': {
'type': 'bool'
},
'encode': {
'type': 'bool'
},
'eool': {
'type': 'bool'
},
'ext_ip': {
'type': 'bool'
},
'ext_security': {
'type': 'bool'
},
'finn': {
'type': 'bool'
},
'imitd': {
'type': 'bool'
},
'lsr': {
'type': 'bool'
},
'mtup': {
'type': 'bool'
},
'mtur': {
'type': 'bool'
},
'no_op': {
'type': 'bool'
},
'nsapa': {
'type': 'bool'
},
'record_route': {
'type': 'bool'
},
'router_alert': {
'type': 'bool'
},
'sdb': {
'type': 'bool'
},
'security': {
'type': 'bool'
},
'ssr': {
'type': 'bool'
},
'stream_id': {
'type': 'bool'
},
'timestamp': {
'type': 'bool'
},
'traceroute': {
'type': 'bool'
},
'ump': {
'type': 'bool'
},
'visa': {
'type': 'bool'
},
'zsu': {
'type': 'bool'
}
}
},
'precedence': {
'type': 'int'
},
'time_range': {
'type': 'str'
},
'tos': {
'type': 'dict',
'options': {
'service_value': {
'type': 'int'
},
'max_reliability': {
'type': 'bool'
},
'max_throughput': {
'type': 'bool'
},
'min_delay': {
'type': 'bool'
},
'min_monetary_cost': {
'type': 'bool'
},
'normal': {
'type': 'bool'
}
}
},
'ttl': {
'type': 'dict',
'options': {
'eq': {
'type': 'int'
},
'gt': {
'type': 'int'
},
'lt': {
'type': 'int'
},
'neq': {
'type': 'int'
},
'range': {
'type': 'dict',
'options': {
'start': {
'type': 'int'
},
'end': {
'type': 'int'
}
}
}
}
},
}
}
}
},
},
'type': 'list'
},
'running_config': {
'type': 'str'
},
'state': {
'choices': [
'merged', 'replaced', 'overridden', 'deleted', 'gathered',
'rendered', 'parsed'
],
'default':
'merged',
'type':
'str'
}
}

View file

@ -0,0 +1,717 @@
#
# -*- coding: utf-8 -*-
# Copyright 2019 Red Hat Inc.
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""
The ios_acls class
It is in this file where the current configuration (as dict)
is compared to the provided configuration (as dict) and the command set
necessary to bring the current configuration to it's desired end-state is
created
"""
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import copy
from ansible.module_utils.network.common.cfg.base import ConfigBase
from ansible.module_utils.network.common.utils import to_list
from ansible.module_utils.network.ios.facts.facts import Facts
from ansible.module_utils.six import iteritems
from ansible.module_utils.network.common.utils import remove_empties
from ansible.module_utils.network.ios.utils.utils import new_dict_to_set
class Acls(ConfigBase):
"""
The ios_acls class
"""
gather_subset = [
'!all',
'!min',
]
gather_network_resources = [
'acls',
]
def __init__(self, module):
super(Acls, self).__init__(module)
def get_acl_facts(self, data=None):
""" Get the 'facts' (the current configuration)
:rtype: A dictionary
:returns: The current configuration as a dictionary
"""
facts, _warnings = Facts(self._module).get_facts(self.gather_subset, self.gather_network_resources, data=data)
acl_facts = facts['ansible_network_resources'].get('acls')
if not acl_facts:
return []
return acl_facts
def execute_module(self):
""" Execute the module
:rtype: A dictionary
:returns: The result from moduel execution
"""
result = {'changed': False}
commands = list()
warnings = list()
if self.state in self.ACTION_STATES:
existing_acl_facts = self.get_acl_facts()
else:
existing_acl_facts = []
if self.state in self.ACTION_STATES or self.state == 'rendered':
commands.extend(self.set_config(existing_acl_facts))
if commands and self.state in self.ACTION_STATES:
if not self._module.check_mode:
self._connection.edit_config(commands)
result['changed'] = True
if self.state in self.ACTION_STATES:
result['commands'] = commands
if self.state in self.ACTION_STATES or self.state == 'gathered':
changed_acl_facts = self.get_acl_facts()
elif self.state == 'rendered':
result['rendered'] = commands
elif self.state == 'parsed':
running_config = self._module.params['running_config']
if not running_config:
self._module.fail_json(msg="value of running_config parameter must not be empty for state parsed")
result['parsed'] = self.get_acl_facts(data=running_config)
else:
changed_acl_facts = []
if self.state in self.ACTION_STATES:
result['before'] = existing_acl_facts
if result['changed']:
result['after'] = changed_acl_facts
elif self.state == 'gathered':
result['gathered'] = changed_acl_facts
result['warnings'] = warnings
return result
def set_config(self, existing_acl_facts):
""" Collect the configuration from the args passed to the module,
collect the current configuration (as a dict from facts)
:rtype: A list
:returns: the commands necessary to migrate the current configuration
to the deisred configuration
"""
want = self._module.params['config']
have = existing_acl_facts
resp = self.set_state(want, have)
return to_list(resp)
def set_state(self, want, have):
""" Select the appropriate function based on the state provided
:param want: the desired configuration as a dictionary
:param have: the current configuration as a dictionary
:rtype: A list
:returns: the commands necessary to migrate the current configuration
to the deisred configuration
"""
commands = []
state = self._module.params['state']
if state in ('overridden', 'merged', 'replaced', 'rendered') and not want:
self._module.fail_json(msg='value of config parameter must not be empty for state {0}'.format(state))
if state == 'overridden':
commands = self._state_overridden(want, have)
elif state == 'deleted':
commands = self._state_deleted(want, have)
elif state == 'merged' or state == 'rendered':
commands = self._state_merged(want, have)
elif state == 'replaced':
commands = self._state_replaced(want, have)
return commands
def _state_replaced(self, want, have):
""" The command generator when state is replaced
:param want: the desired configuration as a dictionary
:param have: the current configuration as a dictionary
:rtype: A list
:returns: the commands necessary to migrate the current configuration
to the deisred configuration
"""
commands = []
for config_want in want:
for acls_want in config_want.get('acls'):
for ace_want in acls_want.get('aces'):
check = False
for config_have in have:
for acls_have in config_have.get('acls'):
for ace_have in acls_have.get('aces'):
if acls_want.get('name') == acls_have.get('name'):
ace_want = remove_empties(ace_want)
acls_want = remove_empties(acls_want)
cmd, change = self._set_config(ace_want,
ace_have,
acls_want,
config_want['afi'])
if cmd:
for temp_acls_have in config_have.get('acls'):
for temp_ace_have in temp_acls_have.get('aces'):
if acls_want.get('name') == temp_acls_have.get('name'):
commands.extend(
self._clear_config(temp_acls_have,
config_have,
temp_ace_have.get('sequence')))
commands.extend(cmd)
check = True
if check:
break
if check:
break
if not check:
# For configuring any non-existing want config
ace_want = remove_empties(ace_want)
cmd, change = self._set_config(ace_want,
{},
acls_want,
config_want['afi'])
commands.extend(cmd)
# Split and arrange the config commands
commands = self.split_set_cmd(commands)
return commands
def _state_overridden(self, want, have):
""" The command generator when state is overridden
:param want: the desired configuration as a dictionary
:param have: the current configuration as a dictionary
:rtype: A list
:returns: the commands necessary to migrate the current configuration
to the desired configuration
"""
commands = []
# Creating a copy of want, so that want dict is intact even after delete operation
# performed during override want n have comparison
temp_want = copy.deepcopy(want)
for config_have in have:
for acls_have in config_have.get('acls'):
for ace_have in acls_have.get('aces'):
check = False
for config_want in temp_want:
count = 0
for acls_want in config_want.get('acls'):
for ace_want in acls_want.get('aces'):
if acls_want.get('name') == acls_have.get('name'):
ace_want = remove_empties(ace_want)
acls_want = remove_empties(acls_want)
cmd, change = self._set_config(ace_want, ace_have, acls_want, config_want['afi'])
if cmd:
for temp_acls_have in config_have.get('acls'):
for temp_ace_have in temp_acls_have.get('aces'):
if acls_want.get('name') == temp_acls_have.get('name'):
commands.extend(
self._clear_config(temp_acls_have,
config_have,
temp_ace_have.get('sequence')))
commands.extend(cmd)
check = True
if check:
del config_want.get('acls')[count]
else:
count += 1
if check:
break
if check:
break
if not check:
# Delete the config not present in want config
commands.extend(self._clear_config(acls_have, config_have))
# For configuring any non-existing want config
for config_want in temp_want:
for acls_want in config_want.get('acls'):
for ace_want in acls_want.get('aces'):
ace_want = remove_empties(ace_want)
cmd, change = self._set_config(ace_want,
{},
acls_want,
config_want['afi'])
commands.extend(cmd)
# Split and arrange the config commands
commands = self.split_set_cmd(commands)
# Arranging the cmds suct that all delete cmds are fired before all set cmds
negate_commands = [each for each in commands if 'no' in each and 'access-list' in each]
negate_commands.extend([each for each in commands if each not in negate_commands])
commands = negate_commands
return commands
def _state_merged(self, want, have):
""" The command generator when state is merged
:param want: the additive configuration as a dictionary
:param have: the current configuration as a dictionary
:rtype: A list
:returns: the commands necessary to merge the provided into
the current configuration
"""
commands = []
for config_want in want:
for acls_want in config_want.get('acls'):
for ace_want in acls_want.get('aces'):
check = False
for config_have in have:
for acls_have in config_have.get('acls'):
for ace_have in acls_have.get('aces'):
if acls_want.get('name') == acls_have.get('name') and \
ace_want.get('sequence') == ace_have.get('sequence'):
ace_want = remove_empties(ace_want)
cmd, change = self._set_config(ace_want,
ace_have,
acls_want,
config_want['afi'])
# clear config will be fired only when there's command wrt to config
if config_want.get('afi') == 'ipv4' and change:
# for ipv4 only inplace update cannot be done, so deleting the sequence ace
# and then updating the want ace changes
commands.extend(self._clear_config(acls_want,
config_want,
ace_want.get('sequence')))
commands.extend(cmd)
check = True
elif acls_want.get('name') == acls_have.get('name'):
ace_want = remove_empties(ace_want)
cmd, check = self.common_condition_check(ace_want,
ace_have,
acls_want,
config_want,
check,
acls_have)
if acls_have.get('acl_type') == 'standard':
check = True
commands.extend(cmd)
if check:
break
if check:
break
if not check:
# For configuring any non-existing want config
ace_want = remove_empties(ace_want)
cmd, change = self._set_config(ace_want,
{},
acls_want,
config_want['afi'])
commands.extend(cmd)
# Split and arrange the config commands
commands = self.split_set_cmd(commands)
return commands
def _state_deleted(self, want, have):
""" The command generator when state is deleted
:param want: the objects from which the configuration should be removed
:param have: the current configuration as a dictionary
:rtype: A list
:returns: the commands necessary to remove the current configuration
of the provided objects
"""
commands = []
if want:
for config_want in want:
if config_want.get('acls'):
for acls_want in config_want.get('acls'):
if acls_want.get('aces'):
for ace_want in acls_want.get('aces'):
for config_have in have:
for acls_have in config_have.get('acls'):
if acls_want.get('name') == acls_have.get('name'):
if ace_want.get('sequence'):
commands.extend(self._clear_config(acls_want,
config_want,
ace_want.get('sequence')))
else:
commands.extend(self._clear_config(acls_want,
config_want))
else:
for config_have in have:
for acls_have in config_have.get('acls'):
if acls_want.get('name') == acls_have.get('name'):
commands.extend(self._clear_config(acls_want,
config_want))
else:
afi_want = config_want.get('afi')
for config_have in have:
if config_have.get('afi') == afi_want:
for acls_have in config_have.get('acls'):
commands.extend(self._clear_config(acls_have, config_want))
# Split and arrange the config commands
commands = self.split_set_cmd(commands)
else:
for config_have in have:
for acls_have in config_have.get('acls'):
commands.extend(self._clear_config(acls_have, config_have))
return commands
def common_condition_check(self, want, have, acls_want, config_want, check, state='', acls_have=None):
""" The command formatter from the generated command
:param want: want config
:param have: have config
:param acls_want: acls want config
:param config_want: want config list
:param check: for same acls in want and have config, check=True
:param state: operation state
:rtype: A list
:returns: commands generated from want n have config diff
"""
commands = []
if want.get('source') and want.get('destination') and have.get('source') and have.get('destination'):
if want.get('destination') and have.get('destination') or \
want.get('source').get('address') and have.get('source'):
if want.get('destination').get('address') == \
have.get('destination').get('address') and \
want.get('source').get('address') == \
have.get('source').get('address'):
cmd, change = self._set_config(want,
have,
acls_want,
config_want['afi'])
commands.extend(cmd)
check = True
if commands:
if state == 'replaced' or state == 'overridden':
commands.extend(self._clear_config(acls_want, config_want))
elif want.get('destination').get('any') == \
have.get('destination').get('any') and \
want.get('source').get('address') == \
have.get('source').get('address') and \
want.get('destination').get('any'):
cmd, change = self._set_config(want,
have,
acls_want,
config_want['afi'])
commands.extend(cmd)
check = True
if commands:
if state == 'replaced' or state == 'overridden':
commands.extend(self._clear_config(acls_want, config_want))
elif want.get('destination').get('address') == \
have.get('destination').get('address') and \
want.get('source').get('any') == have.get('source').get('any') and \
want.get('source').get('any'):
cmd, change = self._set_config(want,
have,
acls_want,
config_want['afi'])
commands.extend(cmd)
check = True
if commands:
if state == 'replaced' or state == 'overridden':
commands.extend(self._clear_config(acls_want, config_want))
elif want.get('destination').get('any') == \
have.get('destination').get('any') and \
want.get('source').get('any') == have.get('source').get('any') and \
want.get('destination').get('any'):
cmd, change = self._set_config(want,
have,
acls_want,
config_want['afi'])
commands.extend(cmd)
check = True
if commands:
if state == 'replaced' or state == 'overridden':
commands.extend(self._clear_config(acls_want, config_want))
elif acls_have and acls_have.get('acl_type') == 'standard':
check = True
if want.get('source') == have.get('source'):
cmd, change = self._set_config(want,
have,
acls_want,
config_want['afi'])
commands.extend(cmd)
return commands, check
def split_set_cmd(self, cmds):
""" The command formatter from the generated command
:param cmds: generated command
:rtype: A list
:returns: the formatted commands which is compliant and
actually fired on the device
"""
command = []
def common_code(access_grant, cmd, command):
cmd = cmd.split(access_grant)
access_list = cmd[0].strip(' ')
if access_list not in command:
command.append(access_list)
command_items = len(command)
# get the last index of the list and push the trimmed cmd at the end of list
index = command.index(access_list) + (command_items - command.index(access_list))
cmd = access_grant + cmd[1]
command.insert(index + 1, cmd)
def sequence_common_code(sequence_index, each_list, command):
# Command to split
def join_list_to_str(temp_list, cmd=''):
for item in temp_list:
cmd += item
cmd += ' '
return cmd
temp_list = each_list[:sequence_index]
cmd = join_list_to_str(temp_list).rstrip(' ')
if cmd not in command:
command.append(cmd)
temp_list = each_list[sequence_index:]
cmd = join_list_to_str(temp_list).rstrip(' ')
command.append(cmd)
def grant_common_code(cmd_list, grant_type, command):
index = cmd_list.index(grant_type)
if 'extended' in each_list:
if cmd_list.index('extended') == (index - 2):
common_code(grant_type, each, command)
else:
sequence_common_code((index - 1), each_list, command)
elif 'standard' in each_list:
if cmd_list.index('standard') == (index - 2):
common_code(grant_type, each, command)
else:
sequence_common_code((index - 1), each_list, command)
elif 'ipv6' in each_list:
if 'sequence' in each_list:
sequence_index = each_list.index('sequence')
sequence_common_code(sequence_index, each_list, command)
else:
common_code(grant_type, each, command)
return command
for each in cmds:
each_list = each.split(' ')
if 'no' in each:
if each_list.index('no') == 0:
command.append(each)
else:
common_code('no', each, command)
if 'deny' in each:
grant_common_code(each_list, 'deny', command)
if 'permit' in each:
grant_common_code(each_list, 'permit', command)
return command
def source_dest_config(self, config, cmd, protocol_option):
""" Function to populate source/destination address and port protocol options
:param config: want and have diff config
:param cmd: source/destination command
:param protocol_option: source/destination protocol option
:rtype: A list
:returns: the commands generated based on input source/destination params
"""
if 'ipv6' in cmd:
address = config.get('address')
host = config.get('host')
if (address and '::' not in address) or (host and '::' not in host):
self._module.fail_json(msg='Incorrect IPV6 address!')
else:
address = config.get('address')
wildcard = config.get('wildcard_bits')
host = config.get('host')
any = config.get('any')
if 'standard' in cmd and address and not wildcard:
cmd = cmd + ' {0}'.format(address)
elif address and wildcard:
cmd = cmd + ' {0} {1}'.format(address, wildcard)
elif host:
cmd = cmd + ' host {0}'.format(host)
if any:
cmd = cmd + ' {0}'.format('any')
port_protocol = config.get('port_protocol')
if port_protocol and (protocol_option.get('tcp') or protocol_option.get('udp')):
cmd = cmd + ' {0} {1}'.format(list(port_protocol)[0], list(port_protocol.values())[0])
elif port_protocol and not (protocol_option.get('tcp') or protocol_option.get('udp')):
self._module.fail_json(msg='Port Protocol option is valid only with TCP/UDP Protocol option!')
return cmd
def _set_config(self, want, have, acl_want, afi):
""" Function that sets the acls config based on the want and have config
:param want: want config
:param have: have config
:param acl_want: want acls config
:param afi: acls afi type
:rtype: A list
:returns: the commands generated based on input want/have params
"""
commands = []
change = False
want_set = set()
have_set = set()
# Convert the want and have dict to its respective set for taking the set diff
new_dict_to_set(want, [], want_set)
new_dict_to_set(have, [], have_set)
diff = want_set - have_set
# Populate the config only when there's a diff b/w want and have config
if diff:
name = acl_want.get('name')
if afi == 'ipv4':
try:
name = int(name)
# If name is numbered acls
if name <= 99:
cmd = 'ip access-list standard {0}'.format(name)
elif name >= 100:
cmd = 'ip access-list extended {0}'.format(name)
except ValueError:
# If name is named acls
acl_type = acl_want.get('acl_type')
if acl_type:
cmd = 'ip access-list {0} {1}'.format(acl_type, name)
else:
self._module.fail_json(msg='ACL type value is required for Named ACL!')
elif afi == 'ipv6':
cmd = 'ipv6 access-list {0}'.format(name)
# Get all of aces option values from diff dict
sequence = want.get('sequence')
grant = want.get('grant')
source = want.get('source')
destination = want.get('destination')
po = want.get('protocol_options')
protocol = want.get('protocol')
dscp = want.get('dscp')
fragments = want.get('fragments')
log = want.get('log')
log_input = want.get('log_input')
option = want.get('option')
precedence = want.get('precedence')
time_range = want.get('time_range')
tos = want.get('tos')
ttl = want.get('ttl')
if sequence:
if afi == 'ipv6':
cmd = cmd + ' sequence {0}'.format(sequence)
else:
cmd = cmd + ' {0}'.format(sequence)
if grant:
cmd = cmd + ' {0}'.format(grant)
if po and isinstance(po, dict):
po_key = list(po)[0]
if protocol and protocol != po_key:
self._module.fail_json(msg='Protocol value cannot be different from Protocol option protocol value!')
cmd = cmd + ' {0}'.format(po_key)
if po.get('icmp'):
po_val = po.get('icmp')
elif po.get('igmp'):
po_val = po.get('igmp')
elif po.get('tcp'):
po_val = po.get('tcp')
elif protocol:
cmd = cmd + ' {0}'.format(protocol)
if source:
cmd = self.source_dest_config(source, cmd, po)
if destination:
cmd = self.source_dest_config(destination, cmd, po)
if po:
cmd = cmd + ' {0}'.format(list(po_val)[0])
if dscp:
cmd = cmd + ' dscp {0}'.format(dscp)
if fragments:
cmd = cmd + ' fragments {0}'.format(fragments)
if log:
cmd = cmd + ' log {0}'.format(log)
if log_input:
cmd = cmd + ' log-input {0}'.format(log_input)
if option:
cmd = cmd + ' option {0}'.format(list(option)[0])
if precedence:
cmd = cmd + ' precedence {0}'.format(precedence)
if time_range:
cmd = cmd + ' time-range {0}'.format(time_range)
if tos:
for k, v in iteritems(tos):
if k == 'service_value':
cmd = cmd + ' tos {0}'.format(v)
else:
cmd = cmd + ' tos {0}'.format(v)
if ttl:
for k, v in iteritems(ttl):
if k == 'range' and v:
start = v.get('start')
end = v.get('start')
cmd = cmd + ' ttl {0} {1}'.format(start, end)
elif v:
cmd = cmd + ' ttl {0} {1}'.format(k, v)
commands.append(cmd)
if commands:
change = True
return commands, change
def _clear_config(self, acls, config, sequence=''):
""" Function that deletes the acls config based on the want and have config
:param acls: acls config
:param config: config
:rtype: A list
:returns: the commands generated based on input acls/config params
"""
commands = []
afi = config.get('afi')
name = acls.get('name')
if afi == 'ipv4' and name:
try:
name = int(name)
if name <= 99 and not sequence:
cmd = 'no ip access-list standard {0}'.format(name)
elif name >= 100 and not sequence:
cmd = 'no ip access-list extended {0}'.format(name)
elif sequence:
if name <= 99:
cmd = 'ip access-list standard {0} '.format(name)
elif name >= 100:
cmd = 'ip access-list extended {0} '.format(name)
cmd += 'no {0}'.format(sequence)
except ValueError:
acl_type = acls.get('acl_type')
if acl_type == 'extended' and not sequence:
cmd = 'no ip access-list extended {0}'.format(name)
elif acl_type == 'standard' and not sequence:
cmd = 'no ip access-list standard {0}'.format(name)
elif sequence:
if acl_type == 'extended':
cmd = 'ip access-list extended {0} '.format(name)
elif acl_type == 'standard':
cmd = 'ip access-list standard {0}'.format(name)
cmd += 'no {0}'.format(sequence)
else:
self._module.fail_json(msg="ACL type value is required for Named ACL!")
elif afi == 'ipv6' and name:
if sequence:
cmd = 'no sequence {0}'.format(sequence)
else:
cmd = 'no ipv6 access-list {0}'.format(name)
commands.append(cmd)
return commands

View file

@ -0,0 +1,498 @@
#
# -*- coding: utf-8 -*-
# Copyright 2019 Red Hat
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""
The ios_acls fact class
It is in this file the configuration is collected from the device
for a given resource, parsed, and the facts tree is populated
based on the configuration.
"""
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from copy import deepcopy
import re
from ansible.module_utils.network.common import utils
from ansible.module_utils.network.ios.utils.utils import check_n_return_valid_ipv6_addr
from ansible.module_utils.network.ios.argspec.acls.acls import AclsArgs
class AclsFacts(object):
""" The ios_acls fact class
"""
def __init__(self, module, subspec='config', options='options'):
self._module = module
self.argument_spec = AclsArgs.argument_spec
spec = deepcopy(self.argument_spec)
if subspec:
if options:
facts_argument_spec = spec[subspec][options]
else:
facts_argument_spec = spec[subspec]
else:
facts_argument_spec = spec
self.generated_spec = utils.generate_dict(facts_argument_spec)
def get_acl_data(self, connection):
# Get the access-lists from the ios router
return connection.get('sh access-list')
def populate_facts(self, connection, ansible_facts, data=None):
""" Populate the facts for acls
:param connection: the device connection
:param ansible_facts: Facts dictionary
:param data: previously collected conf
:rtype: dictionary
:returns: facts
"""
if not data:
data = self.get_acl_data(connection)
# operate on a collection of resource x
config = data.split('\n')
spec = {'acls': list(), 'afi': None}
if config:
objs = self.render_config(spec, config)
# check if rendered config list has only empty dict
if len(objs) == 1 and objs[0] == {}:
objs = []
facts = {}
if objs:
facts['acls'] = []
params = utils.validate_config(self.argument_spec, {'config': objs})
for cfg in params['config']:
facts['acls'].append(utils.remove_empties(cfg))
ansible_facts['ansible_network_resources'].update(facts)
return ansible_facts
def create_config_dict(self, config):
""" Function that parse the acls config and convert to module usable config
:param config: config
:rtype: A dict
:returns: the config generated based on have config params
"""
conf = {}
temp_list = []
access_list_name = ''
count = 0
if len(config) >= 1 and config[0] != '':
for each in config:
if 'access-list' in each:
temp = each.split('access-list ')[1].split(' ')[0]
if temp == 'extended' or temp == 'standard':
temp = each.split('access-list ')[1]
if not access_list_name:
access_list_name = temp
if 'access-list' not in each:
if 'extended' in temp or 'standard' in temp:
temp_list.append('ipv4 access-list ' + temp + each)
else:
temp_list.append('ipv6 access-list ' + temp + each)
if temp == access_list_name and 'access-list' in each and \
not ('extended' in access_list_name or 'standard' in access_list_name):
temp_list.append(each)
elif temp != access_list_name:
conf[access_list_name] = temp_list
temp_list = list()
if 'permit' in each or 'deny' in each:
temp_list.append(each)
access_list_name = temp
count += 1
if len(config) == count:
conf[access_list_name] = temp_list
temp_list = []
return conf
def populate_port_protocol(self, source, destination, each_list):
""" Function Populates port portocol wrt to source and destination
:param acls: source config
:param config: destination config
:param each_list: config
:rtype: A list
:returns: the commands generated based on source and destination params
"""
operators = ['eq', 'gt', 'lt', 'neq', 'range']
for item in operators:
if item in each_list:
index = each_list.index(item)
if source.get('address') or source.get('any') or source.get('host') and not source.get('port_protocol'):
try:
source_index = each_list.index(source.get('address'))
except ValueError:
try:
source_index = each_list.index('any')
except ValueError:
source_index = each_list.index('host')
if source.get('address'):
if (source_index + 2) == index and 'ipv6' not in each_list:
source['port_protocol'] = {item: each_list[index + 1]}
each_list.remove(item)
del each_list[index]
elif (source_index + 1) == index and 'ipv6' in each_list:
source['port_protocol'] = {item: each_list[index + 1]}
each_list.remove(item)
del each_list[source_index]
del each_list[index - 1]
elif source.get('any'):
if (source_index + 1) == index:
source['port_protocol'] = {item: each_list[index + 1]}
each_list.remove(item)
del each_list[index - 1]
del each_list[source_index]
elif source.get('host'):
if (source_index + 1) == index:
source['port_protocol'] = {item: each_list[index + 1]}
each_list.remove(item)
del each_list[index - 1]
del each_list[source_index]
if destination.get('address') or destination.get('any') or destination.get('host'):
try:
destination_index = each_list.index(destination.get('address'))
except ValueError:
try:
destination_index = each_list.index('any')
except ValueError:
destination_index = each_list.index('host') + 1
index -= 1
if (destination_index + 1) == index or (destination_index + 2) == index:
destination['port_protocol'] = {item: each_list[index + 1]}
each_list.remove(item)
del each_list[index]
break
if 'eq' in each_list or 'gt' in each_list or 'lt' in each_list or 'neq' in each_list or 'range' in each_list:
self.populate_port_protocol(source, destination, each_list)
def populate_source_destination(self, each, config, source, destination):
any = []
if 'any' in each:
any = re.findall('any', each)
if len(any) == 2:
source['any'] = True
destination['any'] = True
elif 'host' in each:
host = re.findall('host', each)
each = each.split(' ')
if len(host) == 2:
host_index = each.index('host')
source['host'] = each[host_index + 1]
del each[host_index]
host_index = each.index('host')
destination['host'] = each[host_index + 1]
else:
ip_n_wildcard_bits = re.findall(r'[0-9]+(?:\.[0-9]+){3}', each)
ip_index = None
if ip_n_wildcard_bits:
ip_index = each.index(ip_n_wildcard_bits[0])
host_index = each.index('host')
if ip_index:
if host_index < ip_index:
source['host'] = each(host_index + 1)
destination['address'] = ip_n_wildcard_bits[0]
destination['wildcard_bits'] = ip_n_wildcard_bits[1]
elif host_index > ip_index:
destination['host'] = each(host_index + 1)
source['address'] = ip_n_wildcard_bits[0]
source['wildcard_bits'] = ip_n_wildcard_bits[1]
else:
if config['afi'] == 'ipv4':
ip_n_wildcard_bits = re.findall(r'[0-9]+(?:\.[0-9]+){3}', each)
each = each.split(' ')
if len(ip_n_wildcard_bits) == 0 and len(any) == 1:
source['any'] = True
elif len(ip_n_wildcard_bits) == 1:
source['address'] = ip_n_wildcard_bits[0]
elif len(ip_n_wildcard_bits) == 2:
if 'any' in each:
if each.index('any') > each.index(ip_n_wildcard_bits[0]):
source['address'] = ip_n_wildcard_bits[0]
source['wildcard_bits'] = ip_n_wildcard_bits[1]
destination['any'] = True
elif each.index('any') < each.index(ip_n_wildcard_bits[0]):
source['any'] = True
destination['address'] = ip_n_wildcard_bits[0]
destination['wildcard_bits'] = ip_n_wildcard_bits[1]
else:
source['address'] = ip_n_wildcard_bits[0]
source['wildcard_bits'] = ip_n_wildcard_bits[1]
elif len(ip_n_wildcard_bits) == 4:
source['address'] = ip_n_wildcard_bits[0]
source['wildcard_bits'] = ip_n_wildcard_bits[1]
destination['address'] = ip_n_wildcard_bits[2]
destination['wildcard_bits'] = ip_n_wildcard_bits[3]
elif config['afi'] == 'ipv6':
temp_ipv6 = []
each = each.split(' ')
check_n_return_valid_ipv6_addr(self._module, each, temp_ipv6)
count = 0
for every in each:
if len(temp_ipv6) == 2:
if temp_ipv6[0] in every or temp_ipv6[1] in every:
temp_ipv6[count] = every
count += 1
elif len(temp_ipv6) == 1:
if temp_ipv6[0] in every:
temp_ipv6[count] = every
if 'any' in each:
if each.index('any') > each.index(temp_ipv6[0]):
source['address'] = temp_ipv6[0]
destination['any'] = True
elif each.index('any') < each.index(temp_ipv6[0]):
source['any'] = True
destination['address'] = temp_ipv6[0]
elif len(temp_ipv6) == 2:
source['address'] = temp_ipv6[0]
destination['address'] = temp_ipv6[1]
def parsed_config_facts(self, have_config):
"""
For parsed config have_config is string of commands which
need to be splitted before passing it through render_config
from spec for null values
:param have_config: The configuration
:rtype: list of have config
:returns: The splitted generated config
"""
split_config = re.split('ip|ipv6 access-list', have_config[0])
temp_config = []
# common piece of code for populating the temp_config list
def common_config_code(each, grant, temp_config):
temp = re.split(grant, each)
temp_config.append(temp[0])
temp_config.extend([grant + item for item in temp if 'access-list' not in item])
for each in split_config:
if 'v6' in each:
each = 'ipv6 ' + each.split('v6 ')[1]
if 'permit' in each:
common_config_code(each, 'permit', temp_config)
elif 'deny' in each:
common_config_code(each, 'deny', temp_config)
else:
each = 'ip' + each
if 'permit' in each:
common_config_code(each, 'permit', temp_config)
if 'deny' in each:
common_config_code(each, 'deny', temp_config)
return temp_config
def render_config(self, spec, have_config):
"""
Render config as dictionary structure and delete keys
from spec for null values
:param spec: The facts tree, generated from the argspec
:param conf: The configuration
:rtype: dictionary
:returns: The generated config
"""
# for parsed scnenario where commands are passed to generate the acls facts
if len(have_config) == 1:
have_config = self.parsed_config_facts(have_config)
config = deepcopy(spec)
render_config = list()
acls = dict()
aces = list()
temp_name = ''
for each in have_config:
each_list = [val for val in each.split(' ') if val != '']
if 'IPv6' in each or 'ipv6' in each:
if aces:
config['acls'].append(acls)
ip_config = config
if ip_config.get('acls'):
render_config.append(ip_config)
if not config['afi'] or config['afi'] == 'ipv4':
config = deepcopy(spec)
config['afi'] = 'ipv6'
acls = dict()
aces = list()
elif not config['afi'] and ('IP' in each or 'ip' in each):
config['afi'] = 'ipv4'
if 'access list' in each or 'access-list' in each:
try:
temp_index = each_list.index('list')
name = (each_list[temp_index + 1])
except ValueError:
name = each_list[-1]
if temp_name != name:
if aces:
config['acls'].append(acls)
acls = dict()
aces = list()
temp_name = name
acls['name'] = name
if 'Extended' in each:
acls['acl_type'] = 'extended'
continue
elif 'Standard' in each:
acls['acl_type'] = 'standard'
continue
ace_options = {}
try:
if config['afi'] == 'ipv4':
if 'deny' in each_list or 'permit' in each_list:
ace_options['sequence'] = int(each_list[0])
elif config['afi'] == 'ipv6':
if 'sequence' in each_list:
ace_options['sequence'] = int(each_list[each_list.index('sequence') + 1])
except ValueError:
pass
if utils.parse_conf_arg(each, 'permit'):
ace_options['grant'] = 'permit'
each_list.remove('permit')
elif utils.parse_conf_arg(each, 'deny'):
ace_options['grant'] = 'deny'
each_list.remove('deny')
protocol_option = ['ahp', 'eigrp', 'esp', 'gre', 'hbh', 'icmp', 'igmp', 'ip', 'ipv6', 'ipinip', 'nos',
'ospf', 'pcp', 'pim', 'sctp', 'tcp', 'udp']
tcp_flags = ['ack', 'established', 'fin', 'psh', 'rst', 'syn', 'urg']
icmp_options = ['administratively_prohibited', 'alternate_address', 'conversion_error',
'dod_host_prohibited', 'dod_net_prohibited', 'echo', 'echo_reply',
'general_parameter_problem', 'host_isolated', 'host_precedence_unreachable',
'host_redirect', 'host_tos_redirect', 'host_tos_unreachable', 'host_unknown',
'host_unreachable', 'information_reply', 'information_request', 'mask_reply',
'mask_request', 'mobile_redirect', 'net_redirect', 'net_tos_redirect',
'net_tos_unreachable', 'net_unreachable', 'network_unknown', 'no_room_for_option',
'option_missing', 'packet_too_big', 'parameter_problem', 'port_unreachable',
'precedence_unreachable', 'protocol_unreachable', 'reassembly_timeout', 'redirect',
'router_advertisement', 'router_solicitation', 'source_quench', 'source_route_failed',
'time_exceeded', 'timestamp_reply', 'timestamp_request', 'traceroute', 'ttl_exceeded',
'unreachable']
igmp_options = ['dvmrp', 'host_query', 'mtrace_resp', 'mtrace_route', 'pim', 'trace', 'v1host_report',
'v2host_report', 'v2leave_group', 'v3host_report']
temp_option = ''
for option in protocol_option:
if option in each_list and 'access' not in each_list[each_list.index(option) + 1]:
temp_option = option
each_list.remove(temp_option)
if temp_option == 'tcp':
temp_flag = [each_flag for each_flag in tcp_flags if each_flag in each]
if temp_flag:
flag = temp_flag[0]
if flag in each_list:
each_list.remove(flag)
temp_flag = flag
if temp_option == 'icmp':
temp_flag = [each_option for each_option in icmp_options if each_option in each]
if temp_flag:
flag = temp_flag[0]
if flag in each_list:
each_list.remove(flag)
temp_flag = flag
if temp_option == 'igmp':
temp_flag = [each_option for each_option in igmp_options if each_option in each]
if temp_flag:
flag = temp_flag[0]
if flag in each_list:
each_list.remove(flag)
temp_flag = flag
break
dscp = utils.parse_conf_arg(each, 'dscp')
if dscp:
ace_options['dscp'] = dscp.split(' ')[0]
fragments = utils.parse_conf_arg(each, 'fragments')
if fragments:
ace_options['fragments'] = fragments.split(' ')[0]
log = utils.parse_conf_arg(each, 'log')
if log:
ace_options['log'] = log.split(' ')[0]
log_input = utils.parse_conf_arg(each, 'log_input')
if log_input:
ace_options['log_input'] = log_input.split(' ')[0]
option = utils.parse_conf_arg(each, 'option')
if option:
option = option.split(' ')[0]
option_dict = {}
option_dict[option] = True
ace_options['option'] = option_dict
precedence = utils.parse_conf_arg(each, 'precedence')
if precedence:
ace_options['precedence'] = precedence.split(' ')[0]
time_range = utils.parse_conf_arg(each, 'time_range')
if time_range:
ace_options['time_range'] = time_range.split(' ')[0]
tos = utils.parse_conf_arg(each, 'tos')
if tos:
tos_val = dict()
try:
tos_val['service_value'] = int(tos)
except ValueError:
tos = tos.replace('-', '_')
tos_val[tos] = True
ace_options['tos'] = tos_val
ttl = utils.parse_conf_arg(each, 'ttl')
if ttl:
temp_ttl = ttl.split(' ')
ttl = {}
ttl[temp_ttl[0]] = temp_ttl[1]
each_list = [item for item in each_list[:each_list.index('ttl')]]
ace_options['ttl'] = ttl
source = {}
destination = {}
self.populate_source_destination(each, config, source, destination)
if source.get('address') and source.get('address') == destination.get('address'):
self._module.fail_json(msg='Source and Destination address cannot be same!')
else:
self.populate_port_protocol(source, destination, each_list)
if source:
ace_options['source'] = source
if destination:
ace_options['destination'] = destination
if temp_option:
protocol_options = {}
ace_options['protocol'] = temp_option
if temp_option == 'tcp':
tcp = {}
if temp_flag:
tcp[temp_flag] = True
else:
tcp['set'] = True
protocol_options[temp_option] = tcp
elif temp_option == 'icmp':
icmp = dict()
if temp_flag:
icmp[temp_flag] = True
else:
icmp['set'] = True
protocol_options[temp_option] = icmp
elif temp_option == 'igmp':
igmp = dict()
if temp_flag:
igmp[temp_flag] = True
else:
igmp['set'] = True
protocol_options[temp_option] = igmp
else:
protocol_options[temp_option] = True
ace_options['protocol_options'] = protocol_options
if ace_options:
aces.append(ace_options)
acls['aces'] = aces
if acls:
if not config.get('acls'):
config['acls'] = list()
config['acls'].append(acls)
if config not in render_config:
render_config.append(utils.remove_empties(config))
# delete the populated config
del config
return render_config

View file

@ -25,6 +25,7 @@ from ansible.module_utils.network.ios.facts.lldp_interfaces.lldp_interfaces impo
from ansible.module_utils.network.ios.facts.l3_interfaces.l3_interfaces import L3_InterfacesFacts from ansible.module_utils.network.ios.facts.l3_interfaces.l3_interfaces import L3_InterfacesFacts
from ansible.module_utils.network.ios.facts.acl_interfaces.acl_interfaces import Acl_InterfacesFacts from ansible.module_utils.network.ios.facts.acl_interfaces.acl_interfaces import Acl_InterfacesFacts
from ansible.module_utils.network.ios.facts.static_routes.static_routes import Static_RoutesFacts from ansible.module_utils.network.ios.facts.static_routes.static_routes import Static_RoutesFacts
from ansible.module_utils.network.ios.facts.acls.acls import AclsFacts
from ansible.module_utils.network.ios.facts.legacy.base import Default, Hardware, Interfaces, Config from ansible.module_utils.network.ios.facts.legacy.base import Default, Hardware, Interfaces, Config
@ -47,6 +48,7 @@ FACT_RESOURCE_SUBSETS = dict(
l3_interfaces=L3_InterfacesFacts, l3_interfaces=L3_InterfacesFacts,
acl_interfaces=Acl_InterfacesFacts, acl_interfaces=Acl_InterfacesFacts,
static_routes=Static_RoutesFacts, static_routes=Static_RoutesFacts,
acls=AclsFacts,
) )

View file

@ -9,6 +9,7 @@
from __future__ import absolute_import, division, print_function from __future__ import absolute_import, division, print_function
__metaclass__ = type __metaclass__ = type
import socket
from ansible.module_utils.six import iteritems from ansible.module_utils.six import iteritems
from ansible.module_utils.network.common.utils import is_masklen, to_netmask from ansible.module_utils.network.common.utils import is_masklen, to_netmask
@ -28,7 +29,22 @@ def add_command_to_config_list(interface, cmd, commands):
commands.append(cmd) commands.append(cmd)
def new_dict_to_set(input_dict, temp_list, test_set, count): def check_n_return_valid_ipv6_addr(module, input_list, filtered_ipv6_list):
# To verify the valid ipv6 address
try:
for each in input_list:
if '::' in each:
if '/' in each:
each = each.split('/')[0]
if socket.inet_pton(socket.AF_INET6, each):
filtered_ipv6_list.append(each)
return filtered_ipv6_list
except socket.error:
module.fail_json(msg='Incorrect IPV6 address!')
def new_dict_to_set(input_dict, temp_list, test_set, count=0):
# recursive function to convert input dict to set for comparision
test_dict = dict() test_dict = dict()
if isinstance(input_dict, dict): if isinstance(input_dict, dict):
input_dict_len = len(input_dict) input_dict_len = len(input_dict)
@ -45,9 +61,26 @@ def new_dict_to_set(input_dict, temp_list, test_set, count):
else: else:
if v is not None: if v is not None:
test_dict.update({k: v}) test_dict.update({k: v})
if tuple(iteritems(test_dict)) not in test_set and count == input_dict_len: try:
test_set.add(tuple(iteritems(test_dict))) if tuple(iteritems(test_dict)) not in test_set and count == input_dict_len:
count = 0 test_set.add(tuple(iteritems(test_dict)))
count = 0
except TypeError:
temp_dict = {}
def expand_dict(dict_to_expand):
temp = dict()
for k, v in iteritems(dict_to_expand):
if isinstance(v, dict):
expand_dict(v)
else:
if v is not None:
temp.update({k: v})
temp_dict.update(tuple(iteritems(temp)))
new_dict = {k: v}
expand_dict(new_dict)
if tuple(iteritems(temp_dict)) not in test_set:
test_set.add(tuple(iteritems(temp_dict)))
def dict_to_set(sample_dict): def dict_to_set(sample_dict):

File diff suppressed because it is too large Load diff

View file

@ -58,7 +58,7 @@ options:
a specific subset should not be collected. a specific subset should not be collected.
Valid subsets are 'all', 'interfaces', 'l2_interfaces', 'vlans', Valid subsets are 'all', 'interfaces', 'l2_interfaces', 'vlans',
'lag_interfaces', 'lacp', 'lacp_interfaces', 'lldp_global', 'lag_interfaces', 'lacp', 'lacp_interfaces', 'lldp_global',
'lldp_interfaces', 'l3_interfaces', 'acl_interfaces', 'static_routes'. 'lldp_interfaces', 'l3_interfaces', 'acl_interfaces', 'static_routes', 'acls'.
version_added: "2.9" version_added: "2.9"
""" """

View file

@ -0,0 +1,3 @@
---
testcase: "[^_].*"
test_items: []

View file

@ -0,0 +1 @@
dependencies: []

View file

@ -0,0 +1,21 @@
---
- name: Collect all cli test cases
find:
paths: "{{ role_path }}/tests/cli"
patterns: "{{ testcase }}.yaml"
use_regex: true
register: test_cases
delegate_to: localhost
- name: Set test_items
set_fact: test_items="{{ test_cases.files | map(attribute='path') | list }}"
delegate_to: localhost
- name: Run test case (connection=network_cli)
include: "{{ test_case_to_run }}"
vars:
ansible_connection: network_cli
with_items: "{{ test_items }}"
loop_control:
loop_var: test_case_to_run
tags: connection_network_cli

View file

@ -0,0 +1,2 @@
---
- { include: cli.yaml, tags: ['cli'] }

View file

@ -0,0 +1,7 @@
ip access-list extended test_acl
deny tcp 192.0.2.0 0.0.0.255 192.0.3.0 0.0.0.255 eq www fin option traceroute ttl eq 10
ip access-list extended 123
deny tcp 198.51.100.0 0.0.0.255 198.51.101.0 0.0.0.255 eq telnet ack tos 12
deny tcp 192.0.3.0 0.0.0.255 192.0.4.0 0.0.0.255 eq www ack dscp ef ttl lt 20
ipv6 access-list R1_TRAFFIC
deny tcp any eq www any eq telnet ack dscp af11

View file

@ -0,0 +1,15 @@
---
- name: Populate Config
cli_config:
config: "{{ lines }}"
vars:
lines: |
ip access-list extended test_acl
deny tcp 192.0.2.0 0.0.0.255 192.0.3.0 0.0.0.255 eq www fin option traceroute ttl eq 10
ip access-list extended 110
deny icmp 192.0.2.0 0.0.0.255 192.0.3.0 0.0.0.255 echo dscp ef ttl eq 10
ip access-list extended 123
deny tcp 198.51.100.0 0.0.0.255 198.51.101.0 0.0.0.255 eq telnet ack tos 12
deny tcp 192.0.3.0 0.0.0.255 192.0.4.0 0.0.0.255 eq www ack dscp ef ttl lt 20
ipv6 access-list R1_TRAFFIC
deny tcp any eq www any eq telnet ack dscp af11

View file

@ -0,0 +1,12 @@
---
- name: Remove Config
cli_config:
config: "{{ lines }}"
vars:
lines: |
no ip access-list standard std_acl
no ip access-list extended test_acl
no ip access-list extended 110
no ip access-list extended 123
no ip access-list extended 150
no ipv6 access-list R1_TRAFFIC

View file

@ -0,0 +1,70 @@
---
- debug:
msg: "Start Deleted integration state for ios_acls ansible_connection={{ ansible_connection }}"
- include_tasks: _remove_config.yaml
- include_tasks: _populate_config.yaml
- block:
- name: Delete attributes of provided configured ACLs
ios_acls: &deleted
config:
- afi: ipv4
acls:
- name: test_acl
acl_type: extended
- name: 110
aces:
- sequence: 10
- name: 123
- afi: ipv6
acls:
- name: R1_TRAFFIC
state: deleted
register: result
- assert:
that:
- "result.commands|length == 5"
- "result.changed == true"
- "result.commands|symmetric_difference(deleted.commands) == []"
- name: Delete attributes of all configured interfaces (IDEMPOTENT)
ios_acls: *deleted
register: result
- name: Assert that the previous task was idempotent
assert:
that:
- "result.commands|length == 0"
- "result.changed == false"
- include_tasks: _remove_config.yaml
- include_tasks: _populate_config.yaml
- name: Delete ACL attributes of provided configured interfaces based on AFI
ios_acls: &deleted_afi
config:
- afi: ipv4
state: deleted
register: result
- assert:
that:
- "result.commands|length == 3"
- "result.changed == true"
- "result.commands|symmetric_difference(deleted_afi.commands) == []"
- name: Delete ACL attributes of provided configured interfaces based on AFI (IDEMPOTENT)
ios_acls: *deleted_afi
register: result
- name: Assert that the previous task was idempotent
assert:
that:
- "result.commands|length == 0"
- "result.changed == false"
always:
- include_tasks: _remove_config.yaml

View file

@ -0,0 +1,58 @@
---
- debug:
msg: "START ios_acls empty_config.yaml integration tests on connection={{ ansible_connection }}"
- name: Merged with empty config should give appropriate error message
ios_acls:
config:
state: merged
register: result
ignore_errors: True
- assert:
that:
- result.msg == 'value of config parameter must not be empty for state merged'
- name: Replaced with empty config should give appropriate error message
ios_acls:
config:
state: replaced
register: result
ignore_errors: True
- assert:
that:
- result.msg == 'value of config parameter must not be empty for state replaced'
- name: Overridden with empty config should give appropriate error message
ios_acls:
config:
state: overridden
register: result
ignore_errors: True
- assert:
that:
- result.msg == 'value of config parameter must not be empty for state overridden'
- name: Rendered with empty config should give appropriate error message
ios_acls:
config:
state: rendered
register: result
ignore_errors: True
- assert:
that:
- result.msg == 'value of config parameter must not be empty for state rendered'
- name: Parsed with empty config should give appropriate error message
ios_acls:
running_config:
state: parsed
register: result
ignore_errors: True
- assert:
that:
- result.msg == 'value of running_config parameter must not be empty for state parsed'

View file

@ -0,0 +1,21 @@
---
- debug:
msg: "START ios_acls gathered integration tests on connection={{ ansible_connection }}"
- include_tasks: _remove_config.yaml
- include_tasks: _populate_config.yaml
- block:
- name: Gather the provided configuration with the exisiting running configuration
ios_acls: &gathered
config:
state: gathered
register: result
- assert:
that:
- "gathered['config'] | symmetric_difference(result.gathered) == []"
- "result['changed'] == false"
always:
- include_tasks: _remove_config.yaml

View file

@ -0,0 +1,123 @@
---
- debug:
msg: "START Merged ios_acls state for integration tests on connection={{ ansible_connection }}"
- include_tasks: _remove_config.yaml
- block:
- name: Merge provided configuration with device configuration
ios_acls: &merged
config:
- afi: ipv4
acls:
- name: std_acl
acl_type: standard
aces:
- grant: deny
source:
address: 192.0.2.0
wildcard_bits: 0.0.0.255
- name: test_acl
acl_type: extended
aces:
- grant: deny
protocol_options:
tcp:
fin: true
source:
address: 192.0.2.0
wildcard_bits: 0.0.0.255
destination:
address: 192.0.3.0
wildcard_bits: 0.0.0.255
port_protocol:
eq: www
option:
traceroute: true
ttl:
eq: 10
- name: 110
aces:
- grant: deny
sequence: 10
protocol_options:
icmp:
echo: true
source:
address: 192.0.2.0
wildcard_bits: 0.0.0.255
destination:
address: 192.0.3.0
wildcard_bits: 0.0.0.255
dscp: ef
ttl:
eq: 10
- name: 123
aces:
- grant: deny
protocol_options:
tcp:
ack: true
source:
address: 198.51.100.0
wildcard_bits: 0.0.0.255
destination:
address: 198.51.101.0
wildcard_bits: 0.0.0.255
port_protocol:
eq: telnet
tos:
service_value: 12
- grant: deny
protocol_options:
tcp:
ack: true
source:
address: 192.0.3.0
wildcard_bits: 0.0.0.255
destination:
address: 192.0.4.0
wildcard_bits: 0.0.0.255
port_protocol:
eq: www
dscp: ef
ttl:
lt: 20
- afi: ipv6
acls:
- name: R1_TRAFFIC
aces:
- grant: deny
protocol_options:
tcp:
ack: true
source:
any: true
port_protocol:
eq: www
destination:
any: true
port_protocol:
eq: telnet
dscp: af11
state: merged
register: result
- assert:
that:
- "result.commands|length == 11"
- "result.changed == true"
- "result.commands|symmetric_difference(merged.commands) == []"
- name: Merge provided configuration with device configuration (IDEMPOTENT)
ios_acls: *merged
register: result
- name: Assert that the previous task was idempotent
assert:
that:
- "result.commands|length == 0"
- "result['changed'] == false"
always:
- include_tasks: _remove_config.yaml

View file

@ -0,0 +1,73 @@
---
- debug:
msg: "START Overridden ios_acls state for integration tests on connection={{ ansible_connection }}"
- include_tasks: _remove_config.yaml
- include_tasks: _populate_config.yaml
- block:
- name: Override device configuration of all interfaces with provided configuration
ios_acls: &overridden
config:
- afi: ipv4
acls:
- name: 110
aces:
- grant: deny
protocol_options:
tcp:
ack: true
source:
address: 198.51.100.0
wildcard_bits: 0.0.0.255
port_protocol:
eq: telnet
destination:
address: 198.51.110.0
wildcard_bits: 0.0.0.255
port_protocol:
eq: www
dscp: ef
ttl:
eq: 10
- name: 150
aces:
- grant: deny
protocol_options:
tcp:
syn: true
source:
address: 198.51.100.0
wildcard_bits: 0.0.0.255
port_protocol:
eq: telnet
destination:
address: 198.51.110.0
wildcard_bits: 0.0.0.255
port_protocol:
eq: telnet
dscp: ef
ttl:
eq: 10
state: overridden
register: result
- assert:
that:
- "result.commands|length == 8"
- "result.changed == true"
- "result.commands|symmetric_difference(overridden.commands) == []"
- name: Override device configuration of all interfaces with provided configuration (IDEMPOTENT)
ios_acls: *overridden
register: result
- name: Assert that task was idempotent
assert:
that:
- "result.commands|length == 0"
- "result['changed'] == false"
always:
- include_tasks: _remove_config.yaml

View file

@ -0,0 +1,16 @@
---
- debug:
msg: "START ios_acls parsed integration tests on connection={{ ansible_connection }}"
- name: Parse the commands for provided configuration
ios_acls: &parsed
running_config:
"{{ lookup('file', '_parsed.cfg') }}"
state: parsed
become: yes
register: result
- assert:
that:
- "result.changed == false"
- "parsed['config']|symmetric_difference(result.parsed) == []"

View file

@ -0,0 +1,54 @@
---
- debug:
msg: "Start ios_acls rendered integration tests ansible_connection={{ ansible_connection }}"
- block:
- name: Rendered the provided configuration with the exisiting running configuration
ios_acls:
config:
- afi: ipv4
acls:
- name: 110
aces:
- grant: deny
sequence: 10
protocol_options:
tcp:
syn: true
source:
address: 192.0.2.0
wildcard_bits: 0.0.0.255
destination:
address: 192.0.3.0
wildcard_bits: 0.0.0.255
port_protocol:
eq: www
dscp: ef
ttl:
eq: 10
- name: 150
aces:
- grant: deny
protocol_options:
tcp:
syn: true
source:
address: 198.51.100.0
wildcard_bits: 0.0.0.255
port_protocol:
eq: telnet
destination:
address: 198.51.110.0
wildcard_bits: 0.0.0.255
port_protocol:
eq: telnet
dscp: ef
ttl:
eq: 10
state: rendered
register: result
- assert:
that:
- "result.changed == false"
- "result.rendered|symmetric_difference(rendered.commands) == []"

View file

@ -0,0 +1,72 @@
---
- debug:
msg: "START Replaced ios_acls state for integration tests on connection={{ ansible_connection }}"
- include_tasks: _remove_config.yaml
- include_tasks: _populate_config.yaml
- block:
- name: Replaces device configuration of listed interfaces with provided configuration
ios_acls: &replaced
config:
- afi: ipv4
acls:
- name: 110
aces:
- grant: deny
protocol_options:
tcp:
syn: true
source:
address: 192.0.2.0
wildcard_bits: 0.0.0.255
destination:
address: 192.0.3.0
wildcard_bits: 0.0.0.255
port_protocol:
eq: www
dscp: ef
ttl:
eq: 10
- name: 150
aces:
- grant: deny
sequence: 20
protocol_options:
tcp:
syn: true
source:
address: 198.51.100.0
wildcard_bits: 0.0.0.255
port_protocol:
eq: telnet
destination:
address: 198.51.110.0
wildcard_bits: 0.0.0.255
port_protocol:
eq: telnet
dscp: ef
ttl:
eq: 10
state: replaced
register: result
- assert:
that:
- "result.commands|length == 5"
- "result.changed == true"
- "result.commands|symmetric_difference(replaced.commands) == []"
- name: Replaces device configuration of listed interfaces with provided configuration (IDEMPOTENT)
ios_acls: *replaced
register: result
- name: Assert that task was idempotent
assert:
that:
- "result.commands|length == 0"
- "result['changed'] == false"
always:
- include_tasks: _remove_config.yaml

View file

@ -0,0 +1,173 @@
---
- debug:
msg: "START ios_acls round trip integration tests on connection={{ ansible_connection }}"
- include_tasks: _remove_config.yaml
- include_tasks: _populate_config.yaml
- block:
- name: Apply the provided configuration (base config)
ios_acls:
config:
- afi: ipv4
acls:
- name: test_acl
acl_type: extended
aces:
- grant: deny
protocol_options:
tcp:
fin: true
source:
address: 192.0.2.0
wildcard_bits: 0.0.0.255
destination:
address: 192.0.3.0
wildcard_bits: 0.0.0.255
port_protocol:
eq: www
option:
traceroute: true
ttl:
eq: 10
- name: 110
aces:
- grant: deny
protocol_options:
icmp:
echo: true
source:
address: 192.0.2.0
wildcard_bits: 0.0.0.255
destination:
address: 192.0.3.0
wildcard_bits: 0.0.0.255
dscp: ef
ttl:
eq: 10
- name: 123
aces:
- grant: deny
protocol_options:
tcp:
ack: true
source:
address: 198.51.100.0
wildcard_bits: 0.0.0.255
destination:
address: 198.51.101.0
wildcard_bits: 0.0.0.255
port_protocol:
eq: telnet
tos:
service_value: 12
- grant: deny
protocol_options:
tcp:
ack: true
source:
address: 192.0.3.0
wildcard_bits: 0.0.0.255
destination:
address: 192.0.4.0
wildcard_bits: 0.0.0.255
port_protocol:
eq: www
dscp: ef
ttl:
lt: 20
- afi: ipv6
acls:
- name: R1_TRAFFIC
aces:
- grant: deny
protocol_options:
tcp:
ack: true
source:
any: true
port_protocol:
eq: www
destination:
any: true
port_protocol:
eq: telnet
dscp: af11
state: merged
register: base_config
- name: Gather ACLs facts
ios_facts:
gather_subset:
- "!all"
- "!min"
gather_network_resources:
- acls
- name: Apply the configuration which need to be reverted
ios_acls:
config:
- afi: ipv4
acls:
- name: 110
aces:
- grant: deny
protocol_options:
tcp:
ack: true
source:
address: 198.51.100.0
wildcard_bits: 0.0.0.255
port_protocol:
eq: telnet
destination:
address: 198.51.110.0
wildcard_bits: 0.0.0.255
port_protocol:
eq: www
dscp: ef
ttl:
eq: 10
- name: 150
aces:
- grant: deny
protocol_options:
tcp:
syn: true
source:
address: 198.51.100.0
wildcard_bits: 0.0.0.255
port_protocol:
eq: telnet
destination:
address: 198.51.110.0
wildcard_bits: 0.0.0.255
port_protocol:
eq: telnet
dscp: ef
ttl:
eq: 10
state: overridden
register: result
- assert:
that:
- "result.commands|length == 8"
- "result.changed == true"
- "result.commands|symmetric_difference(overridden.commands) == []"
- name: Revert back to base config using facts round trip
ios_acls:
config: "{{ ansible_facts['network_resources']['acls'] }}"
state: overridden
register: revert
- assert:
that:
- "revert.commands|length == 11"
- "revert.changed == true"
- "revert.commands|symmetric_difference(rtt.commands) == []"
always:
- include_tasks: _remove_config.yaml

View file

@ -0,0 +1,242 @@
---
deleted:
commands:
- "no ip access-list extended test_acl"
- "ip access-list extended 110"
- "no 10"
- "no ip access-list extended 123"
- "no ipv6 access-list R1_TRAFFIC"
deleted_afi:
commands:
- "no ip access-list extended 110"
- "no ip access-list extended 123"
- "no ip access-list extended test_acl"
merged:
commands:
- "ip access-list standard std_acl"
- "deny 192.0.2.0 0.0.0.255"
- "ip access-list extended test_acl"
- "deny tcp 192.0.2.0 0.0.0.255 192.0.3.0 0.0.0.255 eq www fin option traceroute ttl eq 10"
- "ip access-list extended 110"
- "10 deny icmp 192.0.2.0 0.0.0.255 192.0.3.0 0.0.0.255 echo dscp ef ttl eq 10"
- "ip access-list extended 123"
- "deny tcp 198.51.100.0 0.0.0.255 198.51.101.0 0.0.0.255 eq telnet ack tos 12"
- "deny tcp 192.0.3.0 0.0.0.255 192.0.4.0 0.0.0.255 eq www ack dscp ef ttl lt 20"
- "ipv6 access-list R1_TRAFFIC"
- "deny tcp any eq www any eq telnet ack dscp af11"
replaced:
commands:
- "ip access-list extended 110"
- "no 10"
- "deny tcp 192.0.2.0 0.0.0.255 192.0.3.0 0.0.0.255 eq www syn dscp ef ttl eq 10"
- "ip access-list extended 150"
- "20 deny tcp 198.51.100.0 0.0.0.255 eq telnet 198.51.110.0 0.0.0.255 eq telnet syn dscp ef ttl eq 10"
overridden:
commands:
- "no ip access-list extended 123"
- "no ip access-list extended test_acl"
- "no ipv6 access-list R1_TRAFFIC"
- "ip access-list extended 110"
- "no 10"
- "deny tcp 198.51.100.0 0.0.0.255 eq telnet 198.51.110.0 0.0.0.255 eq www ack dscp ef ttl eq 10"
- "ip access-list extended 150"
- "deny tcp 198.51.100.0 0.0.0.255 eq telnet 198.51.110.0 0.0.0.255 eq telnet syn dscp ef ttl eq 10"
gathered:
config:
- acls:
- aces:
- destination:
address: 192.0.3.0
wildcard_bits: 0.0.0.255
dscp: ef
grant: deny
protocol: icmp
protocol_options:
icmp:
echo: true
sequence: 10
source:
address: 192.0.2.0
wildcard_bits: 0.0.0.255
ttl:
eq: 10
acl_type: extended
name: '110'
- aces:
- destination:
address: 198.51.101.0
port_protocol:
eq: telnet
wildcard_bits: 0.0.0.255
grant: deny
protocol: tcp
protocol_options:
tcp:
ack: true
sequence: 10
source:
address: 198.51.100.0
wildcard_bits: 0.0.0.255
tos:
service_value: 12
- destination:
address: 192.0.4.0
port_protocol:
eq: www
wildcard_bits: 0.0.0.255
dscp: ef
grant: deny
protocol: tcp
protocol_options:
tcp:
ack: true
sequence: 20
source:
address: 192.0.3.0
wildcard_bits: 0.0.0.255
ttl:
lt: 20
acl_type: extended
name: '123'
- aces:
- destination:
address: 192.0.3.0
port_protocol:
eq: www
wildcard_bits: 0.0.0.255
grant: deny
option:
traceroute: true
protocol: tcp
protocol_options:
tcp:
fin: true
sequence: 10
source:
address: 192.0.2.0
wildcard_bits: 0.0.0.255
ttl:
eq: 10
acl_type: extended
name: test_acl
afi: ipv4
- acls:
- aces:
- destination:
any: true
port_protocol:
eq: telnet
dscp: af11
grant: deny
protocol: tcp
protocol_options:
tcp:
ack: true
sequence: 10
source:
any: true
port_protocol:
eq: www
name: R1_TRAFFIC
afi: ipv6
parsed:
config:
- acls:
- aces:
- destination:
address: 192.0.3.0
port_protocol:
eq: www
wildcard_bits: 0.0.0.255
grant: deny
option:
traceroute: true
protocol: tcp
protocol_options:
tcp:
fin: true
source:
address: 192.0.2.0
wildcard_bits: 0.0.0.255
ttl:
eq: 10
name: test_acl
- aces:
- destination:
address: 198.51.101.0
port_protocol:
eq: telnet
wildcard_bits: 0.0.0.255
grant: deny
protocol: tcp
protocol_options:
tcp:
ack: true
source:
address: 198.51.100.0
wildcard_bits: 0.0.0.255
tos:
service_value: 12
- destination:
address: 192.0.4.0
port_protocol:
eq: www
wildcard_bits: 0.0.0.255
dscp: ef
grant: deny
protocol: tcp
protocol_options:
tcp:
ack: true
source:
address: 192.0.3.0
wildcard_bits: 0.0.0.255
ttl:
lt: 20
name: '123'
afi: ipv4
- acls:
- aces:
- destination:
any: true
port_protocol:
eq: telnet
dscp: af11
grant: deny
protocol: tcp
protocol_options:
tcp:
ack: true
source:
any: true
port_protocol:
eq: www
name: R1_TRAFFIC
afi: ipv6
rendered:
commands:
- ip access-list extended 110
- 10 deny tcp 192.0.2.0 0.0.0.255 192.0.3.0 0.0.0.255 eq www syn dscp ef ttl eq 10
- ip access-list extended 150
- deny tcp 198.51.100.0 0.0.0.255 eq telnet 198.51.110.0 0.0.0.255 eq telnet syn dscp ef ttl eq 10
rtt:
commands:
- no ip access-list extended 150
- ip access-list extended 110
- no 10
- 10 deny icmp 192.0.2.0 0.0.0.255 192.0.3.0 0.0.0.255 echo dscp ef ttl eq 10
- ip access-list extended 123
- 10 deny tcp 198.51.100.0 0.0.0.255 198.51.101.0 0.0.0.255 eq telnet ack tos 12
- 20 deny tcp 192.0.3.0 0.0.0.255 192.0.4.0 0.0.0.255 eq www ack dscp ef ttl lt 20
- ip access-list extended test_acl
- 10 deny tcp 192.0.2.0 0.0.0.255 192.0.3.0 0.0.0.255 eq www fin option traceroute ttl eq 10
- ipv6 access-list R1_TRAFFIC
- sequence 10 deny tcp any eq www any eq telnet ack dscp af11

View file

@ -0,0 +1,4 @@
ip access-list extended 110
deny icmp 192.0.2.0 0.0.0.255 192.0.3.0 0.0.0.255 echo dscp ef ttl eq 10
ipv6 access-list R1_TRAFFIC
deny tcp any eq www any eq telnet ack dscp af11

View file

@ -0,0 +1,442 @@
#
# (c) 2019, Ansible by Red Hat, inc
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
#
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from units.compat.mock import patch
from ansible.modules.network.ios import ios_acls
from units.modules.utils import set_module_args
from .ios_module import TestIosModule, load_fixture
class TestIosAclsModule(TestIosModule):
module = ios_acls
def setUp(self):
super(TestIosAclsModule, self).setUp()
self.mock_get_config = patch('ansible.module_utils.network.common.network.Config.get_config')
self.get_config = self.mock_get_config.start()
self.mock_load_config = patch('ansible.module_utils.network.common.network.Config.load_config')
self.load_config = self.mock_load_config.start()
self.mock_get_resource_connection_config = patch('ansible.module_utils.network.common.cfg.base.'
'get_resource_connection')
self.get_resource_connection_config = self.mock_get_resource_connection_config.start()
self.mock_get_resource_connection_facts = patch('ansible.module_utils.network.common.facts.facts.'
'get_resource_connection')
self.get_resource_connection_facts = self.mock_get_resource_connection_facts.start()
self.mock_edit_config = patch('ansible.module_utils.network.ios.providers.providers.CliProvider.edit_config')
self.edit_config = self.mock_edit_config.start()
self.mock_execute_show_command = patch('ansible.module_utils.network.ios.facts.acls.acls.'
'AclsFacts.get_acl_data')
self.execute_show_command = self.mock_execute_show_command.start()
def tearDown(self):
super(TestIosAclsModule, self).tearDown()
self.mock_get_resource_connection_config.stop()
self.mock_get_resource_connection_facts.stop()
self.mock_edit_config.stop()
self.mock_get_config.stop()
self.mock_load_config.stop()
self.mock_execute_show_command.stop()
def load_fixtures(self, commands=None, transport='cli'):
def load_from_file(*args, **kwargs):
return load_fixture('ios_acls_config.cfg')
self.execute_show_command.side_effect = load_from_file
def test_ios_acls_merged(self):
set_module_args(
dict(config=[
dict(afi="ipv4",
acls=[
dict(name="std_acl",
acl_type="standard",
aces=[
dict(
grant="deny",
source=dict(
address="192.0.2.0",
wildcard_bits="0.0.0.255"
)
)
])
]),
dict(afi="ipv6",
acls=[
dict(name="merge_v6_acl",
aces=[
dict(
grant="deny",
protocol_options=dict(
tcp=dict(ack="true")
),
source=dict(
any="true",
port_protocol=dict(eq="www")
),
destination=dict(
any="true",
port_protocol=dict(eq="telnet")),
dscp="af11"
)
])
])
], state="merged"
)
)
result = self.execute_module(changed=True)
commands = [
'ip access-list standard std_acl',
'deny 192.0.2.0 0.0.0.255',
'ipv6 access-list merge_v6_acl',
'deny tcp any eq www any eq telnet ack dscp af11'
]
self.assertEqual(result['commands'], commands)
def test_ios_acls_merged_idempotent(self):
set_module_args(
dict(config=[
dict(afi="ipv4",
acls=[
dict(name="110",
aces=[
dict(
grant="deny",
protocol_options=dict(
icmp=dict(echo="true")
),
source=dict(
address="192.0.2.0",
wildcard_bits="0.0.0.255"
),
destination=dict(
address="192.0.3.0",
wildcard_bits="0.0.0.255"
),
dscp="ef",
ttl=dict(eq=10)
)
])
]),
dict(afi="ipv6",
acls=[
dict(name="R1_TRAFFIC",
aces=[
dict(
grant="deny",
protocol_options=dict(tcp=dict(ack="true")),
source=dict(
any="true",
port_protocol=dict(eq="www")
),
destination=dict(
any="true",
port_protocol=dict(eq="telnet")
),
dscp="af11"
)
])
])
], state="merged"
))
self.execute_module(changed=False, commands=[], sort=True)
def test_ios_acls_replaced(self):
set_module_args(
dict(config=[
dict(afi="ipv4",
acls=[
dict(name="replace_acl",
acl_type="extended",
aces=[
dict(
grant="deny",
protocol_options=dict(
tcp=dict(ack="true")
),
source=dict(
address="198.51.100.0",
wildcard_bits="0.0.0.255"
),
destination=dict(
address="198.51.101.0",
wildcard_bits="0.0.0.255",
port_protocol=dict(eq="telnet")
),
tos=dict(service_value=12)
)
])
])
], state="replaced"
))
result = self.execute_module(changed=True)
commands = [
'ip access-list extended replace_acl',
'deny tcp 198.51.100.0 0.0.0.255 198.51.101.0 0.0.0.255 eq telnet ack tos 12'
]
self.assertEqual(result['commands'], commands)
def test_ios_acls_replaced_idempotent(self):
set_module_args(
dict(config=[
dict(afi="ipv4",
acls=[
dict(name="110",
aces=[
dict(
grant="deny",
protocol_options=dict(
icmp=dict(echo="true")
),
source=dict(
address="192.0.2.0",
wildcard_bits="0.0.0.255"
),
destination=dict(
address="192.0.3.0",
wildcard_bits="0.0.0.255"
),
dscp="ef",
ttl=dict(eq=10)
)
])
])
], state="replaced"
))
self.execute_module(changed=False, commands=[], sort=True)
def test_ios_acls_overridden(self):
set_module_args(
dict(config=[
dict(afi="ipv4",
acls=[
dict(name="150",
aces=[
dict(
grant="deny",
protocol_options=dict(
tcp=dict(syn="true")
),
source=dict(
address="198.51.100.0",
wildcard_bits="0.0.0.255",
port_protocol=dict(eq="telnet")
),
destination=dict(
address="198.51.110.0",
wildcard_bits="0.0.0.255",
port_protocol=dict(eq="telnet")
),
dscp="ef",
ttl=dict(eq=10)
)
])
])
], state="overridden"
))
result = self.execute_module(changed=True)
commands = [
'no ip access-list extended 110',
'no ipv6 access-list R1_TRAFFIC',
'ip access-list extended 150',
'deny tcp 198.51.100.0 0.0.0.255 eq telnet 198.51.110.0 0.0.0.255 eq telnet syn dscp ef ttl eq 10'
]
self.assertEqual(result['commands'], commands)
def test_ios_acls_overridden_idempotent(self):
set_module_args(
dict(config=[
dict(afi="ipv4",
acls=[
dict(name="110",
aces=[
dict(
grant="deny",
protocol_options=dict(
icmp=dict(echo="true")
),
source=dict(
address="192.0.2.0",
wildcard_bits="0.0.0.255"
),
destination=dict(
address="192.0.3.0",
wildcard_bits="0.0.0.255"
),
dscp="ef",
ttl=dict(eq=10)
)
])
]),
dict(afi="ipv6",
acls=[
dict(name="R1_TRAFFIC",
aces=[
dict(
grant="deny",
protocol_options=dict(tcp=dict(ack="true")),
source=dict(
any="true",
port_protocol=dict(eq="www")
),
destination=dict(
any="true",
port_protocol=dict(eq="telnet")
),
dscp="af11"
)
])
])
], state="overridden"
))
self.execute_module(changed=False, commands=[], sort=True)
def test_ios_acls_deleted_afi_based(self):
set_module_args(
dict(config=[
dict(afi="ipv4")
], state="deleted"
))
result = self.execute_module(changed=True)
commands = [
'no ip access-list extended 110'
]
self.assertEqual(result['commands'], commands)
def test_ios_acls_deleted_acl_based(self):
set_module_args(
dict(config=[
dict(afi="ipv4",
acls=[
dict(name="110",
aces=[
dict(
grant="deny",
protocol_options=dict(
icmp=dict(echo="true")
),
source=dict(
address="192.0.2.0",
wildcard_bits="0.0.0.255"
),
destination=dict(
address="192.0.3.0",
wildcard_bits="0.0.0.255"
),
dscp="ef",
ttl=dict(eq=10)
)
])
]),
dict(afi="ipv6",
acls=[
dict(name="R1_TRAFFIC",
aces=[
dict(
grant="deny",
protocol_options=dict(tcp=dict(ack="true")),
source=dict(
any="true",
port_protocol=dict(eq="www")
),
destination=dict(
any="true",
port_protocol=dict(eq="telnet")
),
dscp="af11"
)
])
])
], state="deleted"
))
result = self.execute_module(changed=True)
commands = [
'no ip access-list extended 110',
'no ipv6 access-list R1_TRAFFIC',
]
self.assertEqual(result['commands'], commands)
def test_ios_acls_rendered(self):
set_module_args(
dict(config=[
dict(afi="ipv4",
acls=[
dict(name="110",
aces=[
dict(
grant="deny",
sequence="10",
protocol_options=dict(
tcp=dict(syn="true")
),
source=dict(
address="192.0.2.0",
wildcard_bits="0.0.0.255"
),
destination=dict(
address="192.0.3.0",
wildcard_bits="0.0.0.255",
port_protocol=dict(eq="www")
),
dscp="ef",
ttl=dict(eq=10)
)
])
])
], state="rendered"))
commands = [
'ip access-list extended 110',
'10 deny tcp 192.0.2.0 0.0.0.255 192.0.3.0 0.0.0.255 eq www syn dscp ef ttl eq 10'
]
result = self.execute_module(changed=False)
self.assertEqual(result['rendered'], commands)
def test_ios_acls_parsed(self):
set_module_args(
dict(running_config="ipv6 access-list R1_TRAFFIC\ndeny tcp any eq www any eq telnet ack dscp af11",
state="parsed"))
result = self.execute_module(changed=False)
parsed_list = [
{
"acls": [
{
"aces": [
{
"destination": {
"any": True,
"port_protocol": {
"eq": "telnet"
}
},
"dscp": "af11",
"grant": "deny",
"protocol": "tcp",
"protocol_options": {
"tcp": {
"ack": True
}
},
"source": {
"any": True,
"port_protocol": {
"eq": "www"
}
}
}
],
"name": "R1_TRAFFIC"
}
],
"afi": "ipv6"
}
]
self.assertEqual(parsed_list, result['parsed'])