[CLI_CONF] Refactor nxos module common library to use cliconf plugin (#31524)

* refactor nxos modules

Signed-off-by: Trishna Guha <trishnaguha17@gmail.com>

* get_config update

* fix get_config

* update nxos module

* device_info

* fix conflict

* add failure message check

* pep8 fixes

* use get_capabilities to check cliconf

* Add logic to detect platform in get_capabilities and cache in module_utils

* return msg in edit_config

* fix conflicts

* make modules compatible

* fix nxos cliconf api

Signed-off-by: Trishna Guha <trishnaguha17@gmail.com>

* cache capabilities

* Update transport queries to get_capabilities in module level

* fix unit tests

* load_config error code

* remove unnecessary change

* Refactor nxos_install_os module
This commit is contained in:
Trishna Guha 2017-12-19 12:22:33 +05:30 committed by GitHub
parent bd399ec674
commit 9f86b923e9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 220 additions and 132 deletions

View file

@ -29,11 +29,13 @@
#
import collections
import json
import re
from ansible.module_utils._text import to_text
from ansible.module_utils.basic import env_fallback, return_values
from ansible.module_utils.network.common.utils import to_list, ComplexList
from ansible.module_utils.connection import exec_command
from ansible.module_utils.connection import Connection, ConnectionError
from ansible.module_utils.six import iteritems, string_types
from ansible.module_utils.urls import fetch_url
@ -107,39 +109,34 @@ class Cli:
def __init__(self, module):
self._module = module
self._device_configs = {}
self._connection = None
def exec_command(self, command):
if isinstance(command, dict):
command = self._module.jsonify(command)
return exec_command(self._module, command)
def _get_connection(self):
if self._connection:
return self._connection
self._connection = Connection(self._module._socket_path)
return self._connection
def get_config(self, flags=None):
"""Retrieves the current config from the device or cache
"""
flags = [] if flags is None else flags
cmd = 'show running-config '
cmd += ' '.join(flags)
cmd = cmd.strip()
try:
return self._device_configs[cmd]
except KeyError:
rc, out, err = self.exec_command(cmd)
if rc != 0:
self._module.fail_json(msg=to_text(err))
try:
cfg = to_text(out, errors='surrogate_or_strict').strip()
except UnicodeError as e:
self._module.fail_json(msg=u'Failed to decode config: %s' % to_text(out))
self._device_configs[cmd] = cfg
if self._device_configs != {}:
return self._device_configs
else:
connection = self._get_connection()
out = connection.get_config(flags=flags)
cfg = to_text(out, errors='surrogate_then_replace').strip()
self._device_configs = cfg
return cfg
def run_commands(self, commands, check_rc=True):
"""Run list of commands on remote device and return results
"""
responses = list()
connection = self._get_connection()
for item in to_list(commands):
if item['output'] == 'json' and not is_json(item['command']):
@ -149,29 +146,34 @@ class Cli:
else:
cmd = item['command']
rc, out, err = self.exec_command(cmd)
out = ''
try:
out = connection.get(cmd)
code = 0
except ConnectionError as e:
code = getattr(e, 'code', 1)
message = getattr(e, 'err', e)
err = to_text(message, errors='surrogate_then_replace')
try:
out = to_text(out, errors='surrogate_or_strict')
except UnicodeError:
self._module.fail_json(msg=u'Failed to decode output from %s: %s' % (cmd, to_text(out)))
if check_rc and rc != 0:
self._module.fail_json(msg=to_text(err))
if check_rc and code != 0:
self._module.fail_json(msg=err)
if not check_rc and rc != 0:
if not check_rc and code != 0:
try:
out = self._module.from_json(err)
except ValueError:
out = to_text(err).strip()
out = to_text(message).strip()
else:
try:
out = self._module.from_json(out)
except ValueError:
out = to_text(out).strip()
if item['output'] == 'json' and out != '' and isinstance(out, string_types):
self._module.fail_json(msg='failed to retrieve output of %s in json format' % item['command'])
responses.append(out)
return responses
@ -181,24 +183,37 @@ class Cli:
if opts is None:
opts = {}
rc, out, err = self.exec_command('configure')
if rc != 0:
self._module.fail_json(msg='unable to enter configuration mode', output=to_text(err))
connection = self._get_connection()
msgs = []
for cmd in config:
rc, out, err = self.exec_command(cmd)
if opts.get('ignore_timeout') and rc == 1:
msgs.append(err)
try:
responses = connection.edit_config(config)
out = json.loads(responses)[1:-1]
msg = out
except ConnectionError as e:
code = getattr(e, 'code', 1)
message = getattr(e, 'err', e)
err = to_text(message, errors='surrogate_then_replace')
if opts.get('ignore_timeout') and code:
msgs.append(code)
return msgs
elif rc != 0:
self._module.fail_json(msg=to_text(err))
elif out:
msgs.append(out)
elif code:
self._module.fail_json(msg=err)
self.exec_command('end')
msgs.extend(msg)
return msgs
def get_capabilities(self):
"""Returns platform info of the remove device
"""
if hasattr(self._module, '_capabilities'):
return self._module._capabilities
connection = self._get_connection()
capabilities = connection.get_capabilities()
self._module._capabilities = json.loads(capabilities)
return self._module._capabilities
class Nxapi:
@ -301,8 +316,8 @@ class Nxapi:
)
self._nxapi_auth = headers.get('set-cookie')
if opts.get('ignore_timeout') and headers['status'] == -1:
result.append(headers['msg'])
if opts.get('ignore_timeout') and re.search(r'(-1|5\d\d)', str(headers['status'])):
result.append(headers['status'])
return result
elif headers['status'] != 200:
self._error(**headers)
@ -384,6 +399,9 @@ class Nxapi:
else:
return []
def get_capabilities(self):
return {}
def is_json(cmd):
return str(cmd).endswith('| json')
@ -425,7 +443,7 @@ def get_config(module, flags=None):
flags = [] if flags is None else flags
conn = get_connection(module)
return conn.get_config(flags)
return conn.get_config(flags=flags)
def run_commands(module, commands, check_rc=True):
@ -436,3 +454,8 @@ def run_commands(module, commands, check_rc=True):
def load_config(module, config, return_error=False, opts=None):
conn = get_connection(module)
return conn.load_config(config, return_error, opts)
def get_capabilities(module):
conn = get_connection(module)
return conn.get_capabilities()

View file

@ -156,18 +156,20 @@ changed:
import re
from ansible.module_utils.network.nxos.nxos import load_config, run_commands
from ansible.module_utils.network.nxos.nxos import nxos_argument_spec, check_args
from ansible.module_utils.network.nxos.nxos import get_capabilities, nxos_argument_spec
from ansible.module_utils.basic import AnsibleModule
def execute_show_command(command, module, command_type='cli_show'):
provider = module.params['provider']
if provider['transport'] == 'cli':
device_info = get_capabilities(module)
network_api = device_info.get('network_api', 'nxapi')
if network_api == 'cliconf':
if 'show run' not in command:
command += ' | json'
cmds = [command]
body = run_commands(module, cmds)
elif provider['transport'] == 'nxapi':
elif network_api == 'nxapi':
cmds = {'command': command, 'output': 'text'}
body = run_commands(module, cmds)
@ -279,7 +281,6 @@ def main():
supports_check_mode=True)
warnings = list()
check_args(module, warnings)
server_type = module.params['server_type']
address = module.params['address']

View file

@ -683,7 +683,7 @@ def main():
if candidate:
candidate = candidate.items_text()
warnings.extend(load_config(module, candidate))
load_config(module, candidate)
result['changed'] = True
result['commands'] = candidate
else:

View file

@ -493,7 +493,7 @@ def main():
if candidate:
candidate = candidate.items_text()
warnings.extend(load_config(module, candidate))
load_config(module, candidate)
result['changed'] = True
result['commands'] = candidate
else:

View file

@ -728,7 +728,7 @@ def main():
if candidate:
candidate = candidate.items_text()
warnings.extend(load_config(module, candidate))
load_config(module, candidate)
result['changed'] = True
result['commands'] = candidate
else:

View file

@ -184,11 +184,13 @@ class FactsBase(object):
def populate(self):
pass
def run(self, command, output=None):
def run(self, command, output='text'):
command_string = command
if output:
command = {'command': command, 'output': output}
resp = run_commands(self.module, command, check_rc=False)
command = {
'command': command,
'output': output
}
resp = run_commands(self.module, [command], check_rc=False)
try:
return resp[0]
except IndexError:
@ -226,7 +228,7 @@ class Default(FactsBase):
])
def populate(self):
data = self.run('show version', 'json')
data = self.run('show version', output='json')
if data:
if data.get('sys_ver_str'):
self.facts.update(self.transform_dict(data, self.VERSION_MAP_7K))
@ -244,11 +246,11 @@ class Config(FactsBase):
class Hardware(FactsBase):
def populate(self):
data = self.run('dir', 'text')
data = self.run('dir')
if data:
self.facts['filesystems'] = self.parse_filesystems(data)
data = self.run('show system resources', 'json')
data = self.run('show system resources', output='json')
if data:
self.facts['memtotal_mb'] = int(data['memory_usage_total']) / 1024
self.facts['memfree_mb'] = int(data['memory_usage_free']) / 1024
@ -282,7 +284,7 @@ class Interfaces(FactsBase):
])
def ipv6_structure_op_supported(self):
data = self.run('show version', 'json')
data = self.run('show version', output='json')
if data:
unsupported_versions = ['I2', 'F1', 'A8']
for ver in unsupported_versions:
@ -294,11 +296,11 @@ class Interfaces(FactsBase):
self.facts['all_ipv4_addresses'] = list()
self.facts['all_ipv6_addresses'] = list()
data = self.run('show interface', 'json')
data = self.run('show interface', output='json')
if data:
self.facts['interfaces'] = self.populate_interfaces(data)
data = self.run('show ipv6 interface', 'json') if self.ipv6_structure_op_supported() else None
data = self.run('show ipv6 interface', output='json') if self.ipv6_structure_op_supported() else None
if data and not isinstance(data, string_types):
self.parse_ipv6_interfaces(data)
@ -306,7 +308,7 @@ class Interfaces(FactsBase):
if data:
self.facts['neighbors'] = self.populate_neighbors(data)
data = self.run('show cdp neighbors detail', 'json')
data = self.run('show cdp neighbors detail', output='json')
if data:
self.facts['neighbors'] = self.populate_neighbors_cdp(data)
@ -428,23 +430,23 @@ class Legacy(FactsBase):
])
def populate(self):
data = self.run('show version', 'json')
data = self.run('show version', output='json')
if data:
self.facts.update(self.transform_dict(data, self.VERSION_MAP))
data = self.run('show interface', 'json')
data = self.run('show interface', output='json')
if data:
self.facts['_interfaces_list'] = self.parse_interfaces(data)
data = self.run('show vlan brief', 'json')
data = self.run('show vlan brief', output='json')
if data:
self.facts['_vlan_list'] = self.parse_vlans(data)
data = self.run('show module', 'json')
data = self.run('show module', output='json')
if data:
self.facts['_module'] = self.parse_module(data)
data = self.run('show environment', 'json')
data = self.run('show environment', output='json')
if data:
self.facts['_fan_info'] = self.parse_fan_info(data)
self.facts['_power_supply_info'] = self.parse_power_supply_info(data)

View file

@ -165,16 +165,18 @@ changed:
import re
from ansible.module_utils.network.nxos.nxos import get_config, load_config, run_commands
from ansible.module_utils.network.nxos.nxos import nxos_argument_spec, check_args
from ansible.module_utils.network.nxos.nxos import get_capabilities, nxos_argument_spec
from ansible.module_utils.basic import AnsibleModule
def execute_show_command(command, module, command_type='cli_show_ascii'):
cmds = [command]
provider = module.params['provider']
if provider['transport'] == 'cli':
device_info = get_capabilities(module)
network_api = device_info.get('network_api', 'nxapi')
if network_api == 'cliconf':
body = run_commands(module, cmds)
elif provider['transport'] == 'nxapi':
elif network_api == 'nxapi':
body = run_commands(module, cmds)
return body
@ -292,7 +294,6 @@ def main():
supports_check_mode=True)
warnings = list()
check_args(module, warnings)
state = module.params['state']
mode = get_system_mode(module)

View file

@ -124,17 +124,19 @@ commands:
'''
from ansible.module_utils.network.nxos.nxos import load_config, run_commands
from ansible.module_utils.network.nxos.nxos import nxos_argument_spec, check_args
from ansible.module_utils.network.nxos.nxos import get_capabilities, nxos_argument_spec
from ansible.module_utils.basic import AnsibleModule
def execute_show_command(command, module):
provider = module.params['provider']
if provider['transport'] == 'cli':
device_info = get_capabilities(module)
network_api = device_info.get('network_api', 'nxapi')
if network_api == 'cliconf':
command += ' | json'
cmds = [command]
body = run_commands(module, cmds)
elif provider['transport'] == 'nxapi':
elif network_api == 'nxapi':
cmds = [command]
body = run_commands(module, cmds)
@ -382,7 +384,6 @@ def main():
module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=True)
warnings = list()
check_args(module, warnings)
results = dict(changed=False, warnings=warnings)
interface = module.params['interface'].lower()
@ -395,7 +396,8 @@ def main():
auth_type = module.params['auth_type']
auth_string = module.params['auth_string']
transport = module.params['provider']['transport']
device_info = get_capabilities(module)
network_api = device_info.get('network_api', 'nxapi')
if state == 'present' and not vip:
module.fail_json(msg='the "vip" param is required when state=present')
@ -405,7 +407,7 @@ def main():
validate_params(param, module)
intf_type = get_interface_type(interface)
if (intf_type != 'ethernet' and transport == 'cli'):
if (intf_type != 'ethernet' and network_api == 'cliconf'):
if is_default(interface, module) == 'DNE':
module.fail_json(msg='That interface does not exist yet. Create '
'it first.', interface=interface)
@ -463,7 +465,7 @@ def main():
load_config(module, commands)
# validate IP
if transport == 'cli' and state == 'present':
if network_api == 'cliconf' and state == 'present':
commands.insert(0, 'config t')
body = run_commands(module, commands)
validate_config(body, vip, module)

View file

@ -238,15 +238,27 @@ def parse_show_install(data):
if len(data) > 0:
data = massage_install_data(data)
ud = {'raw': data}
ud['list_data'] = data.split('\n')
ud['processed'] = []
ud['disruptive'] = False
ud['upgrade_needed'] = False
ud['error'] = False
ud['install_in_progress'] = False
ud['backend_processing_error'] = False
ud['server_error'] = False
ud['upgrade_succeeded'] = False
ud['use_impact_data'] = False
# Check for server errors
if isinstance(data, int):
if data == -1:
ud['server_error'] = True
elif data >= 500:
ud['server_error'] = True
elif data == -32603:
ud['server_error'] = True
return ud
else:
ud['list_data'] = data.split('\n')
for x in ud['list_data']:
# Check for errors and exit if found.
if re.search(r'Pre-upgrade check failed', x):
@ -264,7 +276,10 @@ def parse_show_install(data):
ud['install_in_progress'] = True
break
if re.search(r'Backend processing error', x):
ud['backend_processing_error'] = True
ud['server_error'] = True
break
if re.search(r'^(-1|5\d\d)$', x):
ud['server_error'] = True
break
# Check for messages indicating a successful upgrade.
@ -465,7 +480,7 @@ def check_install_in_progress(module, commands, opts):
def check_mode(module, issu, image, kick=None):
"""Check switch upgrade impact using 'show install all impact' command"""
data = check_mode_nextgen(module, issu, image, kick)
if data['backend_processing_error']:
if data['server_error']:
# We encountered an unrecoverable error in the attempt to get upgrade
# impact data from the 'show install all impact' command.
# Fallback to legacy method.
@ -500,11 +515,11 @@ def do_install_all(module, issu, image, kick=None):
# it's done.
upgrade = check_install_in_progress(module, commands, opts)
# Special case: If we encounter a backend processing error at this
# stage it means the command was sent and the upgrade was started but
# Special case: If we encounter a server error at this stage
# it means the command was sent and the upgrade was started but
# we will need to use the impact data instead of the current install
# data.
if upgrade['backend_processing_error']:
if upgrade['server_error']:
upgrade['upgrade_succeeded'] = True
upgrade['use_impact_data'] = True

View file

@ -176,7 +176,7 @@ except ImportError:
HAS_IPADDRESS = False
from ansible.module_utils.network.nxos.nxos import load_config, run_commands
from ansible.module_utils.network.nxos.nxos import nxos_argument_spec, check_args
from ansible.module_utils.network.nxos.nxos import get_capabilities, nxos_argument_spec
from ansible.module_utils.basic import AnsibleModule
@ -457,6 +457,9 @@ def flatten_list(command_lists):
def validate_params(addr, interface, mask, dot1q, tag, allow_secondary, version, state, intf_type, module):
device_info = get_capabilities(module)
network_api = device_info.get('network_api', 'nxapi')
if state == "present":
if addr is None or mask is None:
module.fail_json(msg="An IP address AND a mask must be provided "
@ -466,7 +469,7 @@ def validate_params(addr, interface, mask, dot1q, tag, allow_secondary, version,
module.fail_json(msg="IPv6 address and mask must be provided when "
"state=absent.")
if intf_type != "ethernet" and module.params["provider"]["transport"] == "cli":
if intf_type != "ethernet" and network_api == 'cliconf':
if is_default(interface, module) == "DNE":
module.fail_json(msg="That interface does not exist yet. Create "
"it first.", interface=interface)
@ -538,7 +541,6 @@ def main():
module.fail_json(msg="ipaddress is required for this module. Run 'pip install ipaddress' for install.")
warnings = list()
check_args(module, warnings)
addr = module.params['addr']
version = module.params['version']

View file

@ -126,18 +126,17 @@ import re
from ansible.module_utils.network.nxos.nxos import run_commands, load_config
from ansible.module_utils.network.nxos.nxos import nxos_argument_spec
from ansible.module_utils.network.nxos.nxos import check_args as nxos_check_args
from ansible.module_utils.network.nxos.nxos import get_capabilities
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.six import iteritems
def check_args(module, warnings):
provider = module.params['provider']
if provider['transport'] == 'nxapi':
device_info = get_capabilities(module)
network_api = device_info.get('network_api', 'nxapi')
if network_api == 'nxapi':
module.fail_json(msg='module not supported over nxapi transport')
nxos_check_args(module, warnings)
state = module.params['state']
if state == 'started':

View file

@ -99,7 +99,7 @@ import collections
import re
from ansible.module_utils.network.nxos.nxos import get_config, load_config, run_commands
from ansible.module_utils.network.nxos.nxos import nxos_argument_spec, check_args
from ansible.module_utils.network.nxos.nxos import get_capabilities, nxos_argument_spec
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.network.common.config import CustomNetworkConfig
@ -142,13 +142,15 @@ def get_custom_value(arg, config, module):
def execute_show_command(command, module):
provider = module.params['provider']
if provider['transport'] == 'cli':
device_info = get_capabilities(module)
network_api = device_info.get('network_api', 'nxapi')
if network_api == 'cliconf':
if 'show port-channel summary' in command:
command += ' | json'
cmds = [command]
body = run_commands(module, cmds)
elif provider['transport'] == 'nxapi':
elif network_api == 'nxapi':
cmds = [command]
body = run_commands(module, cmds)
@ -446,7 +448,6 @@ def main():
module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=True)
warnings = list()
check_args(module, warnings)
results = dict(changed=False, warnings=warnings)
group = str(module.params['group'])

View file

@ -127,7 +127,7 @@ commands:
'''
from ansible.module_utils.network.nxos.nxos import load_config, run_commands
from ansible.module_utils.network.nxos.nxos import nxos_argument_spec, check_args
from ansible.module_utils.network.nxos.nxos import get_capabilities, nxos_argument_spec
from ansible.module_utils.basic import AnsibleModule
@ -434,12 +434,14 @@ def apply_value_map(value_map, resource):
def execute_show_command(command, module, command_type='cli_show'):
provider = module.params['provider']
if provider['transport'] == 'cli':
device_info = get_capabilities(module)
network_api = device_info.get('network_api', 'nxapi')
if network_api == 'cliconf':
command += ' | json'
cmds = [command]
body = run_commands(module, cmds)
elif provider['transport'] == 'nxapi':
elif network_api == 'nxapi':
cmds = [command]
body = run_commands(module, cmds)
@ -479,7 +481,6 @@ def main():
warnings = list()
commands = []
results = {'changed': False}
check_args(module, warnings)
interface = module.params['interface']
mode = module.params['mode']

View file

@ -110,24 +110,23 @@ changed:
sample: true
'''
import re
from ansible.module_utils.network.nxos.nxos import get_config, load_config, run_commands
from ansible.module_utils.network.nxos.nxos import nxos_argument_spec, check_args
from ansible.module_utils.network.nxos.nxos import get_capabilities, nxos_argument_spec
from ansible.module_utils.basic import AnsibleModule
import re
import re
def execute_show_command(command, module, command_type='cli_show'):
provider = module.params['provider']
if provider['transport'] == 'cli':
device_info = get_capabilities(module)
network_api = device_info.get('network_api', 'nxapi')
if network_api == 'cliconf':
if 'show run' not in command:
command += ' | json'
cmds = [command]
body = run_commands(module, cmds)
elif provider['transport'] == 'nxapi':
elif network_api == 'nxapi':
cmds = [command]
body = run_commands(module, cmds)
@ -227,7 +226,6 @@ def main():
supports_check_mode=True)
warnings = list()
check_args(module, warnings)
aggressive = module.params['aggressive']
msg_time = module.params['msg_time']

View file

@ -111,18 +111,20 @@ changed:
from ansible.module_utils.network.nxos.nxos import get_config, load_config, run_commands
from ansible.module_utils.network.nxos.nxos import nxos_argument_spec, check_args
from ansible.module_utils.network.nxos.nxos import get_capabilities, nxos_argument_spec
from ansible.module_utils.basic import AnsibleModule
def execute_show_command(command, module, command_type='cli_show'):
provider = module.params['provider']
if provider['transport'] == 'cli':
device_info = get_capabilities(module)
network_api = device_info.get('network_api', 'nxapi')
if network_api == 'cliconf':
if 'show run' not in command:
command += ' | json'
cmds = [command]
body = run_commands(module, cmds)
elif provider['transport'] == 'nxapi':
elif network_api == 'nxapi':
cmds = [command]
body = run_commands(module, cmds)
@ -248,7 +250,6 @@ def main():
supports_check_mode=True)
warnings = list()
check_args(module, warnings)
interface = module.params['interface'].lower()
mode = module.params['mode']

View file

@ -81,7 +81,7 @@ commands:
import re
from ansible.module_utils.network.nxos.nxos import load_config, run_commands
from ansible.module_utils.network.nxos.nxos import nxos_argument_spec, check_args
from ansible.module_utils.network.nxos.nxos import get_capabilities, nxos_argument_spec
from ansible.module_utils.basic import AnsibleModule
@ -199,20 +199,22 @@ def main():
module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=True)
warnings = list()
check_args(module, warnings)
results = {'changed': False, 'commands': [], 'warnings': warnings}
vrf = module.params['vrf']
interface = module.params['interface'].lower()
state = module.params['state']
device_info = get_capabilities(module)
network_api = device_info.get('network_api', 'nxapi')
current_vrfs = get_vrf_list(module)
if vrf not in current_vrfs:
warnings.append("The VRF is not present/active on the device. "
"Use nxos_vrf to fix this.")
intf_type = get_interface_type(interface)
if (intf_type != 'ethernet' and module.params['provider']['transport'] == 'cli'):
if (intf_type != 'ethernet' and network_api == 'cliconf'):
if is_default(interface, module) == 'DNE':
module.fail_json(msg="interface does not exist on switch. Verify "
"switch platform or create it first with "

View file

@ -114,7 +114,7 @@ commands:
'''
from ansible.module_utils.network.nxos.nxos import load_config, run_commands
from ansible.module_utils.network.nxos.nxos import nxos_argument_spec, check_args
from ansible.module_utils.network.nxos.nxos import get_capabilities, nxos_argument_spec
from ansible.module_utils.basic import AnsibleModule
@ -343,7 +343,6 @@ def main():
module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=True)
warnings = list()
check_args(module, warnings)
results = {'changed': False, 'commands': [], 'warnings': warnings}
state = module.params['state']
@ -355,13 +354,14 @@ def main():
authentication = module.params['authentication']
admin_state = module.params['admin_state']
transport = module.params['transport']
device_info = get_capabilities(module)
network_api = device_info.get('network_api', 'nxapi')
if state == 'present' and not vip:
module.fail_json(msg='the "vip" param is required when state=present')
intf_type = get_interface_type(interface)
if (intf_type != 'ethernet' and transport == 'cli'):
if (intf_type != 'ethernet' and network_api == 'cliconf'):
if is_default(interface, module) == 'DNE':
module.fail_json(msg='That interface does not exist yet. Create '
'it first.', interface=interface)

View file

@ -35,21 +35,36 @@ class Cliconf(CliconfBase):
device_info['network_os'] = 'nxos'
reply = self.get(b'show version | json')
data = json.loads(reply)
platform_reply = self.get(b'show inventory | json')
platform_info = json.loads(platform_reply)
device_info['network_os_version'] = data['sys_ver_str']
device_info['network_os_version'] = data.get('sys_ver_str') or data.get('kickstart_ver_str')
device_info['network_os_model'] = data['chassis_id']
device_info['network_os_hostname'] = data['host_name']
device_info['network_os_image'] = data['isan_file_name']
device_info['network_os_image'] = data.get('isan_file_name') or data.get('kick_file_name')
inventory_table = platform_info['TABLE_inv']['ROW_inv']
for info in inventory_table:
if 'Chassis' in info['name']:
device_info['network_os_platform'] = info['productid']
return device_info
def get_config(self, source='running'):
def get_config(self, source='running', flags=None):
lookup = {'running': 'running-config', 'startup': 'startup-config'}
return self.send_command(b'show %s' % lookup[source])
cmd = b'show {} '.format(lookup[source])
cmd += ' '.join(flags)
cmd = cmd.strip()
return self.send_command(cmd)
def edit_config(self, command):
responses = []
for cmd in chain([b'configure'], to_list(command), [b'end']):
self.send_command(cmd)
responses.append(self.send_command(cmd))
return json.dumps(responses)
def get(self, command, prompt=None, answer=None, sendonly=False):
return self.send_command(command, prompt=prompt, answer=answer, sendonly=sendonly)

View file

@ -36,10 +36,15 @@ class TestNxosHsrpModule(TestNxosModule):
self.mock_load_config = patch('ansible.modules.network.nxos.nxos_hsrp.load_config')
self.load_config = self.mock_load_config.start()
self.mock_get_capabilities = patch('ansible.modules.network.nxos.nxos_hsrp.get_capabilities')
self.get_capabilities = self.mock_get_capabilities.start()
self.get_capabilities.return_value = {'network_api': 'cliconf'}
def tearDown(self):
super(TestNxosHsrpModule, self).tearDown()
self.mock_run_commands.stop()
self.mock_load_config.stop()
self.mock_get_capabilities.stop()
def load_fixtures(self, commands=None, device=''):
self.load_config.return_value = None

View file

@ -42,11 +42,16 @@ class TestNxosIPInterfaceModule(TestNxosModule):
self.mock_load_config = patch('ansible.modules.network.nxos.nxos_ip_interface.load_config')
self.load_config = self.mock_load_config.start()
self.mock_get_capabilities = patch('ansible.modules.network.nxos.nxos_ip_interface.get_capabilities')
self.get_capabilities = self.mock_get_capabilities.start()
self.get_capabilities.return_value = {'network_api': 'cliconf'}
def tearDown(self):
super(TestNxosIPInterfaceModule, self).tearDown()
self.mock_get_interface_mode.stop()
self.mock_send_show_command.stop()
self.mock_load_config.stop()
self.mock_get_capabilities.stop()
def load_fixtures(self, commands=None, device=''):
self.get_interface_mode.return_value = 'layer3'

View file

@ -37,10 +37,15 @@ class TestNxosNxapiModule(TestNxosModule):
self.mock_load_config = patch('ansible.modules.network.nxos.nxos_nxapi.load_config')
self.load_config = self.mock_load_config.start()
self.mock_get_capabilities = patch('ansible.modules.network.nxos.nxos_nxapi.get_capabilities')
self.get_capabilities = self.mock_get_capabilities.start()
self.get_capabilities.return_value = {'network_api': 'cliconf'}
def tearDown(self):
super(TestNxosNxapiModule, self).tearDown()
self.mock_run_commands.stop()
self.mock_load_config.stop()
self.mock_get_capabilities.stop()
def load_fixtures(self, commands=None, device=''):
def load_from_file(*args, **kwargs):

View file

@ -40,11 +40,16 @@ class TestNxosPortchannelModule(TestNxosModule):
self.mock_get_config = patch('ansible.modules.network.nxos.nxos_portchannel.get_config')
self.get_config = self.mock_get_config.start()
self.mock_get_capabilities = patch('ansible.modules.network.nxos.nxos_portchannel.get_capabilities')
self.get_capabilities = self.mock_get_capabilities.start()
self.get_capabilities.return_value = {'network_api': 'cliconf'}
def tearDown(self):
super(TestNxosPortchannelModule, self).tearDown()
self.mock_run_commands.stop()
self.mock_load_config.stop()
self.mock_get_config.stop()
self.mock_get_capabilities.stop()
def load_fixtures(self, commands=None, device=''):
self.load_config.return_value = None

View file

@ -37,10 +37,15 @@ class TestNxosSwitchportModule(TestNxosModule):
self.mock_load_config = patch('ansible.modules.network.nxos.nxos_switchport.load_config')
self.load_config = self.mock_load_config.start()
self.mock_get_capabilities = patch('ansible.modules.network.nxos.nxos_switchport.get_capabilities')
self.get_capabilities = self.mock_get_capabilities.start()
self.get_capabilities.return_value = {'network_api': 'cliconf'}
def tearDown(self):
super(TestNxosSwitchportModule, self).tearDown()
self.mock_run_commands.stop()
self.mock_load_config.stop()
self.mock_get_capabilities.stop()
def load_fixtures(self, commands=None, device=''):
def load_from_file(*args, **kwargs):