Migrated to netbox.netbox

This commit is contained in:
Ansible Core Team 2020-03-09 09:40:33 +00:00 committed by Matt Martz
parent 0743e733bb
commit bdd82adf61
9 changed files with 0 additions and 3006 deletions

View file

@ -1,357 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2018, Mikhail Yohman (@fragmentedpacket) <mikhail.yohman@gmail.com>
# Copyright: (c) 2018, David Gomez (@amb1s1) <david.gomez@networktocode.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
__metaclass__ = type
API_APPS_ENDPOINTS = dict(
circuits=[],
dcim=[
"devices",
"device_roles",
"device_types",
"devices",
"interfaces",
"platforms",
"racks",
"regions",
"sites",
],
extras=[],
ipam=["ip_addresses", "prefixes", "roles", "vlans", "vlan_groups", "vrfs"],
secrets=[],
tenancy=["tenants", "tenant_groups"],
virtualization=["clusters"],
)
QUERY_TYPES = dict(
cluster="name",
devices="name",
device_role="slug",
device_type="slug",
manufacturer="slug",
nat_inside="address",
nat_outside="address",
platform="slug",
primary_ip="address",
primary_ip4="address",
primary_ip6="address",
rack="slug",
region="slug",
role="slug",
site="slug",
tenant="name",
tenant_group="slug",
time_zone="timezone",
vlan="name",
vlan_group="slug",
vrf="name",
)
CONVERT_TO_ID = dict(
cluster="clusters",
device="devices",
device_role="device_roles",
device_type="device_types",
interface="interfaces",
lag="interfaces",
nat_inside="ip_addresses",
nat_outside="ip_addresses",
platform="platforms",
primary_ip="ip_addresses",
primary_ip4="ip_addresses",
primary_ip6="ip_addresses",
rack="racks",
region="regions",
role="roles",
site="sites",
tagged_vlans="vlans",
tenant="tenants",
tenant_group="tenant_groups",
untagged_vlan="vlans",
vlan="vlans",
vlan_group="vlan_groups",
vrf="vrfs",
)
FACE_ID = dict(front=0, rear=1)
NO_DEFAULT_ID = set(
[
"device",
"lag",
"primary_ip",
"primary_ip4",
"primary_ip6",
"role",
"vlan",
"vrf",
"nat_inside",
"nat_outside",
"region",
"untagged_vlan",
"tagged_vlans",
"tenant",
]
)
DEVICE_STATUS = dict(offline=0, active=1, planned=2, staged=3, failed=4, inventory=5)
IP_ADDRESS_STATUS = dict(active=1, reserved=2, deprecated=3, dhcp=5)
IP_ADDRESS_ROLE = dict(
loopback=10, secondary=20, anycast=30, vip=40, vrrp=41, hsrp=42, glbp=43, carp=44
)
PREFIX_STATUS = dict(container=0, active=1, reserved=2, deprecated=3)
VLAN_STATUS = dict(active=1, reserved=2, deprecated=3)
SITE_STATUS = dict(active=1, planned=2, retired=4)
INTF_FORM_FACTOR = {
"virtual": 0,
"link aggregation group (lag)": 200,
"100base-tx (10/100me)": 800,
"1000base-t (1ge)": 1000,
"10gbase-t (10ge)": 1150,
"10gbase-cx4 (10ge)": 1170,
"gbic (1ge)": 1050,
"sfp (1ge)": 1100,
"sfp+ (10ge)": 1200,
"xfp (10ge)": 1300,
"xenpak (10ge)": 1310,
"x2 (10ge)": 1320,
"sfp28 (25ge)": 1350,
"qsfp+ (40ge)": 1400,
"cfp (100ge)": 1500,
"cfp2 (100ge)": 1510,
"cfp2 (200ge)": 1650,
"cfp4 (100ge)": 1520,
"cisco cpak (100ge)": 1550,
"qsfp28 (100ge)": 1600,
"qsfp56 (200ge)": 1700,
"qsfp-dd (400ge)": 1750,
"ieee 802.11a": 2600,
"ieee 802.11b/g": 2610,
"ieee 802.11n": 2620,
"ieee 802.11ac": 2630,
"ieee 802.11ad": 2640,
"gsm": 2810,
"cdma": 2820,
"lte": 2830,
"oc-3/stm-1": 6100,
"oc-12/stm-4": 6200,
"oc-48/stm-16": 6300,
"oc-192/stm-64": 6400,
"oc-768/stm-256": 6500,
"oc-1920/stm-640": 6600,
"oc-3840/stm-1234": 6700,
"sfp (1gfc)": 3010,
"sfp (2gfc)": 3020,
"sfp (4gfc)": 3040,
"sfp+ (8gfc)": 3080,
"sfp+ (16gfc)": 3160,
"sfp28 (32gfc)": 3320,
"qsfp28 (128gfc)": 3400,
"t1 (1.544 mbps)": 4000,
"e1 (2.048 mbps)": 4010,
"t3 (45 mbps)": 4040,
"e3 (34 mbps)": 4050,
"cisco stackwise": 5000,
"cisco stackwise plus": 5050,
"cisco flexstack": 5100,
"cisco flexstack plus": 5150,
"juniper vcp": 5200,
"extreme summitstack": 5300,
"extreme summitstack-128": 5310,
"extreme summitstack-256": 5320,
"extreme summitstack-512": 5330,
"other": 32767,
}
INTF_MODE = {"access": 100, "tagged": 200, "tagged all": 300}
ALLOWED_QUERY_PARAMS = {
"interface": set(["name", "device"]),
"lag": set(["name"]),
"nat_inside": set(["vrf", "address"]),
"vlan": set(["name", "site", "vlan_group", "tenant"]),
"untagged_vlan": set(["name", "site", "vlan_group", "tenant"]),
"tagged_vlans": set(["name", "site", "vlan_group", "tenant"]),
}
QUERY_PARAMS_IDS = set(["vrf", "site", "vlan_group", "tenant"])
def _build_diff(before=None, after=None):
return {"before": before, "after": after}
def create_netbox_object(nb_endpoint, data, check_mode):
"""Create a Netbox object.
:returns tuple(serialized_nb_obj, diff): tuple of the serialized created
Netbox object and the Ansible diff.
"""
if check_mode:
serialized_nb_obj = data
else:
nb_obj = nb_endpoint.create(data)
try:
serialized_nb_obj = nb_obj.serialize()
except AttributeError:
serialized_nb_obj = nb_obj
diff = _build_diff(before={"state": "absent"}, after={"state": "present"})
return serialized_nb_obj, diff
def delete_netbox_object(nb_obj, check_mode):
"""Delete a Netbox object.
:returns tuple(serialized_nb_obj, diff): tuple of the serialized deleted
Netbox object and the Ansible diff.
"""
if not check_mode:
nb_obj.delete()
diff = _build_diff(before={"state": "present"}, after={"state": "absent"})
return nb_obj.serialize(), diff
def update_netbox_object(nb_obj, data, check_mode):
"""Update a Netbox object.
:returns tuple(serialized_nb_obj, diff): tuple of the serialized updated
Netbox object and the Ansible diff.
"""
serialized_nb_obj = nb_obj.serialize()
updated_obj = serialized_nb_obj.copy()
updated_obj.update(data)
if serialized_nb_obj == updated_obj:
return serialized_nb_obj, None
else:
data_before, data_after = {}, {}
for key in data:
if serialized_nb_obj[key] != updated_obj[key]:
data_before[key] = serialized_nb_obj[key]
data_after[key] = updated_obj[key]
if not check_mode:
nb_obj.update(data)
updated_obj = nb_obj.serialize()
diff = _build_diff(before=data_before, after=data_after)
return updated_obj, diff
def _get_query_param_id(nb, match, child):
endpoint = CONVERT_TO_ID[match]
app = find_app(endpoint)
nb_app = getattr(nb, app)
nb_endpoint = getattr(nb_app, endpoint)
result = nb_endpoint.get(**{QUERY_TYPES.get(match): child[match]})
if result:
return result.id
else:
return child
def find_app(endpoint):
for k, v in API_APPS_ENDPOINTS.items():
if endpoint in v:
nb_app = k
return nb_app
def build_query_params(nb, parent, module_data, child):
query_dict = dict()
query_params = ALLOWED_QUERY_PARAMS.get(parent)
matches = query_params.intersection(set(child.keys()))
for match in matches:
if match in QUERY_PARAMS_IDS:
value = _get_query_param_id(nb, match, child)
query_dict.update({match + "_id": value})
else:
value = child.get(match)
query_dict.update({match: value})
if parent == "lag":
query_dict.update({"form_factor": 200})
if isinstance(module_data["device"], int):
query_dict.update({"device_id": module_data["device"]})
else:
query_dict.update({"device": module_data["device"]})
return query_dict
def find_ids(nb, data):
for k, v in data.items():
if k in CONVERT_TO_ID:
endpoint = CONVERT_TO_ID[k]
search = v
app = find_app(endpoint)
nb_app = getattr(nb, app)
nb_endpoint = getattr(nb_app, endpoint)
if isinstance(v, dict):
query_params = build_query_params(nb, k, data, v)
query_id = nb_endpoint.get(**query_params)
elif isinstance(v, list):
id_list = list()
for index in v:
norm_data = normalize_data(index)
temp_dict = build_query_params(nb, k, data, norm_data)
query_id = nb_endpoint.get(**temp_dict)
if query_id:
id_list.append(query_id.id)
else:
return ValueError("%s not found" % (index))
else:
try:
query_id = nb_endpoint.get(**{QUERY_TYPES.get(k, "q"): search})
except ValueError:
raise ValueError(
"Multiple results found while searching for key: %s" % (k)
)
if isinstance(v, list):
data[k] = id_list
elif query_id:
data[k] = query_id.id
elif k in NO_DEFAULT_ID:
pass
else:
raise ValueError("Could not resolve id of %s: %s" % (k, v))
return data
def normalize_data(data):
for k, v in data.items():
if isinstance(v, dict):
for subk, subv in v.items():
sub_data_type = QUERY_TYPES.get(subk, "q")
if sub_data_type == "slug":
if "-" in subv:
data[k][subk] = subv.replace(" ", "").lower()
elif " " in subv:
data[k][subk] = subv.replace(" ", "-").lower()
else:
data[k][subk] = subv.lower()
else:
data_type = QUERY_TYPES.get(k, "q")
if data_type == "slug":
if "-" in v:
data[k] = v.replace(" ", "").lower()
elif " " in v:
data[k] = v.replace(" ", "-").lower()
else:
data[k] = v.lower()
elif data_type == "timezone":
if " " in v:
data[k] = v.replace(" ", "_")
return data

View file

@ -1,312 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright: (c) 2018, Mikhail Yohman (@FragmentedPacket) <mikhail.yohman@gmail.com>
# Copyright: (c) 2018, David Gomez (@amb1s1) <david.gomez@networktocode.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'community'}
DOCUMENTATION = r'''
---
module: netbox_device
short_description: Create, update or delete devices within Netbox
description:
- Creates, updates or removes devices from Netbox
notes:
- Tags should be defined as a YAML list
- This should be ran with connection C(local) and hosts C(localhost)
author:
- Mikhail Yohman (@FragmentedPacket)
- David Gomez (@amb1s1)
requirements:
- pynetbox
version_added: '2.8'
options:
netbox_url:
description:
- URL of the Netbox instance resolvable by Ansible control host
required: true
netbox_token:
description:
- The token created within Netbox to authorize API access
required: true
data:
description:
- Defines the device configuration
suboptions:
name:
description:
- The name of the device
required: true
device_type:
description:
- Required if I(state=present) and the device does not exist yet
device_role:
description:
- Required if I(state=present) and the device does not exist yet
tenant:
description:
- The tenant that the device will be assigned to
platform:
description:
- The platform of the device
serial:
description:
- Serial number of the device
asset_tag:
description:
- Asset tag that is associated to the device
site:
description:
- Required if I(state=present) and the device does not exist yet
rack:
description:
- The name of the rack to assign the device to
position:
description:
- The position of the device in the rack defined above
face:
description:
- Required if I(rack) is defined
status:
description:
- The status of the device
choices:
- Active
- Offline
- Planned
- Staged
- Failed
- Inventory
cluster:
description:
- Cluster that the device will be assigned to
comments:
description:
- Comments that may include additional information in regards to the device
tags:
description:
- Any tags that the device may need to be associated with
custom_fields:
description:
- must exist in Netbox
required: true
state:
description:
- Use C(present) or C(absent) for adding or removing.
choices: [ absent, present ]
default: present
validate_certs:
description:
- If C(no), SSL certificates will not be validated. This should only be used on personally controlled sites using self-signed certificates.
default: 'yes'
type: bool
'''
EXAMPLES = r'''
- name: "Test Netbox modules"
connection: local
hosts: localhost
gather_facts: False
tasks:
- name: Create device within Netbox with only required information
netbox_device:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
name: Test Device
device_type: C9410R
device_role: Core Switch
site: Main
state: present
- name: Delete device within netbox
netbox_device:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
name: Test Device
state: absent
- name: Create device with tags
netbox_device:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
name: Another Test Device
device_type: C9410R
device_role: Core Switch
site: Main
tags:
- Schnozzberry
state: present
- name: Update the rack and position of an existing device
netbox_device:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
name: Test Device
rack: Test Rack
position: 10
face: Front
state: present
'''
RETURN = r'''
device:
description: Serialized object as created or already existent within Netbox
returned: success (when I(state=present))
type: dict
msg:
description: Message indicating failure or info about what has been achieved
returned: always
type: str
'''
import json
import traceback
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils.net_tools.netbox.netbox_utils import (
find_ids,
normalize_data,
create_netbox_object,
delete_netbox_object,
update_netbox_object,
DEVICE_STATUS,
FACE_ID
)
PYNETBOX_IMP_ERR = None
try:
import pynetbox
HAS_PYNETBOX = True
except ImportError:
PYNETBOX_IMP_ERR = traceback.format_exc()
HAS_PYNETBOX = False
def main():
'''
Main entry point for module execution
'''
argument_spec = dict(
netbox_url=dict(type="str", required=True),
netbox_token=dict(type="str", required=True, no_log=True),
data=dict(type="dict", required=True),
state=dict(required=False, default='present', choices=['present', 'absent']),
validate_certs=dict(type="bool", default=True)
)
global module
module = AnsibleModule(argument_spec=argument_spec,
supports_check_mode=True)
# Fail module if pynetbox is not installed
if not HAS_PYNETBOX:
module.fail_json(msg=missing_required_lib('pynetbox'), exception=PYNETBOX_IMP_ERR)
# Fail if device name is not given
if not module.params["data"].get("name"):
module.fail_json(msg="missing device name")
# Assign variables to be used with module
app = 'dcim'
endpoint = 'devices'
url = module.params["netbox_url"]
token = module.params["netbox_token"]
data = module.params["data"]
state = module.params["state"]
validate_certs = module.params["validate_certs"]
# Attempt to create Netbox API object
try:
nb = pynetbox.api(url, token=token, ssl_verify=validate_certs)
except Exception:
module.fail_json(msg="Failed to establish connection to Netbox API")
try:
nb_app = getattr(nb, app)
except AttributeError:
module.fail_json(msg="Incorrect application specified: %s" % (app))
nb_endpoint = getattr(nb_app, endpoint)
norm_data = normalize_data(data)
try:
if 'present' in state:
result = ensure_device_present(nb, nb_endpoint, norm_data)
else:
result = ensure_device_absent(nb_endpoint, norm_data)
return module.exit_json(**result)
except pynetbox.RequestError as e:
return module.fail_json(msg=json.loads(e.error))
except ValueError as e:
return module.fail_json(msg=str(e))
def _find_ids(nb, data):
if data.get("status"):
data["status"] = DEVICE_STATUS.get(data["status"].lower())
if data.get("face"):
data["face"] = FACE_ID.get(data["face"].lower())
return find_ids(nb, data)
def ensure_device_present(nb, nb_endpoint, normalized_data):
'''
:returns dict(device, msg, changed, diff): dictionary resulting of the request,
where `device` is the serialized device fetched or newly created in
Netbox
'''
data = _find_ids(nb, normalized_data)
nb_device = nb_endpoint.get(name=data["name"])
result = {}
if not nb_device:
device, diff = create_netbox_object(nb_endpoint, data, module.check_mode)
msg = "Device %s created" % (data["name"])
changed = True
result["diff"] = diff
else:
device, diff = update_netbox_object(nb_device, data, module.check_mode)
if device is False:
module.fail_json(
msg="Request failed, couldn't update device: %s" % data["name"]
)
if diff:
msg = "Device %s updated" % (data["name"])
changed = True
result["diff"] = diff
else:
msg = "Device %s already exists" % (data["name"])
changed = False
result.update({"device": device, "changed": changed, "msg": msg})
return result
def ensure_device_absent(nb_endpoint, data):
'''
:returns dict(msg, changed, diff)
'''
nb_device = nb_endpoint.get(name=data["name"])
result = {}
if nb_device:
dummy, diff = delete_netbox_object(nb_device, module.check_mode)
msg = 'Device %s deleted' % (data["name"])
changed = True
result["diff"] = diff
else:
msg = 'Device %s already absent' % (data["name"])
changed = False
result.update({"changed": changed, "msg": msg})
return result
if __name__ == "__main__":
main()

View file

@ -1,351 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright: (c) 2018, Mikhail Yohman (@FragmentedPacket) <mikhail.yohman@gmail.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {"metadata_version": "1.1",
"status": ["preview"],
"supported_by": "community"}
DOCUMENTATION = r"""
---
module: netbox_interface
short_description: Creates or removes interfaces from Netbox
description:
- Creates or removes interfaces from Netbox
notes:
- Tags should be defined as a YAML list
- This should be ran with connection C(local) and hosts C(localhost)
author:
- Mikhail Yohman (@FragmentedPacket)
requirements:
- pynetbox
version_added: "2.8"
options:
netbox_url:
description:
- URL of the Netbox instance resolvable by Ansible control host
required: true
type: str
netbox_token:
description:
- The token created within Netbox to authorize API access
required: true
type: str
data:
description:
- Defines the prefix configuration
suboptions:
device:
description:
- Name of the device the interface will be associated with (case-sensitive)
required: true
type: str
name:
description:
- Name of the interface to be created
required: true
type: str
form_factor:
description:
- |
Form factor of the interface:
ex. 1000Base-T (1GE), Virtual, 10GBASE-T (10GE)
This has to be specified exactly as what is found within UI
type: str
enabled:
description:
- Sets whether interface shows enabled or disabled
type: bool
lag:
description:
- Parent LAG interface will be a member of
type: dict
mtu:
description:
- The MTU of the interface
type: str
mac_address:
description:
- The MAC address of the interface
type: str
mgmt_only:
description:
- This interface is used only for out-of-band management
type: bool
description:
description:
- The description of the prefix
type: str
mode:
description:
- The mode of the interface
choices:
- Access
- Tagged
- Tagged All
type: str
untagged_vlan:
description:
- The untagged VLAN to be assigned to interface
type: dict
tagged_vlans:
description:
- A list of tagged VLANS to be assigned to interface. Mode must be set to either C(Tagged) or C(Tagged All)
type: list
tags:
description:
- Any tags that the prefix may need to be associated with
type: list
required: true
state:
description:
- Use C(present) or C(absent) for adding or removing.
choices: [ absent, present ]
default: present
type: str
validate_certs:
description:
- |
If C(no), SSL certificates will not be validated.
This should only be used on personally controlled sites using self-signed certificates.
default: "yes"
type: bool
"""
EXAMPLES = r"""
- name: "Test Netbox interface module"
connection: local
hosts: localhost
gather_facts: False
tasks:
- name: Create interface within Netbox with only required information
netbox_interface:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
device: test100
name: GigabitEthernet1
state: present
- name: Delete interface within netbox
netbox_interface:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
device: test100
name: GigabitEthernet1
state: absent
- name: Create LAG with several specified options
netbox_interface:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
device: test100
name: port-channel1
form_factor: Link Aggregation Group (LAG)
mtu: 1600
mgmt_only: false
mode: Access
state: present
- name: Create interface and assign it to parent LAG
netbox_interface:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
device: test100
name: GigabitEthernet1
enabled: false
form_factor: 1000Base-t (1GE)
lag:
name: port-channel1
mtu: 1600
mgmt_only: false
mode: Access
state: present
- name: Create interface as a trunk port
netbox_interface:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
device: test100
name: GigabitEthernet25
enabled: false
form_factor: 1000Base-t (1GE)
untagged_vlan:
name: Wireless
site: Test Site
tagged_vlans:
- name: Data
site: Test Site
- name: VoIP
site: Test Site
mtu: 1600
mgmt_only: true
mode: Tagged
state: present
"""
RETURN = r"""
interface:
description: Serialized object as created or already existent within Netbox
returned: on creation
type: dict
msg:
description: Message indicating failure or info about what has been achieved
returned: always
type: str
"""
import json
import traceback
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils.net_tools.netbox.netbox_utils import (
find_ids,
normalize_data,
create_netbox_object,
delete_netbox_object,
update_netbox_object,
INTF_FORM_FACTOR,
INTF_MODE,
)
from ansible.module_utils.compat import ipaddress
from ansible.module_utils._text import to_text
PYNETBOX_IMP_ERR = None
try:
import pynetbox
HAS_PYNETBOX = True
except ImportError:
PYNETBOX_IMP_ERR = traceback.format_exc()
HAS_PYNETBOX = False
def main():
"""
Main entry point for module execution
"""
argument_spec = dict(
netbox_url=dict(type="str", required=True),
netbox_token=dict(type="str", required=True, no_log=True),
data=dict(type="dict", required=True),
state=dict(required=False, default="present", choices=["present", "absent"]),
validate_certs=dict(type="bool", default=True)
)
global module
module = AnsibleModule(argument_spec=argument_spec,
supports_check_mode=True)
# Fail module if pynetbox is not installed
if not HAS_PYNETBOX:
module.fail_json(msg=missing_required_lib('pynetbox'), exception=PYNETBOX_IMP_ERR)
# Assign variables to be used with module
app = "dcim"
endpoint = "interfaces"
url = module.params["netbox_url"]
token = module.params["netbox_token"]
data = module.params["data"]
state = module.params["state"]
validate_certs = module.params["validate_certs"]
# Attempt to create Netbox API object
try:
nb = pynetbox.api(url, token=token, ssl_verify=validate_certs)
except Exception:
module.fail_json(msg="Failed to establish connection to Netbox API")
try:
nb_app = getattr(nb, app)
except AttributeError:
module.fail_json(msg="Incorrect application specified: %s" % (app))
nb_endpoint = getattr(nb_app, endpoint)
norm_data = normalize_data(data)
try:
norm_data = _check_and_adapt_data(nb, norm_data)
if "present" in state:
return module.exit_json(
**ensure_interface_present(nb, nb_endpoint, norm_data)
)
else:
return module.exit_json(
**ensure_interface_absent(nb, nb_endpoint, norm_data)
)
except pynetbox.RequestError as e:
return module.fail_json(msg=json.loads(e.error))
except ValueError as e:
return module.fail_json(msg=str(e))
except AttributeError as e:
return module.fail_json(msg=str(e))
def _check_and_adapt_data(nb, data):
data = find_ids(nb, data)
if data.get("form_factor"):
data["form_factor"] = INTF_FORM_FACTOR.get(data["form_factor"].lower())
if data.get("mode"):
data["mode"] = INTF_MODE.get(data["mode"].lower())
return data
def ensure_interface_present(nb, nb_endpoint, data):
"""
:returns dict(interface, msg, changed): dictionary resulting of the request,
where 'interface' is the serialized interface fetched or newly created in Netbox
"""
if not isinstance(data, dict):
changed = False
return {"msg": data, "changed": changed}
nb_intf = nb_endpoint.get(name=data["name"], device_id=data["device"])
result = dict()
if not nb_intf:
intf, diff = create_netbox_object(nb_endpoint, data, module.check_mode)
changed = True
msg = "Interface %s created" % (data["name"])
else:
intf, diff = update_netbox_object(nb_intf, data, module.check_mode)
if intf is False:
module.fail_json(
msg="Request failed, couldn't update device: %s" % (data["name"])
)
if diff:
msg = "Interface %s updated" % (data["name"])
changed = True
result["diff"] = diff
else:
msg = "Interface %s already exists" % (data["name"])
changed = False
result.update({"interface": intf, "msg": msg, "changed": changed})
return result
def ensure_interface_absent(nb, nb_endpoint, data):
"""
:returns dict(msg, changed, diff)
"""
nb_intf = nb_endpoint.get(name=data["name"], device_id=data["device"])
result = dict()
if nb_intf:
dummy, diff = delete_netbox_object(nb_intf, module.check_mode)
changed = True
msg = "Interface %s deleted" % (data["name"])
result["diff"] = diff
else:
msg = "Interface %s already absent" % (data["name"])
changed = False
result.update({"msg": msg, "changed": changed})
return result
if __name__ == "__main__":
main()

View file

@ -1,517 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright: (c) 2018, Mikhail Yohman (@FragmentedPacket) <mikhail.yohman@gmail.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'community'}
DOCUMENTATION = r'''
---
module: netbox_ip_address
short_description: Creates or removes IP addresses from Netbox
description:
- Creates or removes IP addresses from Netbox
notes:
- Tags should be defined as a YAML list
- This should be ran with connection C(local) and hosts C(localhost)
author:
- Mikhail Yohman (@FragmentedPacket)
- Anthony Ruhier (@Anthony25)
requirements:
- pynetbox
version_added: '2.8'
options:
netbox_url:
description:
- URL of the Netbox instance resolvable by Ansible control host
required: true
netbox_token:
description:
- The token created within Netbox to authorize API access
required: true
data:
description:
- Defines the IP address configuration
suboptions:
family:
description:
- Specifies with address family the IP address belongs to
choices:
- 4
- 6
address:
description:
- Required if state is C(present)
prefix:
description:
- |
With state C(present), if an interface is given, it will ensure
that an IP inside this prefix (and vrf, if given) is attached
to this interface. Otherwise, it will get the next available IP
of this prefix and attach it.
With state C(new), it will force to get the next available IP in
this prefix. If an interface is given, it will also force to attach
it.
Required if state is C(present) or C(new) when no address is given.
Unused if an address is specified.
vrf:
description:
- VRF that IP address is associated with
tenant:
description:
- The tenant that the device will be assigned to
status:
description:
- The status of the IP address
choices:
- Active
- Reserved
- Deprecated
- DHCP
role:
description:
- The role of the IP address
choices:
- Loopback
- Secondary
- Anycast
- VIP
- VRRP
- HSRP
- GLBP
- CARP
interface:
description:
- |
The name and device of the interface that the IP address should be assigned to
Required if state is C(present) and a prefix specified.
description:
description:
- The description of the interface
nat_inside:
description:
- The inside IP address this IP is assigned to
tags:
description:
- Any tags that the IP address may need to be associated with
custom_fields:
description:
- must exist in Netbox
required: true
state:
description:
- |
Use C(present), C(new) or C(absent) for adding, force adding or removing.
C(present) will check if the IP is already created, and return it if
true. C(new) will force to create it anyway (useful for anycasts, for
example).
choices: [ absent, new, present ]
default: present
validate_certs:
description:
- If C(no), SSL certificates will not be validated. This should only be used on personally controlled sites using self-signed certificates.
default: 'yes'
type: bool
'''
EXAMPLES = r'''
- name: "Test Netbox IP address module"
connection: local
hosts: localhost
gather_facts: False
tasks:
- name: Create IP address within Netbox with only required information
netbox_ip_address:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
address: 192.168.1.10
state: present
- name: Force to create (even if it already exists) the IP
netbox_ip_address:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
address: 192.168.1.10
state: new
- name: Get a new available IP inside 192.168.1.0/24
netbox_ip_address:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
prefix: 192.168.1.0/24
state: new
- name: Delete IP address within netbox
netbox_ip_address:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
address: 192.168.1.10
state: absent
- name: Create IP address with several specified options
netbox_ip_address:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
family: 4
address: 192.168.1.20
vrf: Test
tenant: Test Tenant
status: Reserved
role: Loopback
description: Test description
tags:
- Schnozzberry
state: present
- name: Create IP address and assign a nat_inside IP
netbox_ip_address:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
family: 4
address: 192.168.1.30
vrf: Test
nat_inside:
address: 192.168.1.20
vrf: Test
interface:
name: GigabitEthernet1
device: test100
- name: Ensure that an IP inside 192.168.1.0/24 is attached to GigabitEthernet1
netbox_ip_address:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
prefix: 192.168.1.0/24
vrf: Test
interface:
name: GigabitEthernet1
device: test100
state: present
- name: Attach a new available IP of 192.168.1.0/24 to GigabitEthernet1
netbox_ip_address:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
prefix: 192.168.1.0/24
vrf: Test
interface:
name: GigabitEthernet1
device: test100
state: new
'''
RETURN = r'''
ip_address:
description: Serialized object as created or already existent within Netbox
returned: on creation
type: dict
msg:
description: Message indicating failure or info about what has been achieved
returned: always
type: str
'''
import json
import traceback
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils.net_tools.netbox.netbox_utils import (
find_ids,
normalize_data,
create_netbox_object,
delete_netbox_object,
update_netbox_object,
IP_ADDRESS_ROLE,
IP_ADDRESS_STATUS
)
from ansible.module_utils.compat import ipaddress
from ansible.module_utils._text import to_text
PYNETBOX_IMP_ERR = None
try:
import pynetbox
HAS_PYNETBOX = True
except ImportError:
PYNETBOX_IMP_ERR = traceback.format_exc()
HAS_PYNETBOX = False
def main():
'''
Main entry point for module execution
'''
argument_spec = dict(
netbox_url=dict(type="str", required=True),
netbox_token=dict(type="str", required=True, no_log=True),
data=dict(type="dict", required=True),
state=dict(required=False, default='present', choices=['present', 'absent', 'new']),
validate_certs=dict(type="bool", default=True)
)
global module
module = AnsibleModule(argument_spec=argument_spec,
supports_check_mode=True)
# Fail module if pynetbox is not installed
if not HAS_PYNETBOX:
module.fail_json(msg=missing_required_lib('pynetbox'), exception=PYNETBOX_IMP_ERR)
# Assign variables to be used with module
changed = False
app = 'ipam'
endpoint = 'ip_addresses'
url = module.params["netbox_url"]
token = module.params["netbox_token"]
data = module.params["data"]
state = module.params["state"]
validate_certs = module.params["validate_certs"]
# Attempt to create Netbox API object
try:
nb = pynetbox.api(url, token=token, ssl_verify=validate_certs)
except Exception:
module.fail_json(msg="Failed to establish connection to Netbox API")
try:
nb_app = getattr(nb, app)
except AttributeError:
module.fail_json(msg="Incorrect application specified: %s" % (app))
nb_endpoint = getattr(nb_app, endpoint)
norm_data = normalize_data(data)
try:
norm_data = _check_and_adapt_data(nb, norm_data)
if state in ("new", "present"):
return _handle_state_new_present(
module, state, nb_app, nb_endpoint, norm_data
)
elif state == "absent":
return module.exit_json(
**ensure_ip_address_absent(nb_endpoint, norm_data)
)
else:
return module.fail_json(msg="Invalid state %s" % state)
except pynetbox.RequestError as e:
return module.fail_json(msg=json.loads(e.error))
except ValueError as e:
return module.fail_json(msg=str(e))
def _check_and_adapt_data(nb, data):
data = find_ids(nb, data)
if data.get("vrf") and not isinstance(data["vrf"], int):
raise ValueError(
"%s does not exist - Please create VRF" % (data["vrf"])
)
if data.get("status"):
data["status"] = IP_ADDRESS_STATUS.get(data["status"].lower())
if data.get("role"):
data["role"] = IP_ADDRESS_ROLE.get(data["role"].lower())
return data
def _handle_state_new_present(module, state, nb_app, nb_endpoint, data):
if data.get("address"):
if state == "present":
return module.exit_json(
**ensure_ip_address_present(nb_endpoint, data)
)
elif state == "new":
return module.exit_json(
**create_ip_address(nb_endpoint, data)
)
else:
if state == "present":
return module.exit_json(
**ensure_ip_in_prefix_present_on_netif(
nb_app, nb_endpoint, data
)
)
elif state == "new":
return module.exit_json(
**get_new_available_ip_address(nb_app, data)
)
def ensure_ip_address_present(nb_endpoint, data):
"""
:returns dict(ip_address, msg, changed): dictionary resulting of the request,
where 'ip_address' is the serialized ip fetched or newly created in Netbox
"""
if not isinstance(data, dict):
changed = False
return {"msg": data, "changed": changed}
try:
nb_addr = _search_ip(nb_endpoint, data)
except ValueError:
return _error_multiple_ip_results(data)
result = {}
if not nb_addr:
return create_ip_address(nb_endpoint, data)
else:
ip_addr, diff = update_netbox_object(nb_addr, data, module.check_mode)
if ip_addr is False:
module.fail_json(
msg="Request failed, couldn't update IP: %s" % (data["address"])
)
if diff:
msg = "IP Address %s updated" % (data["address"])
changed = True
result["diff"] = diff
else:
ip_addr = nb_addr.serialize()
changed = False
msg = "IP Address %s already exists" % (data["address"])
return {"ip_address": ip_addr, "msg": msg, "changed": changed}
def _search_ip(nb_endpoint, data):
get_query_params = {"address": data["address"]}
if data.get("vrf"):
get_query_params["vrf_id"] = data["vrf"]
ip_addr = nb_endpoint.get(**get_query_params)
return ip_addr
def _error_multiple_ip_results(data):
changed = False
if "vrf" in data:
return {"msg": "Returned more than result", "changed": changed}
else:
return {
"msg": "Returned more than one result - Try specifying VRF.",
"changed": changed
}
def create_ip_address(nb_endpoint, data):
if not isinstance(data, dict):
changed = False
return {"msg": data, "changed": changed}
ip_addr, diff = create_netbox_object(nb_endpoint, data, module.check_mode)
changed = True
msg = "IP Addresses %s created" % (data["address"])
return {"ip_address": ip_addr, "msg": msg, "changed": changed, "diff": diff}
def ensure_ip_in_prefix_present_on_netif(nb_app, nb_endpoint, data):
"""
:returns dict(ip_address, msg, changed): dictionary resulting of the request,
where 'ip_address' is the serialized ip fetched or newly created in Netbox
"""
if not isinstance(data, dict):
changed = False
return {"msg": data, "changed": changed}
if not data.get("interface") or not data.get("prefix"):
raise ValueError("A prefix and interface are required")
get_query_params = {
"interface_id": data["interface"], "parent": data["prefix"],
}
if data.get("vrf"):
get_query_params["vrf_id"] = data["vrf"]
attached_ips = nb_endpoint.filter(**get_query_params)
if attached_ips:
ip_addr = attached_ips[-1].serialize()
changed = False
msg = "IP Address %s already attached" % (ip_addr["address"])
return {"ip_address": ip_addr, "msg": msg, "changed": changed}
else:
return get_new_available_ip_address(nb_app, data)
def get_new_available_ip_address(nb_app, data):
prefix_query = {"prefix": data["prefix"]}
if data.get("vrf"):
prefix_query["vrf_id"] = data["vrf"]
result = {}
prefix = nb_app.prefixes.get(**prefix_query)
if not prefix:
changed = False
msg = "%s does not exist - please create first" % (data["prefix"])
return {"msg": msg, "changed": changed}
elif prefix.available_ips.list():
ip_addr, diff = create_netbox_object(prefix.available_ips, data, module.check_mode)
changed = True
msg = "IP Addresses %s created" % (ip_addr["address"])
result["diff"] = diff
else:
changed = False
msg = "No available IPs available within %s" % (data['prefix'])
return {"msg": msg, "changed": changed}
result.update({"ip_address": ip_addr, "msg": msg, "changed": changed})
return result
def _get_prefix_id(nb_app, prefix, vrf_id=None):
ipaddr_prefix = ipaddress.ip_network(prefix)
network = to_text(ipaddr_prefix.network_address)
mask = ipaddr_prefix.prefixlen
prefix_query_params = {
"prefix": network,
"mask_length": mask
}
if vrf_id:
prefix_query_params["vrf_id"] = vrf_id
prefix_id = nb_app.prefixes.get(prefix_query_params)
if not prefix_id:
if vrf_id:
raise ValueError("Prefix %s does not exist in VRF %s - Please create it" % (prefix, vrf_id))
else:
raise ValueError("Prefix %s does not exist - Please create it" % (prefix))
return prefix_id
def ensure_ip_address_absent(nb_endpoint, data):
"""
:returns dict(msg, changed)
"""
if not isinstance(data, dict):
changed = False
return {"msg": data, "changed": changed}
try:
ip_addr = _search_ip(nb_endpoint, data)
except ValueError:
return _error_multiple_ip_results(data)
result = {}
if ip_addr:
dummy, diff = delete_netbox_object(ip_addr, module.check_mode)
changed = True
msg = "IP Address %s deleted" % (data["address"])
result["diff"] = diff
else:
changed = False
msg = "IP Address %s already absent" % (data["address"])
result.update({"msg": msg, "changed": changed})
return result
if __name__ == "__main__":
main()

View file

@ -1,461 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright: (c) 2018, Mikhail Yohman (@FragmentedPacket) <mikhail.yohman@gmail.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {"metadata_version": "1.1",
"status": ["preview"],
"supported_by": "community"}
DOCUMENTATION = r"""
---
module: netbox_prefix
short_description: Creates or removes prefixes from Netbox
description:
- Creates or removes prefixes from Netbox
notes:
- Tags should be defined as a YAML list
- This should be ran with connection C(local) and hosts C(localhost)
author:
- Mikhail Yohman (@FragmentedPacket)
- Anthony Ruhier (@Anthony25)
requirements:
- pynetbox
version_added: '2.8'
options:
netbox_url:
description:
- URL of the Netbox instance resolvable by Ansible control host
required: true
type: str
netbox_token:
description:
- The token created within Netbox to authorize API access
required: true
type: str
data:
description:
- Defines the prefix configuration
suboptions:
family:
description:
- Specifies which address family the prefix prefix belongs to
choices:
- 4
- 6
type: int
prefix:
description:
- Required if state is C(present) and first_available is False. Will allocate or free this prefix.
type: str
parent:
description:
- Required if state is C(present) and first_available is C(yes). Will get a new available prefix in this parent prefix.
type: str
prefix_length:
description:
- |
Required ONLY if state is C(present) and first_available is C(yes).
Will get a new available prefix of the given prefix_length in this parent prefix.
type: str
site:
description:
- Site that prefix is associated with
type: str
vrf:
description:
- VRF that prefix is associated with
type: str
tenant:
description:
- The tenant that the prefix will be assigned to
type: str
vlan:
description:
- The VLAN the prefix will be assigned to
type: dict
status:
description:
- The status of the prefix
choices:
- Active
- Container
- Deprecated
- Reserved
type: str
role:
description:
- The role of the prefix
type: str
is_pool:
description:
- All IP Addresses within this prefix are considered usable
type: bool
description:
description:
- The description of the prefix
type: str
tags:
description:
- Any tags that the prefix may need to be associated with
type: list
custom_fields:
description:
- Must exist in Netbox and in key/value format
type: dict
required: true
state:
description:
- Use C(present) or C(absent) for adding or removing.
choices: [ absent, present ]
default: present
first_available:
description:
- If C(yes) and state C(present), if an parent is given, it will get the
first available prefix of the given prefix_length inside the given parent (and
vrf, if given).
Unused with state C(absent).
default: 'no'
type: bool
validate_certs:
description:
- If C(no), SSL certificates will not be validated. This should only be used on personally controlled sites using self-signed certificates.
default: "yes"
type: bool
"""
EXAMPLES = r"""
- name: "Test Netbox prefix module"
connection: local
hosts: localhost
gather_facts: False
tasks:
- name: Create prefix within Netbox with only required information
netbox_prefix:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
prefix: 10.156.0.0/19
state: present
- name: Delete prefix within netbox
netbox_prefix:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
prefix: 10.156.0.0/19
state: absent
- name: Create prefix with several specified options
netbox_prefix:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
family: 4
prefix: 10.156.32.0/19
site: Test Site
vrf: Test VRF
tenant: Test Tenant
vlan:
name: Test VLAN
site: Test Site
tenant: Test Tenant
vlan_group: Test Vlan Group
status: Reserved
role: Network of care
description: Test description
is_pool: true
tags:
- Schnozzberry
state: present
- name: Get a new /24 inside 10.156.0.0/19 within Netbox - Parent doesn't exist
netbox_prefix:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
parent: 10.156.0.0/19
prefix_length: 24
state: present
first_available: yes
- name: Create prefix within Netbox with only required information
netbox_prefix:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
prefix: 10.156.0.0/19
state: present
- name: Get a new /24 inside 10.156.0.0/19 within Netbox
netbox_prefix:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
parent: 10.156.0.0/19
prefix_length: 24
state: present
first_available: yes
- name: Get a new /24 inside 10.157.0.0/19 within Netbox with additional values
netbox_prefix:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
parent: 10.157.0.0/19
prefix_length: 24
vrf: Test VRF
site: Test Site
state: present
first_available: yes
"""
RETURN = r"""
prefix:
description: Serialized object as created or already existent within Netbox
returned: on creation
type: dict
msg:
description: Message indicating failure or info about what has been achieved
returned: always
type: str
"""
import json
import traceback
import re
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils.net_tools.netbox.netbox_utils import (
find_ids,
normalize_data,
create_netbox_object,
delete_netbox_object,
update_netbox_object,
PREFIX_STATUS,
)
from ansible.module_utils.compat import ipaddress
from ansible.module_utils._text import to_text
PYNETBOX_IMP_ERR = None
try:
import pynetbox
HAS_PYNETBOX = True
except ImportError:
PYNETBOX_IMP_ERR = traceback.format_exc()
HAS_PYNETBOX = False
def main():
"""
Main entry point for module execution
"""
argument_spec = dict(
netbox_url=dict(type="str", required=True),
netbox_token=dict(type="str", required=True, no_log=True),
data=dict(type="dict", required=True),
state=dict(required=False, default="present", choices=["present", "absent"]),
first_available=dict(type="bool", required=False, default=False),
validate_certs=dict(type="bool", default=True),
)
global module
module = AnsibleModule(argument_spec=argument_spec,
supports_check_mode=True)
# Fail module if pynetbox is not installed
if not HAS_PYNETBOX:
module.fail_json(msg=missing_required_lib('pynetbox'), exception=PYNETBOX_IMP_ERR)
# Assign variables to be used with module
app = "ipam"
endpoint = "prefixes"
url = module.params["netbox_url"]
token = module.params["netbox_token"]
data = module.params["data"]
state = module.params["state"]
first_available = module.params["first_available"]
validate_certs = module.params["validate_certs"]
# Attempt to create Netbox API object
try:
nb = pynetbox.api(url, token=token, ssl_verify=validate_certs)
except Exception:
module.fail_json(msg="Failed to establish connection to Netbox API")
try:
nb_app = getattr(nb, app)
except AttributeError:
module.fail_json(msg="Incorrect application specified: %s" % (app))
nb_endpoint = getattr(nb_app, endpoint)
norm_data = normalize_data(data)
try:
norm_data = _check_and_adapt_data(nb, norm_data)
if "present" in state:
return module.exit_json(**ensure_prefix_present(
nb, nb_endpoint, norm_data, first_available
))
else:
return module.exit_json(
**ensure_prefix_absent(nb, nb_endpoint, norm_data)
)
except pynetbox.RequestError as e:
return module.fail_json(msg=json.loads(e.error))
except ValueError as e:
return module.fail_json(msg=str(e))
except AttributeError as e:
return module.fail_json(msg=str(e))
def ensure_prefix_present(nb, nb_endpoint, data, first_available=False):
"""
:returns dict(prefix, msg, changed): dictionary resulting of the request,
where 'prefix' is the serialized device fetched or newly created in Netbox
"""
if not isinstance(data, dict):
changed = False
return {"msg": data, "changed": changed}
if first_available:
for k in ("parent", "prefix_length"):
if k not in data:
raise ValueError("'%s' is required with first_available" % k)
return get_new_available_prefix(nb_endpoint, data)
else:
if "prefix" not in data:
raise ValueError("'prefix' is required without first_available")
return get_or_create_prefix(nb_endpoint, data)
def _check_and_adapt_data(nb, data):
data = find_ids(nb, data)
if data.get("vrf") and not isinstance(data["vrf"], int):
raise ValueError(
"%s does not exist - Please create VRF" % (data["vrf"])
)
if data.get("status"):
data["status"] = PREFIX_STATUS.get(data["status"].lower())
return data
def _search_prefix(nb_endpoint, data):
if data.get("prefix"):
prefix = ipaddress.ip_network(data["prefix"])
elif data.get("parent"):
prefix = ipaddress.ip_network(data["parent"])
network = to_text(prefix.network_address)
mask = prefix.prefixlen
if data.get("vrf"):
if not isinstance(data["vrf"], int):
raise ValueError("%s does not exist - Please create VRF" % (data["vrf"]))
else:
prefix = nb_endpoint.get(q=network, mask_length=mask, vrf_id=data["vrf"])
else:
prefix = nb_endpoint.get(q=network, mask_length=mask, vrf="null")
return prefix
def _error_multiple_prefix_results(data):
changed = False
if data.get("vrf"):
return {"msg": "Returned more than one result", "changed": changed}
else:
return {
"msg": "Returned more than one result - Try specifying VRF.",
"changed": changed
}
def get_or_create_prefix(nb_endpoint, data):
try:
nb_prefix = _search_prefix(nb_endpoint, data)
except ValueError:
return _error_multiple_prefix_results(data)
result = dict()
if not nb_prefix:
prefix, diff = create_netbox_object(nb_endpoint, data, module.check_mode)
changed = True
msg = "Prefix %s created" % (prefix["prefix"])
result["diff"] = diff
else:
prefix, diff = update_netbox_object(nb_prefix, data, module.check_mode)
if prefix is False:
module.fail_json(
msg="Request failed, couldn't update prefix: %s" % (data["prefix"])
)
if diff:
msg = "Prefix %s updated" % (data["prefix"])
changed = True
result["diff"] = diff
else:
msg = "Prefix %s already exists" % (data["prefix"])
changed = False
result.update({"prefix": prefix, "msg": msg, "changed": changed})
return result
def get_new_available_prefix(nb_endpoint, data):
try:
parent_prefix = _search_prefix(nb_endpoint, data)
except ValueError:
return _error_multiple_prefix_results(data)
result = dict()
if not parent_prefix:
changed = False
msg = "Parent prefix does not exist: %s" % (data["parent"])
return {"msg": msg, "changed": changed}
elif parent_prefix.available_prefixes.list():
prefix, diff = create_netbox_object(parent_prefix.available_prefixes, data, module.check_mode)
changed = True
msg = "Prefix %s created" % (prefix["prefix"])
result["diff"] = diff
else:
changed = False
msg = "No available prefixes within %s" % (data["parent"])
result.update({"prefix": prefix, "msg": msg, "changed": changed})
return result
def ensure_prefix_absent(nb, nb_endpoint, data):
"""
:returns dict(msg, changed)
"""
try:
nb_prefix = _search_prefix(nb_endpoint, data)
except ValueError:
return _error_multiple_prefix_results(data)
result = dict()
if nb_prefix:
dummy, diff = delete_netbox_object(nb_prefix, module.check_mode)
changed = True
msg = "Prefix %s deleted" % (nb_prefix.prefix)
result["diff"] = diff
else:
msg = "Prefix %s already absent" % (data["prefix"])
changed = False
result.update({"msg": msg, "changed": changed})
return result
if __name__ == "__main__":
main()

View file

@ -1,348 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright: (c) 2018, Mikhail Yohman (@FragmentedPacket) <mikhail.yohman@gmail.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {"metadata_version": "1.1",
"status": ["preview"],
"supported_by": "community"}
DOCUMENTATION = r"""
---
module: netbox_site
short_description: Creates or removes sites from Netbox
description:
- Creates or removes sites from Netbox
notes:
- Tags should be defined as a YAML list
- This should be ran with connection C(local) and hosts C(localhost)
author:
- Mikhail Yohman (@FragmentedPacket)
requirements:
- pynetbox
version_added: "2.8"
options:
netbox_url:
description:
- URL of the Netbox instance resolvable by Ansible control host
required: true
type: str
netbox_token:
description:
- The token created within Netbox to authorize API access
required: true
type: str
data:
description:
- Defines the site configuration
suboptions:
name:
description:
- Name of the site to be created
required: true
type: str
status:
description:
- Status of the site
choices:
- Active
- Planned
- Retired
type: str
region:
description:
- The region that the site should be associated with
type: str
tenant:
description:
- The tenant the site will be assigned to
type: str
facility:
description:
- Data center provider or facility, ex. Equinix NY7
type: str
asn:
description:
- The ASN associated with the site
type: int
time_zone:
description:
- Timezone associated with the site, ex. America/Denver
type: str
description:
description:
- The description of the prefix
type: str
physical_address:
description:
- Physical address of site
type: str
shipping_address:
description:
- Shipping address of site
type: str
latitude:
description:
- Latitude in decimal format
type: int
longitude:
description:
- Longitude in decimal format
type: int
contact_name:
description:
- Name of contact for site
type: str
contact_phone:
description:
- Contact phone number for site
type: str
contact_email:
description:
- Contact email for site
type: str
comments:
description:
- Comments for the site. This can be markdown syntax
type: str
tags:
description:
- Any tags that the prefix may need to be associated with
type: list
custom_fields:
description:
- must exist in Netbox
type: dict
required: true
state:
description:
- Use C(present) or C(absent) for adding or removing.
choices: [ absent, present ]
default: present
type: str
validate_certs:
description:
- |
If C(no), SSL certificates will not be validated.
This should only be used on personally controlled sites using self-signed certificates.
default: "yes"
type: bool
"""
EXAMPLES = r"""
- name: "Test Netbox site module"
connection: local
hosts: localhost
gather_facts: False
tasks:
- name: Create site within Netbox with only required information
netbox_site:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
name: Test - Colorado
state: present
- name: Delete site within netbox
netbox_site:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
name: Test - Colorado
state: absent
- name: Create site with all parameters
netbox_site:
netbox_url: http://netbox.local
netbox_token: thisIsMyToken
data:
name: Test - California
status: Planned
region: Test Region
tenant: Test Tenant
facility: EquinoxCA7
asn: 65001
time_zone: America/Los Angeles
description: This is a test description
physical_address: Hollywood, CA, 90210
shipping_address: Hollywood, CA, 90210
latitude: 10.100000
longitude: 12.200000
contact_name: Jenny
contact_phone: 867-5309
contact_email: jenny@changednumber.com
comments: ### Placeholder
state: present
"""
RETURN = r"""
site:
description: Serialized object as created or already existent within Netbox
returned: on creation
type: dict
msg:
description: Message indicating failure or info about what has been achieved
returned: always
type: str
"""
import json
import traceback
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils.net_tools.netbox.netbox_utils import (
find_ids,
normalize_data,
create_netbox_object,
delete_netbox_object,
update_netbox_object,
SITE_STATUS,
)
from ansible.module_utils.compat import ipaddress
from ansible.module_utils._text import to_text
PYNETBOX_IMP_ERR = None
try:
import pynetbox
HAS_PYNETBOX = True
except ImportError:
PYNETBOX_IMP_ERR = traceback.format_exc()
HAS_PYNETBOX = False
def main():
"""
Main entry point for module execution
"""
argument_spec = dict(
netbox_url=dict(type="str", required=True),
netbox_token=dict(type="str", required=True, no_log=True),
data=dict(type="dict", required=True),
state=dict(required=False, default="present", choices=["present", "absent"]),
validate_certs=dict(type="bool", default=True)
)
global module
module = AnsibleModule(argument_spec=argument_spec,
supports_check_mode=True)
# Fail module if pynetbox is not installed
if not HAS_PYNETBOX:
module.fail_json(msg=missing_required_lib('pynetbox'), exception=PYNETBOX_IMP_ERR)
# Assign variables to be used with module
app = "dcim"
endpoint = "sites"
url = module.params["netbox_url"]
token = module.params["netbox_token"]
data = module.params["data"]
state = module.params["state"]
validate_certs = module.params["validate_certs"]
# Attempt to create Netbox API object
try:
nb = pynetbox.api(url, token=token, ssl_verify=validate_certs)
except Exception:
module.fail_json(msg="Failed to establish connection to Netbox API")
try:
nb_app = getattr(nb, app)
except AttributeError:
module.fail_json(msg="Incorrect application specified: %s" % (app))
nb_endpoint = getattr(nb_app, endpoint)
norm_data = normalize_data(data)
try:
norm_data = _check_and_adapt_data(nb, norm_data)
if "present" in state:
return module.exit_json(
**ensure_site_present(nb, nb_endpoint, norm_data)
)
else:
return module.exit_json(
**ensure_site_absent(nb, nb_endpoint, norm_data)
)
except pynetbox.RequestError as e:
return module.fail_json(msg=json.loads(e.error))
except ValueError as e:
return module.fail_json(msg=str(e))
except AttributeError as e:
return module.fail_json(msg=str(e))
def _check_and_adapt_data(nb, data):
data = find_ids(nb, data)
if data.get("status"):
data["status"] = SITE_STATUS.get(data["status"].lower())
if "-" in data["name"]:
site_slug = data["name"].replace(" ", "").lower()
elif " " in data["name"]:
site_slug = data["name"].replace(" ", "-").lower()
else:
site_slug = data["name"].lower()
data["slug"] = site_slug
return data
def ensure_site_present(nb, nb_endpoint, data):
"""
:returns dict(interface, msg, changed): dictionary resulting of the request,
where 'site' is the serialized interface fetched or newly created in Netbox
"""
if not isinstance(data, dict):
changed = False
return {"msg": data, "changed": changed}
nb_site = nb_endpoint.get(slug=data["slug"])
result = dict()
if not nb_site:
site, diff = create_netbox_object(nb_endpoint, data, module.check_mode)
changed = True
msg = "Site %s created" % (data["name"])
result["diff"] = diff
else:
site, diff = update_netbox_object(nb_site, data, module.check_mode)
if site is False:
module.fail_json(
msg="Request failed, couldn't update device: %s" % (data["name"])
)
if diff:
msg = "Site %s updated" % (data["name"])
changed = True
result["diff"] = diff
else:
msg = "Site %s already exists" % (data["name"])
changed = False
result.update({"site": site, "msg": msg, "changed": changed})
return result
def ensure_site_absent(nb, nb_endpoint, data):
"""
:returns dict(msg, changed)
"""
nb_site = nb_endpoint.get(slug=data["slug"])
result = dict()
if nb_site:
dummy, diff = delete_netbox_object(nb_site, module.check_mode)
changed = True
msg = "Site %s deleted" % (data["name"])
result["diff"] = diff
else:
msg = "Site %s already absent" % (data["name"])
changed = False
result.update({"msg": msg, "changed": changed})
return result
if __name__ == "__main__":
main()

View file

@ -1,499 +0,0 @@
# Copyright (c) 2018 Remy Leone
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
DOCUMENTATION = '''
name: netbox
plugin_type: inventory
author:
- Remy Leone (@sieben)
- Anthony Ruhier (@Anthony25)
- Nikhil Singh Baliyan (@nikkytub)
- Sander Steffann (@steffann)
short_description: NetBox inventory source
description:
- Get inventory hosts from NetBox
extends_documentation_fragment:
- constructed
- inventory_cache
options:
plugin:
description: token that ensures this is a source file for the 'netbox' plugin.
required: True
choices: ['netbox']
api_endpoint:
description: Endpoint of the NetBox API
required: True
env:
- name: NETBOX_API
validate_certs:
description:
- Allows connection when SSL certificates are not valid. Set to C(false) when certificates are not trusted.
default: True
type: boolean
config_context:
description:
- If True, it adds config-context in host vars.
- Config-context enables the association of arbitrary data to devices and virtual machines grouped by
region, site, role, platform, and/or tenant. Please check official netbox docs for more info.
default: False
type: boolean
token:
required: True
description: NetBox token.
env:
# in order of precedence
- name: NETBOX_TOKEN
- name: NETBOX_API_KEY
group_by:
description: Keys used to create groups.
type: list
choices:
- sites
- tenants
- racks
- tags
- device_roles
- device_types
- manufacturers
- platforms
default: []
query_filters:
description: List of parameters passed to the query string (Multiple values may be separated by commas)
type: list
default: []
timeout:
description: Timeout for Netbox requests in seconds
type: int
default: 60
compose:
description: List of custom ansible host vars to create from the device object fetched from NetBox
default: {}
type: dict
'''
EXAMPLES = '''
# netbox_inventory.yml file in YAML format
# Example command line: ansible-inventory -v --list -i netbox_inventory.yml
plugin: netbox
api_endpoint: http://localhost:8000
validate_certs: True
config_context: False
group_by:
- device_roles
query_filters:
- role: network-edge-router
# Query filters are passed directly as an argument to the fetching queries.
# You can repeat tags in the query string.
query_filters:
- role: server
- tag: web
- tag: production
# See the NetBox documentation at https://netbox.readthedocs.io/en/latest/api/overview/
# the query_filters work as a logical **OR**
#
# Prefix any custom fields with cf_ and pass the field value with the regular NetBox query string
query_filters:
- cf_foo: bar
# NetBox inventory plugin also supports Constructable semantics
# You can fill your hosts vars using the compose option:
plugin: netbox
compose:
foo: last_updated
bar: display_name
nested_variable: rack.display_name
'''
import json
import uuid
from sys import version as python_version
from threading import Thread
from itertools import chain
from ansible.plugins.inventory import BaseInventoryPlugin, Constructable, Cacheable
from ansible.module_utils.ansible_release import __version__ as ansible_version
from ansible.errors import AnsibleError
from ansible.module_utils._text import to_text
from ansible.module_utils.urls import open_url
from ansible.module_utils.six.moves.urllib.parse import urlencode
from ansible.module_utils.compat.ipaddress import ip_interface
ALLOWED_DEVICE_QUERY_PARAMETERS = (
"asset_tag",
"cluster_id",
"device_type_id",
"has_primary_ip",
"is_console_server",
"is_full_depth",
"is_network_device",
"is_pdu",
"mac_address",
"manufacturer",
"manufacturer_id",
"model",
"name",
"platform",
"platform_id",
"position",
"rack_group_id",
"rack_id",
"role",
"role_id",
"serial",
"site",
"site_id",
"status",
"tag",
"tenant",
"tenant_id",
"virtual_chassis_id",
)
class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
NAME = 'netbox'
def _fetch_information(self, url):
results = None
cache_key = self.get_cache_key(url)
# get the user's cache option to see if we should save the cache if it is changing
user_cache_setting = self.get_option('cache')
# read if the user has caching enabled and the cache isn't being refreshed
attempt_to_read_cache = user_cache_setting and self.use_cache
# attempt to read the cache if inventory isn't being refreshed and the user has caching enabled
if attempt_to_read_cache:
try:
results = self._cache[cache_key]
need_to_fetch = False
except KeyError:
# occurs if the cache_key is not in the cache or if the cache_key expired
# we need to fetch the URL now
need_to_fetch = True
else:
# not reading from cache so do fetch
need_to_fetch = True
if need_to_fetch:
self.display.v("Fetching: " + url)
response = open_url(url, headers=self.headers, timeout=self.timeout, validate_certs=self.validate_certs)
try:
raw_data = to_text(response.read(), errors='surrogate_or_strict')
except UnicodeError:
raise AnsibleError("Incorrect encoding of fetched payload from NetBox API.")
try:
results = json.loads(raw_data)
except ValueError:
raise AnsibleError("Incorrect JSON payload: %s" % raw_data)
# put result in cache if enabled
if user_cache_setting:
self._cache[cache_key] = results
return results
def get_resource_list(self, api_url):
"""Retrieves resource list from netbox API.
Returns:
A list of all resource from netbox API.
"""
if not api_url:
raise AnsibleError("Please check API URL in script configuration file.")
hosts_list = []
# Pagination.
while api_url:
self.display.v("Fetching: " + api_url)
# Get hosts list.
api_output = self._fetch_information(api_url)
hosts_list += api_output["results"]
api_url = api_output["next"]
# Get hosts list.
return hosts_list
@property
def group_extractors(self):
return {
"sites": self.extract_site,
"tenants": self.extract_tenant,
"racks": self.extract_rack,
"tags": self.extract_tags,
"disk": self.extract_disk,
"memory": self.extract_memory,
"vcpus": self.extract_vcpus,
"device_roles": self.extract_device_role,
"platforms": self.extract_platform,
"device_types": self.extract_device_type,
"config_context": self.extract_config_context,
"manufacturers": self.extract_manufacturer
}
def extract_disk(self, host):
return host.get("disk")
def extract_vcpus(self, host):
return host.get("vcpus")
def extract_memory(self, host):
return host.get("memory")
def extract_platform(self, host):
try:
return [self.platforms_lookup[host["platform"]["id"]]]
except Exception:
return
def extract_device_type(self, host):
try:
return [self.device_types_lookup[host["device_type"]["id"]]]
except Exception:
return
def extract_rack(self, host):
try:
return [self.racks_lookup[host["rack"]["id"]]]
except Exception:
return
def extract_site(self, host):
try:
return [self.sites_lookup[host["site"]["id"]]]
except Exception:
return
def extract_tenant(self, host):
try:
return [self.tenants_lookup[host["tenant"]["id"]]]
except Exception:
return
def extract_device_role(self, host):
try:
if 'device_role' in host:
return [self.device_roles_lookup[host["device_role"]["id"]]]
elif 'role' in host:
return [self.device_roles_lookup[host["role"]["id"]]]
except Exception:
return
def extract_config_context(self, host):
try:
return [host["config_context"]]
except Exception:
return
def extract_manufacturer(self, host):
try:
return [self.manufacturers_lookup[host["device_type"]["manufacturer"]["id"]]]
except Exception:
return
def extract_primary_ip(self, host):
try:
address = host["primary_ip"]["address"]
return str(ip_interface(address).ip)
except Exception:
return
def extract_primary_ip4(self, host):
try:
address = host["primary_ip4"]["address"]
return str(ip_interface(address).ip)
except Exception:
return
def extract_primary_ip6(self, host):
try:
address = host["primary_ip6"]["address"]
return str(ip_interface(address).ip)
except Exception:
return
def extract_tags(self, host):
return host["tags"]
def refresh_platforms_lookup(self):
url = self.api_endpoint + "/api/dcim/platforms/?limit=0"
platforms = self.get_resource_list(api_url=url)
self.platforms_lookup = dict((platform["id"], platform["name"]) for platform in platforms)
def refresh_sites_lookup(self):
url = self.api_endpoint + "/api/dcim/sites/?limit=0"
sites = self.get_resource_list(api_url=url)
self.sites_lookup = dict((site["id"], site["name"]) for site in sites)
def refresh_regions_lookup(self):
url = self.api_endpoint + "/api/dcim/regions/?limit=0"
regions = self.get_resource_list(api_url=url)
self.regions_lookup = dict((region["id"], region["name"]) for region in regions)
def refresh_tenants_lookup(self):
url = self.api_endpoint + "/api/tenancy/tenants/?limit=0"
tenants = self.get_resource_list(api_url=url)
self.tenants_lookup = dict((tenant["id"], tenant["name"]) for tenant in tenants)
def refresh_racks_lookup(self):
url = self.api_endpoint + "/api/dcim/racks/?limit=0"
racks = self.get_resource_list(api_url=url)
self.racks_lookup = dict((rack["id"], rack["name"]) for rack in racks)
def refresh_device_roles_lookup(self):
url = self.api_endpoint + "/api/dcim/device-roles/?limit=0"
device_roles = self.get_resource_list(api_url=url)
self.device_roles_lookup = dict((device_role["id"], device_role["name"]) for device_role in device_roles)
def refresh_device_types_lookup(self):
url = self.api_endpoint + "/api/dcim/device-types/?limit=0"
device_types = self.get_resource_list(api_url=url)
self.device_types_lookup = dict((device_type["id"], device_type["model"]) for device_type in device_types)
def refresh_manufacturers_lookup(self):
url = self.api_endpoint + "/api/dcim/manufacturers/?limit=0"
manufacturers = self.get_resource_list(api_url=url)
self.manufacturers_lookup = dict((manufacturer["id"], manufacturer["name"]) for manufacturer in manufacturers)
def refresh_lookups(self):
lookup_processes = (
self.refresh_sites_lookup,
self.refresh_regions_lookup,
self.refresh_tenants_lookup,
self.refresh_racks_lookup,
self.refresh_device_roles_lookup,
self.refresh_platforms_lookup,
self.refresh_device_types_lookup,
self.refresh_manufacturers_lookup,
)
thread_list = []
for p in lookup_processes:
t = Thread(target=p)
thread_list.append(t)
t.start()
for thread in thread_list:
thread.join()
def validate_query_parameters(self, x):
if not (isinstance(x, dict) and len(x) == 1):
self.display.warning("Warning query parameters %s not a dict with a single key." % x)
return
k = tuple(x.keys())[0]
v = tuple(x.values())[0]
if not (k in ALLOWED_DEVICE_QUERY_PARAMETERS or k.startswith("cf_")):
msg = "Warning: %s not in %s or starting with cf (Custom field)" % (k, ALLOWED_DEVICE_QUERY_PARAMETERS)
self.display.warning(msg=msg)
return
return k, v
def refresh_url(self):
query_parameters = [("limit", 0)]
if self.query_filters:
query_parameters.extend(filter(lambda x: x,
map(self.validate_query_parameters, self.query_filters)))
if self.config_context:
self.device_url = self.api_endpoint + "/api/dcim/devices/?" + urlencode(query_parameters)
self.virtual_machines_url = self.api_endpoint + "/api/virtualization/virtual-machines/?" + urlencode(query_parameters)
else:
self.device_url = self.api_endpoint + "/api/dcim/devices/?" + urlencode(query_parameters) + "&exclude=config_context"
self.virtual_machines_url = self.api_endpoint + "/api/virtualization/virtual-machines/?" + urlencode(query_parameters) + "&exclude=config_context"
def fetch_hosts(self):
return chain(
self.get_resource_list(self.device_url),
self.get_resource_list(self.virtual_machines_url),
)
def extract_name(self, host):
# An host in an Ansible inventory requires an hostname.
# name is an unique but not required attribute for a device in NetBox
# We default to an UUID for hostname in case the name is not set in NetBox
return host["name"] or str(uuid.uuid4())
def add_host_to_groups(self, host, hostname):
for group in self.group_by:
sub_groups = self.group_extractors[group](host)
if not sub_groups:
continue
for sub_group in sub_groups:
group_name = "_".join([group, sub_group])
self.inventory.add_group(group=group_name)
self.inventory.add_host(group=group_name, host=hostname)
def _fill_host_variables(self, host, hostname):
for attribute, extractor in self.group_extractors.items():
if not extractor(host):
continue
self.inventory.set_variable(hostname, attribute, extractor(host))
if self.extract_primary_ip(host):
self.inventory.set_variable(hostname, "ansible_host", self.extract_primary_ip(host=host))
if self.extract_primary_ip4(host):
self.inventory.set_variable(hostname, "primary_ip4", self.extract_primary_ip4(host=host))
if self.extract_primary_ip6(host):
self.inventory.set_variable(hostname, "primary_ip6", self.extract_primary_ip6(host=host))
def main(self):
self.refresh_lookups()
self.refresh_url()
hosts_list = self.fetch_hosts()
for host in hosts_list:
hostname = self.extract_name(host=host)
self.inventory.add_host(host=hostname)
self._fill_host_variables(host=host, hostname=hostname)
strict = self.get_option("strict")
# Composed variables
self._set_composite_vars(self.get_option('compose'), host, hostname, strict=strict)
# Complex groups based on jinja2 conditionals, hosts that meet the conditional are added to group
self._add_host_to_composed_groups(self.get_option('groups'), host, hostname, strict=strict)
# Create groups based on variable values and add the corresponding hosts to it
self._add_host_to_keyed_groups(self.get_option('keyed_groups'), host, hostname, strict=strict)
self.add_host_to_groups(host=host, hostname=hostname)
def parse(self, inventory, loader, path, cache=True):
super(InventoryModule, self).parse(inventory, loader, path)
self._read_config_data(path=path)
self.use_cache = cache
# Netbox access
token = self.get_option("token")
# Handle extra "/" from api_endpoint configuration and trim if necessary, see PR#49943
self.api_endpoint = self.get_option("api_endpoint").strip('/')
self.timeout = self.get_option("timeout")
self.validate_certs = self.get_option("validate_certs")
self.config_context = self.get_option("config_context")
self.headers = {
'Authorization': "Token %s" % token,
'User-Agent': "ansible %s Python %s" % (ansible_version, python_version.split(' ')[0]),
'Content-type': 'application/json'
}
# Filter and group_by options
self.group_by = self.get_option("group_by")
self.query_filters = self.get_option("query_filters")
self.main()

View file

@ -86,7 +86,6 @@ lib/ansible/module_utils/gcp_utils.py future-import-boilerplate
lib/ansible/module_utils/gcp_utils.py metaclass-boilerplate
lib/ansible/module_utils/json_utils.py future-import-boilerplate
lib/ansible/module_utils/json_utils.py metaclass-boilerplate
lib/ansible/module_utils/net_tools/netbox/netbox_utils.py future-import-boilerplate
lib/ansible/module_utils/netapp.py future-import-boilerplate
lib/ansible/module_utils/netapp.py metaclass-boilerplate
lib/ansible/module_utils/netapp_elementsw_module.py future-import-boilerplate
@ -1509,14 +1508,6 @@ lib/ansible/modules/net_tools/basics/uri.py pylint:blacklisted-name
lib/ansible/modules/net_tools/basics/uri.py validate-modules:doc-required-mismatch
lib/ansible/modules/net_tools/basics/uri.py validate-modules:parameter-list-no-elements
lib/ansible/modules/net_tools/basics/uri.py validate-modules:parameter-type-not-in-doc
lib/ansible/modules/net_tools/netbox/netbox_device.py validate-modules:doc-missing-type
lib/ansible/modules/net_tools/netbox/netbox_device.py validate-modules:parameter-type-not-in-doc
lib/ansible/modules/net_tools/netbox/netbox_interface.py validate-modules:parameter-type-not-in-doc
lib/ansible/modules/net_tools/netbox/netbox_ip_address.py validate-modules:doc-missing-type
lib/ansible/modules/net_tools/netbox/netbox_ip_address.py validate-modules:parameter-type-not-in-doc
lib/ansible/modules/net_tools/netbox/netbox_prefix.py validate-modules:doc-missing-type
lib/ansible/modules/net_tools/netbox/netbox_prefix.py validate-modules:parameter-type-not-in-doc
lib/ansible/modules/net_tools/netbox/netbox_site.py validate-modules:parameter-type-not-in-doc
lib/ansible/modules/network/aci/aci_aaa_user.py validate-modules:doc-required-mismatch
lib/ansible/modules/network/aci/aci_aaa_user_certificate.py validate-modules:doc-required-mismatch
lib/ansible/modules/network/aci/aci_access_port_block_to_access_port.py validate-modules:doc-required-mismatch

View file

@ -1,152 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2019, Bruno Inec (@sweenu) <bruno@inec.fr>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
import pytest
from ansible.module_utils.net_tools.netbox.netbox_utils import (
QUERY_TYPES,
_build_diff,
create_netbox_object,
delete_netbox_object,
update_netbox_object,
normalize_data,
)
def test_normalize_data():
assert "name" not in QUERY_TYPES
assert QUERY_TYPES.get("rack") == "slug"
assert QUERY_TYPES.get("primary_ip") != "slug"
raw_data = {
"name": "Some name",
"primary_ip": "10.3.72.74/31",
"rack": "Some rack",
}
normalized_data = raw_data.copy()
normalized_data["rack"] = "some-rack"
assert normalize_data(raw_data) == normalized_data
def test_build_diff():
before = "The state before"
after = {"A": "more", "complicated": "state"}
diff = _build_diff(before=before, after=after)
assert diff == {"before": before, "after": after}
@pytest.fixture
def nb_obj_mock(mocker):
serialized_object = {"The serialized": "object"}
nb_obj = mocker.Mock(name="nb_obj_mock")
nb_obj.delete.return_value = True
nb_obj.update.return_value = True
nb_obj.update.side_effect = serialized_object.update
nb_obj.serialize.return_value = serialized_object
return nb_obj
@pytest.fixture
def endpoint_mock(mocker, nb_obj_mock):
endpoint = mocker.Mock(name="endpoint_mock")
endpoint.create.return_value = nb_obj_mock
return endpoint
@pytest.fixture
def on_creation_diff():
return _build_diff(before={"state": "absent"}, after={"state": "present"})
@pytest.fixture
def on_deletion_diff():
return _build_diff(before={"state": "present"}, after={"state": "absent"})
@pytest.fixture
def data():
return {"name": "Some Netbox object name"}
def test_create_netbox_object(endpoint_mock, data, on_creation_diff):
return_value = endpoint_mock.create().serialize()
serialized_obj, diff = create_netbox_object(
endpoint_mock, data, check_mode=False
)
assert endpoint_mock.create.called_once_with(data)
assert serialized_obj == return_value
assert diff == on_creation_diff
def test_create_netbox_object_in_check_mode(endpoint_mock, data, on_creation_diff):
serialized_obj, diff = create_netbox_object(
endpoint_mock, data, check_mode=True
)
assert endpoint_mock.create.not_called()
assert serialized_obj == data
assert diff == on_creation_diff
def test_delete_netbox_object(nb_obj_mock, on_deletion_diff):
serialized_obj, diff = delete_netbox_object(nb_obj_mock, check_mode=False)
assert nb_obj_mock.delete.called_once()
assert serialized_obj == nb_obj_mock.serialize()
assert diff == on_deletion_diff
def test_delete_netbox_object_in_check_mode(nb_obj_mock, on_deletion_diff):
serialized_obj, diff = delete_netbox_object(nb_obj_mock, check_mode=True)
assert nb_obj_mock.delete.not_called()
assert serialized_obj == nb_obj_mock.serialize()
assert diff == on_deletion_diff
def test_update_netbox_object_no_changes(nb_obj_mock):
unchanged_data = nb_obj_mock.serialize()
serialized_obj, diff = update_netbox_object(nb_obj_mock, unchanged_data, check_mode=True)
assert nb_obj_mock.update.not_called()
assert serialized_obj == unchanged_data
assert diff is None
@pytest.fixture
def changed_serialized_obj(nb_obj_mock):
changed_serialized_obj = nb_obj_mock.serialize().copy()
changed_serialized_obj[list(changed_serialized_obj.keys())[0]] += " (modified)"
return changed_serialized_obj
@pytest.fixture
def on_update_diff(nb_obj_mock, changed_serialized_obj):
return _build_diff(before=nb_obj_mock.serialize().copy(), after=changed_serialized_obj)
def test_update_netbox_object_with_changes(
nb_obj_mock, changed_serialized_obj, on_update_diff
):
serialized_obj, diff = update_netbox_object(
nb_obj_mock, changed_serialized_obj, check_mode=False
)
assert nb_obj_mock.update.called_once_with(changed_serialized_obj)
assert serialized_obj == nb_obj_mock.serialize()
assert diff == on_update_diff
def test_update_netbox_object_with_changes_in_check_mode(
nb_obj_mock, changed_serialized_obj, on_update_diff
):
updated_serialized_obj = nb_obj_mock.serialize().copy()
updated_serialized_obj.update(changed_serialized_obj)
serialized_obj, diff = update_netbox_object(
nb_obj_mock, changed_serialized_obj, check_mode=True
)
assert nb_obj_mock.update.not_called()
assert serialized_obj == updated_serialized_obj
assert diff == on_update_diff