Apcon modules (#62028)

* Add APCON modules

* Fix cli regex

* revise for passing sanity tests

* Add unit test for apcon_command module
Remove apconos_config.py apconos_update.py apconos_cert.py for now

* Fix for santiy test

* test

* Ignore action-plugin-docs test

* Add apcon_command module

* conflict solved

* merge

* Fix for sanity test

* Revise integration test

* Mark integration test unstable

* Remove integration test temprorily

* 1. Ignored privilege escalation;
2. Ignored get_configure command;
3. Removed doc_fragments;
This commit is contained in:
David 2019-09-23 13:42:17 -07:00 committed by Ganesh Nalawade
parent b17581a307
commit a2846613b8
11 changed files with 622 additions and 0 deletions

View file

@ -0,0 +1,113 @@
# This code is part of Ansible, but is an independent component.
# This particular file snippet, and this file snippet only, is BSD licensed.
# Modules you write using this snippet, which is embedded dynamically by
# Ansible still belong to the author of the module, and may assign their own
# license to the complete work.
#
# Copyright (C) 2019 APCON, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Contains utility methods
# APCON Networking
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.module_utils._text import to_text
from ansible.module_utils.network.common.utils import EntityCollection
from ansible.module_utils.connection import Connection, exec_command
from ansible.module_utils.connection import ConnectionError
_DEVICE_CONFIGS = {}
_CONNECTION = None
command_spec = {
'command': dict(key=True),
}
def check_args(module, warnings):
pass
def get_connection(module):
global _CONNECTION
if _CONNECTION:
return _CONNECTION
_CONNECTION = Connection(module._socket_path)
return _CONNECTION
def get_config(module, flags=None):
flags = [] if flags is None else flags
cmd = ' '.join(flags).strip()
try:
return _DEVICE_CONFIGS[cmd]
except KeyError:
conn = get_connection(module)
out = conn.get(cmd)
cfg = to_text(out, errors='surrogate_then_replace').strip()
_DEVICE_CONFIGS[cmd] = cfg
return cfg
def run_commands(module, commands, check_rc=True):
connection = get_connection(module)
transform = EntityCollection(module, command_spec)
commands = transform(commands)
responses = list()
for cmd in commands:
out = connection.get(**cmd)
responses.append(to_text(out, errors='surrogate_then_replace'))
return responses
def load_config(module, config):
try:
conn = get_connection(module)
conn.edit_config(config)
except ConnectionError as exc:
module.fail_json(msg=to_text(exc))
def get_defaults_flag(module):
rc, out, err = exec_command(module, 'display running-config ?')
out = to_text(out, errors='surrogate_then_replace')
commands = set()
for line in out.splitlines():
if line:
commands.add(line.strip().split()[0])
if 'all' in commands:
return 'all'
else:
return 'full'

View file

@ -0,0 +1,201 @@
#!/usr/bin/python
#
# Copyright (C) 2019 APCON.
#
# GNU General Public License v3.0+
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
#
# Module to execute apconos Commands on Apcon Switches.
# Apcon Networking
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'community'}
DOCUMENTATION = """
---
module: apconos_command
version_added: "2.10"
author: "David Lee (@davidlee-ap)"
short_description: Run arbitrary commands on APCON devices
description:
- Sends arbitrary commands to an apcon device and returns the results
read from the device. The module includes an argument that will
cause the module to wait for a specific condition before returning
or timing out if the condition is not met.
notes:
- Tested against apcon iis+ii
options:
commands:
description:
- List of commands to send to the remote device over the
configured provider. The resulting output from the command
is returned. If the I(wait_for) argument is provided, the
module is not returned until the condition is satisfied or
the number of retires as expired.
required: true
type: list
wait_for:
description:
- List of conditions to evaluate against the output of the
command. The task will wait for each condition to be true
before moving forward. If the conditional is not true
within the configured number of retries, the task fails.
See examples.
type: list
match:
description:
- The I(match) argument is used in conjunction with the
I(wait_for) argument to specify the match policy. Valid
values are C(all) or C(any). If the value is set to C(all)
then all conditionals in the wait_for must be satisfied. If
the value is set to C(any) then only one of the values must be
satisfied.
default: all
choices: ['any', 'all']
type: str
retries:
description:
- Specifies the number of retries a command should by tried
before it is considered failed. The command is run on the
target device every retry and evaluated against the
I(wait_for) conditions.
default: 10
type: int
interval:
description:
- Configures the interval in seconds to wait between retries
of the command. If the command does not pass the specified
conditions, the interval indicates how long to wait before
trying the command again.
default: 1
type: int
"""
EXAMPLES = """
- name: Basic Configuration
apconos_command:
commands:
- show version
- enable ssh
register: result
- name: Get output from single command
apconos_command:
commands: ['show version']
register: result
"""
RETURN = """
"""
import time
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.network.common.utils import to_lines
from ansible.module_utils.network.apconos.apconos import run_commands
from ansible.module_utils.network.common.parsing import Conditional
def parse_commands(module, warnings):
commands = module.params['commands']
if module.check_mode:
for item in list(commands):
if not item.startswith('show'):
warnings.append(
'Only show commands are supported when using check mode, not '
'executing %s' % item
)
commands.remove(item)
return commands
def main():
spec = dict(
commands=dict(type='list', required=True),
wait_for=dict(type='list'),
match=dict(default='all', choices=['all', 'any']),
retries=dict(default=10, type='int'),
interval=dict(default=1, type='int')
)
module = AnsibleModule(argument_spec=spec, supports_check_mode=False)
warnings = list()
result = {'changed': False, 'warnings': warnings}
wait_for = module.params['wait_for'] or list()
conditionals = [Conditional(c) for c in wait_for]
commands = parse_commands(module, warnings)
commands = module.params['commands']
retries = module.params['retries']
interval = module.params['interval']
match = module.params['match']
while retries > 0:
responses = run_commands(module, commands)
for item in list(conditionals):
if item(responses):
if match == 'any':
conditionals = list()
break
conditionals.remove(item)
if not conditionals:
break
time.sleep(interval)
retries -= 1
if conditionals:
failed_conditions = [item.raw for item in conditionals]
msg = 'One or more conditional statements have not been satisfied'
module.fail_json(msg=msg, failed_conditions=failed_conditions)
for item in responses:
if len(item) == 0:
if module.check_mode:
result.update({
'changed': False,
'stdout': responses,
'stdout_lines': list(to_lines(responses))
})
else:
result.update({
'changed': True,
'stdout': responses,
'stdout_lines': list(to_lines(responses))
})
elif 'ERROR' in item:
result.update({
'failed': True,
'stdout': responses,
'stdout_lines': list(to_lines(responses))
})
else:
result.update({
'stdout': item,
'stdout_lines': list(to_lines(responses))
})
module.exit_json(**result)
if __name__ == '__main__':
main()

View file

@ -0,0 +1,73 @@
# (C) 2018 Red Hat Inc.
# Copyright (C) 2019 APCON.
#
# GNU General Public License v3.0+
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
#
# Contains CLIConf Plugin methods for apconos Modules
# APCON Networking
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
DOCUMENTATION = """
---
author: "David Li (@davidlee-ap)"
cliconf: apconos
short_description: Use apconos cliconf to run command on APCON network devices
description:
- This apconos plugin provides low level abstraction apis for
sending and receiving CLI commands from APCON network devices.
version_added: "2.9"
"""
import re
import json
from itertools import chain
from ansible.module_utils._text import to_bytes, to_text
from ansible.module_utils.network.common.utils import to_list
from ansible.plugins.cliconf import CliconfBase, enable_mode
class Cliconf(CliconfBase):
def get_device_info(self):
device_info = {}
device_info['network_os'] = 'apconos'
reply = self.get(b'show version')
data = to_text(reply, errors='surrogate_or_strict').strip()
if data:
device_info['network_os_version'] = self.parse_version(data)
device_info['network_os_model'] = self.parse_model(data)
return device_info
def parse_version(self, data):
return ""
def parse_model(self, data):
return ""
@enable_mode
def get_config(self, source='running', format='text'):
pass
@enable_mode
def edit_config(self, command):
for cmd in chain([b'configure terminal'], to_list(command), [b'end']):
self.send_command(cmd)
def get(self, command, prompt=None, answer=None, sendonly=False, check_all=False):
return self.send_command(command=command, prompt=prompt, answer=answer, sendonly=sendonly, check_all=check_all)
def get_capabilities(self):
return json.dumps(self.get_device_info())

View file

@ -0,0 +1,35 @@
# (C) 2017 Red Hat Inc.
# Copyright (C) 2019 APCON.
#
# GNU General Public License v3.0+
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
#
# Contains terminal Plugin methods for apconos Config Module
# Apcon Networking
#
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import json
import re
from ansible.errors import AnsibleConnectionFailure
from ansible.module_utils._text import to_text, to_bytes
from ansible.plugins.terminal import TerminalBase
class TerminalModule(TerminalBase):
terminal_stdout_re = [
re.compile(br'>>\ |#\ |\$\ ')
]
terminal_stderr_re = [
re.compile(br"connection timed out", re.I),
]

View file

@ -0,0 +1,88 @@
# (c) 2019 Red Hat Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import json
from units.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase
fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures')
fixture_data = {}
def load_fixture(name):
path = os.path.join(fixture_path, name)
if path in fixture_data:
return fixture_data[path]
with open(path) as f:
data = f.read()
try:
data = json.loads(data)
except Exception:
pass
fixture_data[path] = data
return data
class TestApconosModule(ModuleTestCase):
def execute_module(self, failed=False, changed=False, commands=None, sort=True, defaults=False):
self.load_fixtures(commands)
if failed:
result = self.failed()
self.assertTrue(result['failed'], result)
else:
result = self.changed(changed)
self.assertEqual(result['changed'], changed, result)
if commands is not None:
if sort:
self.assertEqual(sorted(commands), sorted(result['commands']), result['commands'])
else:
self.assertEqual(commands, result['commands'], result['commands'])
return result
def failed(self):
with self.assertRaises(AnsibleFailJson) as exc:
self.module.main()
result = exc.exception.args[0]
self.assertTrue(result['failed'], result)
return result
def changed(self, changed=False):
with self.assertRaises(AnsibleExitJson) as exc:
self.module.main()
result = exc.exception.args[0]
self.assertEqual(result['changed'], changed, result)
return result
def load_fixtures(self, commands=None):
pass

View file

@ -0,0 +1,2 @@
APCON
COMPONENT MODEL VERSION

View file

@ -0,0 +1,110 @@
# (c) 2019 Red Hat Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import json
from units.compat.mock import patch
from ansible.modules.network.apconos import apconos_command
from units.modules.utils import set_module_args
from .apconos_module import TestApconosModule, load_fixture
class TestApconosCommandModule(TestApconosModule):
module = apconos_command
def setUp(self):
super(TestApconosCommandModule, self).setUp()
self.mock_run_commands = patch('ansible.modules.network.apconos.apconos_command.run_commands')
self.run_commands = self.mock_run_commands.start()
def tearDown(self):
super(TestApconosCommandModule, self).tearDown()
self.mock_run_commands.stop()
def load_fixtures(self, commands=None):
def load_from_file(*args, **kwargs):
module, commands = args
output = list()
for item in commands:
filename = str(item).replace(' ', '_')
output.append(load_fixture(filename))
return output
self.run_commands.side_effect = load_from_file
def test_apcon_command_simple(self):
set_module_args(dict(commands=['show version']))
result = self.execute_module()
self.assertEqual(len(result['stdout_lines']), 1)
self.assertEqual(result['stdout_lines'][0][0], 'APCON')
def test_apcon_command_multiple(self):
set_module_args(dict(commands=['show version', 'show version']))
result = self.execute_module()
self.assertEqual(len(result['stdout_lines']), 2)
self.assertEqual(result['stdout_lines'][0][0], 'APCON')
self.assertEqual(result['stdout_lines'][1][0], 'APCON')
def test_apcon_command_wait_for(self):
wait_for = 'result[0] contains "APCON"'
set_module_args(dict(commands=['show version'], wait_for=wait_for))
self.execute_module()
def test_apcon_command_wait_for_fails(self):
wait_for = 'result[0] contains "test string"'
set_module_args(dict(commands=['show version'], wait_for=wait_for))
self.execute_module(failed=True)
self.assertEqual(self.run_commands.call_count, 10)
def test_apcon_command_retries(self):
wait_for = 'result[0] contains "test string"'
set_module_args(dict(commands=['show version'], wait_for=wait_for, retries=2))
self.execute_module(failed=True)
self.assertEqual(self.run_commands.call_count, 2)
def test_apcon_command_match_any(self):
wait_for = ['result[0] contains "test string"',
'result[0] contains "VERSION"']
set_module_args(dict(commands=['show version'], wait_for=wait_for, match='any'))
self.execute_module()
def test_apcon_command_match_all(self):
wait_for = ['result[0] contains "COMPONENT"',
'result[0] contains "MODEL"',
'result[0] contains "VERSION"']
set_module_args(dict(commands=['show version'], wait_for=wait_for, match='all'))
self.execute_module()
def test_apcon_command_match_all_failure(self):
wait_for = ['result[0] contains "APCON OS"',
'result[0] contains "test string"']
commands = ['show version', 'show version']
set_module_args(dict(commands=commands, wait_for=wait_for, match='all'))
self.execute_module(failed=True)
def test_apcon_command_checkmode_not_warning(self):
commands = ['enable ssh']
set_module_args(dict(commands=commands, _ansible_check_mode=False))
result = self.execute_module(changed=True)
self.assertEqual(result['warnings'], [])