FortiManager Plugin Module Conversion: fmgr_query (#52770)

* Auto Commit for: fmgr_query

* Auto Commit for: fmgr_query
This commit is contained in:
ftntcorecse 2019-03-04 00:36:12 -05:00 committed by Nilashish Chakraborty
parent c4d29868ca
commit a8f1ad8602
3 changed files with 330 additions and 122 deletions

View file

@ -29,6 +29,8 @@ DOCUMENTATION = '''
---
module: fmgr_query
version_added: "2.8"
notes:
- Full Documentation at U(https://ftnt-ansible-docs.readthedocs.io/en/latest/).
author: Luke Weighall (@lweighall)
short_description: Query FortiManager data objects for use in Ansible workflows.
description:
@ -41,21 +43,6 @@ options:
required: false
default: root
host:
description:
- The FortiManager's address.
required: true
username:
description:
- The username used to authenticate with the FortiManager.
required: false
password:
description:
- The password associated with the username account.
required: false
object:
description:
- The data object we wish to query (device, package, rule, etc). Will expand choices as improves.
@ -108,36 +95,24 @@ options:
EXAMPLES = '''
- name: QUERY FORTIGATE DEVICE BY IP
fmgr_query:
host: "{{inventory_hostname}}"
username: "{{ username }}"
password: "{{ password }}"
object: "device"
adom: "ansible"
device_ip: "10.7.220.41"
- name: QUERY FORTIGATE DEVICE BY SERIAL
fmgr_query:
host: "{{inventory_hostname}}"
username: "{{ username }}"
password: "{{ password }}"
adom: "ansible"
object: "device"
device_serial: "FGVM000000117992"
- name: QUERY FORTIGATE DEVICE BY FRIENDLY NAME
fmgr_query:
host: "{{inventory_hostname}}"
username: "{{ username }}"
password: "{{ password }}"
adom: "ansible"
object: "device"
device_unique_name: "ansible-fgt01"
- name: VERIFY CLUSTER MEMBERS AND STATUS
fmgr_query:
host: "{{inventory_hostname}}"
username: "{{ username }}"
password: "{{ password }}"
adom: "ansible"
object: "cluster_nodes"
device_unique_name: "fgt-cluster01"
@ -145,18 +120,12 @@ EXAMPLES = '''
- name: GET STATUS OF TASK ID
fmgr_query:
host: "{{inventory_hostname}}"
username: "{{ username }}"
password: "{{ password }}"
adom: "ansible"
object: "task"
task_id: "3"
- name: USE CUSTOM TYPE TO QUERY AVAILABLE SCRIPTS
fmgr_query:
host: "{{inventory_hostname}}"
username: "{{ username }}"
password: "{{ password }}"
adom: "ansible"
object: "custom"
custom_endpoint: "/dvmdb/adom/ansible/script"
@ -171,19 +140,23 @@ api_result:
"""
from ansible.module_utils.basic import AnsibleModule, env_fallback
from ansible.module_utils.network.fortimanager.fortimanager import AnsibleFortiManager
# Check for pyFMG lib
try:
from pyFMG.fortimgr import FortiManager
HAS_PYFMGR = True
except ImportError:
HAS_PYFMGR = False
from ansible.module_utils.connection import Connection
from ansible.module_utils.network.fortimanager.fortimanager import FortiManagerHandler
from ansible.module_utils.network.fortimanager.common import FMGBaseException
from ansible.module_utils.network.fortimanager.common import FMGRCommon
from ansible.module_utils.network.fortimanager.common import FMGRMethods
from ansible.module_utils.network.fortimanager.common import DEFAULT_RESULT_OBJ
from ansible.module_utils.network.fortimanager.common import FAIL_SOCKET_MSG
def fmgr_get_custom(fmg, paramgram):
def fmgr_get_custom(fmgr, paramgram):
"""
This method is used to perform very custom queries ad-hoc
:param fmgr: The fmgr object instance from fortimanager.py
:type fmgr: class object
:param paramgram: The formatted dictionary of options to process
:type paramgram: dict
:return: The response from the FortiManager
:rtype: dict
"""
# IF THE CUSTOM DICTIONARY (OFTEN CONTAINING FILTERS) IS DEFINED CREATED THAT
if paramgram["custom_dict"] is not None:
@ -194,13 +167,18 @@ def fmgr_get_custom(fmg, paramgram):
# SET THE CUSTOM ENDPOINT PROVIDED
url = paramgram["custom_endpoint"]
# MAKE THE CALL AND RETURN RESULTS
response = fmg.get(url, datagram)
response = fmgr.process_request(url, datagram, FMGRMethods.GET)
return response
def fmgr_get_task_status(fmg, paramgram):
def fmgr_get_task_status(fmgr, paramgram):
"""
This method is used to get information on tasks within the FortiManager Task Manager
:param fmgr: The fmgr object instance from fortimanager.py
:type fmgr: class object
:param paramgram: The formatted dictionary of options to process
:type paramgram: dict
:return: The response from the FortiManager
:rtype: dict
"""
# IF THE TASK_ID IS DEFINED, THEN GET THAT SPECIFIC TASK
# OTHERWISE, GET ALL RECENT TASKS IN A LIST
@ -210,23 +188,34 @@ def fmgr_get_task_status(fmg, paramgram):
"adom": paramgram["adom"]
}
url = '/task/task/{task_id}'.format(task_id=paramgram["task_id"])
response = fmg.get(url, datagram)
response = fmgr.process_request(url, datagram, FMGRMethods.GET)
else:
datagram = {
"adom": paramgram["adom"]
}
url = '/task/task'
response = fmg.get(url, datagram)
response = fmgr.process_request(url, datagram, FMGRMethods.GET)
return response
def fmgr_get_device(fmg, paramgram):
def fmgr_get_device(fmgr, paramgram):
"""
This method is used to get information on devices. This will not work on HA_SLAVE nodes, only top level devices.
Such as cluster objects and standalone devices.
:param fmgr: The fmgr object instance from fortimanager.py
:type fmgr: class object
:param paramgram: The formatted dictionary of options to process
:type paramgram: dict
:return: The response from the FortiManager
:rtype: dict
"""
# FIRST TRY TO RUN AN UPDATE ON THE DEVICE
# RUN A QUICK CLUSTER REFRESH/UPDATE ATTEMPT TO ENSURE WE'RE GETTING THE LATEST INFORMOATION
response = DEFAULT_RESULT_OBJ
url = ""
datagram = {}
update_url = '/dvm/cmd/update/device'
update_dict = {
"adom": paramgram['adom'],
@ -234,7 +223,7 @@ def fmgr_get_device(fmg, paramgram):
"flags": "create_task"
}
# DO THE UPDATE CALL
update_call = fmg.execute(update_url, update_dict)
fmgr.process_request(update_url, update_dict, FMGRMethods.EXEC)
# SET THE URL
url = '/dvmdb/adom/{adom}/device'.format(adom=paramgram["adom"])
@ -246,7 +235,7 @@ def fmgr_get_device(fmg, paramgram):
datagram = {
"filter": ["sn", "==", paramgram["device_serial"]]
}
response = fmg.get(url, datagram)
response = fmgr.process_request(url, datagram, FMGRMethods.GET)
if len(response[1]) >= 0:
device_found = 1
@ -255,7 +244,7 @@ def fmgr_get_device(fmg, paramgram):
datagram = {
"filter": ["name", "==", paramgram["device_unique_name"]]
}
response = fmg.get(url, datagram)
response = fmgr.process_request(url, datagram, FMGRMethods.GET)
if len(response[1]) >= 0:
device_found = 1
@ -264,20 +253,31 @@ def fmgr_get_device(fmg, paramgram):
datagram = {
"filter": ["ip", "==", paramgram["device_ip"]]
}
response = fmg.get(url, datagram)
response = fmgr.process_request(url, datagram, FMGRMethods.GET)
if len(response[1]) >= 0:
device_found = 1
return response
def fmgr_get_cluster_nodes(fmg, paramgram):
def fmgr_get_cluster_nodes(fmgr, paramgram):
"""
This method is used to get information on devices. This WILL work on HA_SLAVE nodes, but NOT top level standalone
devices.
Such as cluster objects and standalone devices.
:param fmgr: The fmgr object instance from fortimanager.py
:type fmgr: class object
:param paramgram: The formatted dictionary of options to process
:type paramgram: dict
:return: The response from the FortiManager
:rtype: dict
"""
response = DEFAULT_RESULT_OBJ
url = ""
datagram = {}
# USE THE DEVICE METHOD TO GET THE CLUSTER INFORMATION SO WE CAN SEE THE HA_SLAVE NODES
response = fmgr_get_device(fmg, paramgram)
response = fmgr_get_device(fmgr, paramgram)
# CHECK FOR HA_SLAVE NODES, IF CLUSTER IS MISSING COMPLETELY THEN QUIT
try:
returned_nodes = response[1][0]["ha_slave"]
@ -332,11 +332,7 @@ def fmgr_get_cluster_nodes(fmg, paramgram):
def main():
argument_spec = dict(
adom=dict(required=False, type="str", default="root"),
host=dict(required=True, type="str"),
username=dict(fallback=(env_fallback, ["ANSIBLE_NET_USERNAME"])),
password=dict(fallback=(env_fallback, ["ANSIBLE_NET_PASSWORD"]), no_log=True),
object=dict(required=True, type="str", choices=["device", "cluster_nodes", "task", "custom"]),
custom_endpoint=dict(required=False, type="str"),
custom_dict=dict(required=False, type="dict"),
device_ip=dict(required=False, type="str"),
@ -346,23 +342,7 @@ def main():
task_id=dict(required=False, type="str")
)
module = AnsibleModule(argument_spec, supports_check_mode=True, )
# CHECK IF THE HOST/USERNAME/PW EXISTS, AND IF IT DOES, LOGIN.
host = module.params["host"]
username = module.params["username"]
if host is None or username is None:
module.fail_json(msg="Host and username are required")
# CHECK IF LOGIN FAILED
fmg = AnsibleFortiManager(module, module.params["host"], module.params["username"], module.params["password"])
response = fmg.login()
if response[1]['status']['code'] != 0:
module.fail_json(msg="Connection to FortiManager Failed")
# START SESSION LOGIC
# MODULE PARAMGRAM
module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=False, )
paramgram = {
"adom": module.params["adom"],
"object": module.params["object"],
@ -374,56 +354,77 @@ def main():
"custom_endpoint": module.params["custom_endpoint"],
"custom_dict": module.params["custom_dict"]
}
module.paramgram = paramgram
fmgr = None
if module._socket_path:
connection = Connection(module._socket_path)
fmgr = FortiManagerHandler(connection, module)
fmgr.tools = FMGRCommon()
else:
module.fail_json(**FAIL_SOCKET_MSG)
# IF OBJECT IS DEVICE
if paramgram["object"] == "device" and any(v is not None for v in [paramgram["device_unique_name"],
paramgram["device_serial"], paramgram["device_ip"]]):
results = fmgr_get_device(fmg, paramgram)
if results[0] not in [0]:
module.fail_json(msg="Device query failed!")
elif len(results[1]) == 0:
module.exit_json(msg="Device NOT FOUND!")
else:
module.exit_json(msg="Device Found", **results[1][0])
results = DEFAULT_RESULT_OBJ
# IF OBJECT IS CLUSTER_NODES
if paramgram["object"] == "cluster_nodes" and paramgram["nodes"] is not None:
results = fmgr_get_cluster_nodes(fmg, paramgram)
if results["cluster_status"] == "MISSING":
module.exit_json(msg="No cluster device found!", **results)
elif results["query_status"] == "good":
module.exit_json(msg="Cluster Found - Showing Nodes", **results)
elif results is None:
module.fail_json(msg="Query FAILED -- Check module or playbook syntax")
# IF OBJECT IS TASK
if paramgram["object"] == "task":
results = fmgr_get_task_status(fmg, paramgram)
if results[0] != 0:
module.fail_json(msg="QUERY FAILED -- Is FMGR online? Good Creds?")
if results[0] == 0:
module.exit_json(msg="Task Found", **results[1])
# IF OBJECT IS CUSTOM
if paramgram["object"] == "custom":
results = fmgr_get_custom(fmg, paramgram)
if results[0] != 0:
module.fail_json(msg="QUERY FAILED -- Please check syntax check JSON guide if needed.")
if results[0] == 0:
results_len = len(results[1])
if results_len > 0:
results_combine = dict()
if isinstance(results[1], dict):
results_combine["results"] = results[1]
if isinstance(results[1], list):
results_combine["results"] = results[1][0:results_len]
module.exit_json(msg="Custom Query Success", **results_combine)
try:
# IF OBJECT IS DEVICE
if paramgram["object"] == "device" and any(v is not None for v in [paramgram["device_unique_name"],
paramgram["device_serial"],
paramgram["device_ip"]]):
results = fmgr_get_device(fmgr, paramgram)
if results[0] not in [0]:
module.fail_json(msg="Device query failed!")
elif len(results[1]) == 0:
module.exit_json(msg="Device NOT FOUND!")
else:
module.exit_json(msg="NO RESULTS")
module.exit_json(msg="Device Found", **results[1][0])
except Exception as err:
raise FMGBaseException(err)
# logout
fmg.logout()
return module.fail_json(msg="Parameters weren't right, logic tree didn't validate. Check playbook.")
try:
# IF OBJECT IS CLUSTER_NODES
if paramgram["object"] == "cluster_nodes" and paramgram["nodes"] is not None:
results = fmgr_get_cluster_nodes(fmgr, paramgram)
if results["cluster_status"] == "MISSING":
module.exit_json(msg="No cluster device found!", **results)
elif results["query_status"] == "good":
module.exit_json(msg="Cluster Found - Showing Nodes", **results)
elif results is None:
module.fail_json(msg="Query FAILED -- Check module or playbook syntax")
except Exception as err:
raise FMGBaseException(err)
try:
# IF OBJECT IS TASK
if paramgram["object"] == "task":
results = fmgr_get_task_status(fmgr, paramgram)
if results[0] != 0:
module.fail_json(**results[1])
if results[0] == 0:
module.exit_json(**results[1])
except Exception as err:
raise FMGBaseException(err)
try:
# IF OBJECT IS CUSTOM
if paramgram["object"] == "custom":
results = fmgr_get_custom(fmgr, paramgram)
if results[0] != 0:
module.fail_json(msg="QUERY FAILED -- Please check syntax check JSON guide if needed.")
if results[0] == 0:
results_len = len(results[1])
if results_len > 0:
results_combine = dict()
if isinstance(results[1], dict):
results_combine["results"] = results[1]
if isinstance(results[1], list):
results_combine["results"] = results[1][0:results_len]
module.exit_json(msg="Custom Query Success", **results_combine)
else:
module.exit_json(msg="NO RESULTS")
except Exception as err:
raise FMGBaseException(err)
return module.exit_json(**results[1])
if __name__ == "__main__":

View file

@ -0,0 +1,101 @@
{
"fmgr_get_task_status": [
{
"url": "/task/task/247",
"paramgram_used": {
"custom_endpoint": null,
"object": "task",
"task_id": "247",
"adom": "ansible",
"device_ip": null,
"custom_dict": null,
"device_unique_name": null,
"nodes": null,
"device_serial": null
},
"datagram_sent": {
"adom": "ansible"
},
"raw_response": {
"src": "device manager",
"num_warn": 0,
"num_lines": 1,
"adom": 133,
"tot_percent": 100,
"pid": 0,
"end_tm": 1550716014,
"num_err": 0,
"percent": 100,
"state": "done",
"start_tm": 1550716010,
"flags": 0,
"user": "ansible",
"title": "upddevtitle",
"line": [
{
"name": "FGT3",
"err": 0,
"ip": "10.7.220.153",
"oid": 0,
"percent": 100,
"detail": "updatesuccess",
"state": "done",
"vdom": null
}
],
"num_done": 1,
"id": 247,
"history": [
{
"percent": 100,
"detail": "2019-02-20 18:26:54:updatesuccess",
"vdom": null,
"name": "FGT3"
}
]
},
"post_method": "get"
}
],
"fmgr_get_custom": [
{
"url": "/dvmdb/adom/ansible/script",
"raw_response": [
{
"filter_hostname": null,
"filter_device": 0,
"filter_serial": null,
"name": "TestScript",
"type": "cli",
"oid": 365,
"filter_osver": "unknown",
"content": "get system status",
"modification_time": "2019-02-13 23:45:29",
"filter_build": -1,
"filter_platform": "",
"desc": "Create by Ansible",
"script_schedule": null,
"filter_ostype": "unknown",
"target": "remote_device"
}
],
"datagram_sent": {
"type": "cli"
},
"paramgram_used": {
"custom_endpoint": "/dvmdb/adom/ansible/script",
"device_ip": null,
"device_unique_name": null,
"task_id": null,
"adom": "ansible",
"nodes": null,
"object": "custom",
"device_serial": null,
"custom_dict": {
"type": "cli"
}
},
"post_method": "get"
}
]
}

View file

@ -0,0 +1,106 @@
# Copyright 2018 Fortinet, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <https://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import json
from ansible.module_utils.network.fortimanager.fortimanager import FortiManagerHandler
import pytest
try:
from ansible.modules.network.fortimanager import fmgr_query
except ImportError:
pytest.skip("Could not load required modules for testing", allow_module_level=True)
def load_fixtures():
fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures') + "/{filename}.json".format(
filename=os.path.splitext(os.path.basename(__file__))[0])
try:
with open(fixture_path, "r") as fixture_file:
fixture_data = json.load(fixture_file)
except IOError:
return []
return [fixture_data]
@pytest.fixture(autouse=True)
def module_mock(mocker):
connection_class_mock = mocker.patch('ansible.module_utils.basic.AnsibleModule')
return connection_class_mock
@pytest.fixture(autouse=True)
def connection_mock(mocker):
connection_class_mock = mocker.patch('ansible.modules.network.fortimanager.fmgr_query.Connection')
return connection_class_mock
@pytest.fixture(scope="function", params=load_fixtures())
def fixture_data(request):
func_name = request.function.__name__.replace("test_", "")
return request.param.get(func_name, None)
fmg_instance = FortiManagerHandler(connection_mock, module_mock)
def test_fmgr_get_custom(fixture_data, mocker):
mocker.patch("ansible.module_utils.network.fortimanager.fortimanager.FortiManagerHandler.process_request",
side_effect=fixture_data)
# Fixture sets used:###########################
##################################################
# custom_endpoint: /dvmdb/adom/ansible/script
# device_ip: None
# device_unique_name: None
# task_id: None
# adom: ansible
# nodes: None
# object: custom
# device_serial: None
# custom_dict: {'type': 'cli'}
# mode: get
##################################################
# Test using fixture 1 #
output = fmgr_query.fmgr_get_custom(fmg_instance, fixture_data[0]['paramgram_used'])
assert isinstance(output['raw_response'], list) is True
def test_fmgr_get_task_status(fixture_data, mocker):
mocker.patch("ansible.module_utils.network.fortimanager.fortimanager.FortiManagerHandler.process_request",
side_effect=fixture_data)
# Fixture sets used:###########################
##################################################
# custom_endpoint: None
# object: task
# task_id: 247
# adom: ansible
# device_ip: None
# custom_dict: None
# device_unique_name: None
# nodes: None
# device_serial: None
# mode: get
##################################################
# Test using fixture 1 #
output = fmgr_query.fmgr_get_task_status(fmg_instance, fixture_data[0]['paramgram_used'])
assert isinstance(output['raw_response'], dict) is True