diff --git a/lib/ansible/module_utils/network/ftd/__init__.py b/lib/ansible/module_utils/network/ftd/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/lib/ansible/module_utils/network/ftd/common.py b/lib/ansible/module_utils/network/ftd/common.py
new file mode 100644
index 00000000000..1125fb36c30
--- /dev/null
+++ b/lib/ansible/module_utils/network/ftd/common.py
@@ -0,0 +1,177 @@
+# Copyright (c) 2018 Cisco and/or its affiliates.
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+#
+
+import re
+
+INVALID_IDENTIFIER_SYMBOLS = r'[^a-zA-Z0-9_]'
+
+IDENTITY_PROPERTIES = ['id', 'version', 'ruleId']
+NON_COMPARABLE_PROPERTIES = IDENTITY_PROPERTIES + ['isSystemDefined', 'links']
+
+
+class HTTPMethod:
+ GET = 'get'
+ POST = 'post'
+ PUT = 'put'
+ DELETE = 'delete'
+
+
+class ResponseParams:
+ SUCCESS = 'success'
+ STATUS_CODE = 'status_code'
+ RESPONSE = 'response'
+
+
+class FtdConfigurationError(Exception):
+ pass
+
+
+class FtdServerError(Exception):
+ def __init__(self, response, code):
+ super(FtdServerError, self).__init__(response)
+ self.response = response
+ self.code = code
+
+
+def construct_ansible_facts(response, params):
+ facts = dict()
+ if response:
+ response_body = response['items'] if 'items' in response else response
+ if params.get('register_as'):
+ facts[params['register_as']] = response_body
+ elif 'name' in response_body and 'type' in response_body:
+ object_name = re.sub(INVALID_IDENTIFIER_SYMBOLS, '_', response_body['name'].lower())
+ fact_name = '%s_%s' % (response_body['type'], object_name)
+ facts[fact_name] = response_body
+ return facts
+
+
+def copy_identity_properties(source_obj, dest_obj):
+ for property_name in IDENTITY_PROPERTIES:
+ if property_name in source_obj:
+ dest_obj[property_name] = source_obj[property_name]
+ return dest_obj
+
+
+def is_object_ref(d):
+ """
+ Checks if a dictionary is a reference object. The dictionary is considered to be a
+ reference object when it contains non-empty 'id' and 'type' fields.
+
+ :type d: dict
+ :return: True if passed dictionary is a reference object, otherwise False
+ """
+ has_id = 'id' in d.keys() and d['id']
+ has_type = 'type' in d.keys() and d['type']
+ return has_id and has_type
+
+
+def equal_object_refs(d1, d2):
+ """
+ Checks whether two references point to the same object.
+
+ :type d1: dict
+ :type d2: dict
+ :return: True if passed references point to the same object, otherwise False
+ """
+ have_equal_ids = d1['id'] == d2['id']
+ have_equal_types = d1['type'] == d2['type']
+ return have_equal_ids and have_equal_types
+
+
+def equal_lists(l1, l2):
+ """
+ Checks whether two lists are equal. The order of elements in the arrays is important.
+
+ :type l1: list
+ :type l2: list
+ :return: True if passed lists, their elements and order of elements are equal. Otherwise, returns False.
+ """
+ if len(l1) != len(l2):
+ return False
+
+ for v1, v2 in zip(l1, l2):
+ if not equal_values(v1, v2):
+ return False
+
+ return True
+
+
+def equal_dicts(d1, d2, compare_by_reference=True):
+ """
+ Checks whether two dictionaries are equal. If `compare_by_reference` is set to True, dictionaries referencing
+ objects are compared using `equal_object_refs` method. Otherwise, every key and value is checked.
+
+ :type d1: dict
+ :type d2: dict
+ :param compare_by_reference: if True, dictionaries referencing objects are compared using `equal_object_refs` method
+ :return: True if passed dicts are equal. Otherwise, returns False.
+ """
+ if compare_by_reference and is_object_ref(d1) and is_object_ref(d2):
+ return equal_object_refs(d1, d2)
+
+ if len(d1) != len(d2):
+ return False
+
+ for key, v1 in d1.items():
+ if key not in d2:
+ return False
+
+ v2 = d2[key]
+ if not equal_values(v1, v2):
+ return False
+
+ return True
+
+
+def equal_values(v1, v2):
+ """
+ Checks whether types and content of two values are the same. In case of complex objects, the method might be
+ called recursively.
+
+ :param v1: first value
+ :param v2: second value
+ :return: True if types and content of passed values are equal. Otherwise, returns False.
+ :rtype: bool
+ """
+ if type(v1) != type(v2):
+ return False
+ value_type = type(v1)
+
+ if value_type == list:
+ return equal_lists(v1, v2)
+ elif value_type == dict:
+ return equal_dicts(v1, v2)
+ else:
+ return v1 == v2
+
+
+def equal_objects(d1, d2):
+ """
+ Checks whether two objects are equal. Ignores special object properties (e.g. 'id', 'version') and
+ properties with None and empty values. In case properties contains a reference to the other object,
+ only object identities (ids and types) are checked.
+
+ :type d1: dict
+ :type d2: dict
+ :return: True if passed objects and their properties are equal. Otherwise, returns False.
+ """
+ d1 = dict((k, d1[k]) for k in d1.keys() if k not in NON_COMPARABLE_PROPERTIES and d1[k])
+ d2 = dict((k, d2[k]) for k in d2.keys() if k not in NON_COMPARABLE_PROPERTIES and d2[k])
+
+ return equal_dicts(d1, d2, compare_by_reference=False)
diff --git a/lib/ansible/module_utils/network/ftd/configuration.py b/lib/ansible/module_utils/network/ftd/configuration.py
new file mode 100644
index 00000000000..c594a98da8a
--- /dev/null
+++ b/lib/ansible/module_utils/network/ftd/configuration.py
@@ -0,0 +1,146 @@
+# Copyright (c) 2018 Cisco and/or its affiliates.
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+#
+
+from functools import partial
+
+from ansible.module_utils.network.ftd.common import HTTPMethod, equal_objects, copy_identity_properties, \
+ FtdConfigurationError, FtdServerError, ResponseParams
+
+DEFAULT_PAGE_SIZE = 10
+DEFAULT_OFFSET = 0
+
+UNPROCESSABLE_ENTITY_STATUS = 422
+INVALID_UUID_ERROR_MESSAGE = "Validation failed due to an invalid UUID"
+DUPLICATE_NAME_ERROR_MESSAGE = "Validation failed due to a duplicate name"
+
+
+class BaseConfigurationResource(object):
+ def __init__(self, conn):
+ self._conn = conn
+ self.config_changed = False
+
+ def get_object_by_name(self, url_path, name, path_params=None):
+ item_generator = iterate_over_pageable_resource(
+ partial(self.send_request, url_path=url_path, http_method=HTTPMethod.GET, path_params=path_params),
+ {'filter': 'name:%s' % name}
+ )
+ # not all endpoints support filtering so checking name explicitly
+ return next((item for item in item_generator if item['name'] == name), None)
+
+ def get_objects_by_filter(self, url_path, filters, path_params=None, query_params=None):
+ def match_filters(obj):
+ for k, v in filters.items():
+ if k not in obj or obj[k] != v:
+ return False
+ return True
+
+ item_generator = iterate_over_pageable_resource(
+ partial(self.send_request, url_path=url_path, http_method=HTTPMethod.GET, path_params=path_params),
+ query_params
+ )
+ return [i for i in item_generator if match_filters(i)]
+
+ def add_object(self, url_path, body_params, path_params=None, query_params=None, update_if_exists=False):
+ def is_duplicate_name_error(err):
+ return err.code == UNPROCESSABLE_ENTITY_STATUS and DUPLICATE_NAME_ERROR_MESSAGE in str(err)
+
+ def update_existing_object(obj):
+ new_path_params = {} if path_params is None else path_params
+ new_path_params['objId'] = obj['id']
+ return self.send_request(url_path=url_path + '/{objId}',
+ http_method=HTTPMethod.PUT,
+ body_params=copy_identity_properties(obj, body_params),
+ path_params=new_path_params,
+ query_params=query_params)
+
+ try:
+ return self.send_request(url_path=url_path, http_method=HTTPMethod.POST, body_params=body_params,
+ path_params=path_params, query_params=query_params)
+ except FtdServerError as e:
+ if is_duplicate_name_error(e):
+ existing_obj = self.get_object_by_name(url_path, body_params['name'], path_params)
+ if equal_objects(existing_obj, body_params):
+ return existing_obj
+ elif update_if_exists:
+ return update_existing_object(existing_obj)
+ else:
+ raise FtdConfigurationError(
+ 'Cannot add new object. An object with the same name but different parameters already exists.')
+ else:
+ raise e
+
+ def delete_object(self, url_path, path_params):
+ def is_invalid_uuid_error(err):
+ return err.code == UNPROCESSABLE_ENTITY_STATUS and INVALID_UUID_ERROR_MESSAGE in str(err)
+
+ try:
+ return self.send_request(url_path=url_path, http_method=HTTPMethod.DELETE, path_params=path_params)
+ except FtdServerError as e:
+ if is_invalid_uuid_error(e):
+ return {'status': 'Referenced object does not exist'}
+ else:
+ raise e
+
+ def edit_object(self, url_path, body_params, path_params=None, query_params=None):
+ existing_object = self.send_request(url_path=url_path, http_method=HTTPMethod.GET, path_params=path_params)
+
+ if not existing_object:
+ raise FtdConfigurationError('Referenced object does not exist')
+ elif equal_objects(existing_object, body_params):
+ return existing_object
+ else:
+ return self.send_request(url_path=url_path, http_method=HTTPMethod.PUT, body_params=body_params,
+ path_params=path_params, query_params=query_params)
+
+ def send_request(self, url_path, http_method, body_params=None, path_params=None, query_params=None):
+ def raise_for_failure(resp):
+ if not resp[ResponseParams.SUCCESS]:
+ raise FtdServerError(resp[ResponseParams.RESPONSE], resp[ResponseParams.STATUS_CODE])
+
+ response = self._conn.send_request(url_path=url_path, http_method=http_method, body_params=body_params,
+ path_params=path_params, query_params=query_params)
+ raise_for_failure(response)
+ if http_method != HTTPMethod.GET:
+ self.config_changed = True
+ return response[ResponseParams.RESPONSE]
+
+
+def iterate_over_pageable_resource(resource_func, query_params=None):
+ """
+ A generator function that iterates over a resource that supports pagination and lazily returns present items
+ one by one.
+
+ :param resource_func: function that receives `query_params` named argument and returns a page of objects
+ :type resource_func: callable
+ :param query_params: initial dictionary of query parameters that will be passed to the resource_func
+ :type query_params: dict
+ :return: an iterator containing returned items
+ :rtype: iterator of dict
+ """
+ query_params = {} if query_params is None else dict(query_params)
+ query_params.setdefault('limit', DEFAULT_PAGE_SIZE)
+ query_params.setdefault('offset', DEFAULT_OFFSET)
+
+ result = resource_func(query_params=query_params)
+ while result['items']:
+ for item in result['items']:
+ yield item
+ # creating a copy not to mutate existing dict
+ query_params = dict(query_params)
+ query_params['offset'] = int(query_params['offset']) + int(query_params['limit'])
+ result = resource_func(query_params=query_params)
diff --git a/lib/ansible/module_utils/network/ftd/fdm_swagger_client.py b/lib/ansible/module_utils/network/ftd/fdm_swagger_client.py
new file mode 100644
index 00000000000..5d8becb077d
--- /dev/null
+++ b/lib/ansible/module_utils/network/ftd/fdm_swagger_client.py
@@ -0,0 +1,513 @@
+# Copyright (c) 2018 Cisco and/or its affiliates.
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+#
+
+from ansible.module_utils.network.ftd.common import HTTPMethod
+from ansible.module_utils.six import integer_types, string_types
+
+FILE_MODEL_NAME = '_File'
+SUCCESS_RESPONSE_CODE = '200'
+
+
+class OperationField:
+ URL = 'url'
+ METHOD = 'method'
+ PARAMETERS = 'parameters'
+ MODEL_NAME = 'modelName'
+
+
+class SpecProp:
+ DEFINITIONS = 'definitions'
+ OPERATIONS = 'operations'
+ MODELS = 'models'
+
+
+class PropName:
+ ENUM = 'enum'
+ TYPE = 'type'
+ REQUIRED = 'required'
+ INVALID_TYPE = 'invalid_type'
+ REF = '$ref'
+ ALL_OF = 'allOf'
+ BASE_PATH = 'basePath'
+ PATHS = 'paths'
+ OPERATION_ID = 'operationId'
+ SCHEMA = 'schema'
+ ITEMS = 'items'
+ PROPERTIES = 'properties'
+ RESPONSES = 'responses'
+ NAME = 'name'
+
+
+class PropType:
+ STRING = 'string'
+ BOOLEAN = 'boolean'
+ INTEGER = 'integer'
+ NUMBER = 'number'
+ OBJECT = 'object'
+ ARRAY = 'array'
+ FILE = 'file'
+
+
+class OperationParams:
+ PATH = 'path'
+ QUERY = 'query'
+
+
+def _get_model_name_from_url(schema_ref):
+ path = schema_ref.split('/')
+ return path[len(path) - 1]
+
+
+class IllegalArgumentException(ValueError):
+ """
+ Exception raised when the function parameters:
+ - not all passed
+ - empty string
+ - wrong type
+ """
+ pass
+
+
+class ValidationError(ValueError):
+ pass
+
+
+class FdmSwaggerParser:
+ _definitions = None
+
+ def parse_spec(self, spec):
+ """
+ This method simplifies a swagger format and also resolves a model name for each operation
+ :param spec: dict
+ expect data in the swagger format see
+ :rtype: (bool, string|dict)
+ :return:
+ Ex.
+ The models field contains model definition from swagger see <#https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md#definitions>
+ {
+ 'models':{
+ 'model_name':{...},
+ ...
+ },
+ 'operations':{
+ 'operation_name':{
+ 'method': 'get', #post, put, delete
+ 'url': '/api/fdm/v2/object/networks', #url already contains a value from `basePath`
+ 'modelName': 'NetworkObject', # it is a link to the model from 'models'
+ # None - for a delete operation or we don't have information
+ # '_File' - if an endpoint works with files
+ 'parameters': {
+ 'path':{
+ 'param_name':{
+ 'type': 'string'#integer, boolean, number
+ 'required' True #False
+ }
+ ...
+ },
+ 'query':{
+ 'param_name':{
+ 'type': 'string'#integer, boolean, number
+ 'required' True #False
+ }
+ ...
+ }
+ }
+ },
+ ...
+ }
+ }
+ """
+ self._definitions = spec[SpecProp.DEFINITIONS]
+ config = {
+ SpecProp.MODELS: self._definitions,
+ SpecProp.OPERATIONS: self._get_operations(spec)
+ }
+ return config
+
+ def _get_operations(self, spec):
+ base_path = spec[PropName.BASE_PATH]
+ paths_dict = spec[PropName.PATHS]
+ operations_dict = {}
+ for url, operation_params in paths_dict.items():
+ for method, params in operation_params.items():
+ operation = {
+ OperationField.METHOD: method,
+ OperationField.URL: base_path + url,
+ OperationField.MODEL_NAME: self._get_model_name(method, params)
+ }
+ if OperationField.PARAMETERS in params:
+ operation[OperationField.PARAMETERS] = self._get_rest_params(params[OperationField.PARAMETERS])
+
+ operation_id = params[PropName.OPERATION_ID]
+ operations_dict[operation_id] = operation
+ return operations_dict
+
+ def _get_model_name(self, method, params):
+ if method == HTTPMethod.GET:
+ return self._get_model_name_from_responses(params)
+ elif method == HTTPMethod.POST or method == HTTPMethod.PUT:
+ return self._get_model_name_for_post_put_requests(params)
+ else:
+ return None
+
+ def _get_model_name_for_post_put_requests(self, params):
+ model_name = None
+ if OperationField.PARAMETERS in params:
+ body_param_dict = self._get_body_param_from_parameters(params[OperationField.PARAMETERS])
+ if body_param_dict:
+ schema_ref = body_param_dict[PropName.SCHEMA][PropName.REF]
+ model_name = self._get_model_name_byschema_ref(schema_ref)
+ if model_name is None:
+ model_name = self._get_model_name_from_responses(params)
+ return model_name
+
+ @staticmethod
+ def _get_body_param_from_parameters(params):
+ return next((param for param in params if param['in'] == 'body'), None)
+
+ def _get_model_name_from_responses(self, params):
+ responses = params[PropName.RESPONSES]
+ if SUCCESS_RESPONSE_CODE in responses:
+ response = responses[SUCCESS_RESPONSE_CODE][PropName.SCHEMA]
+ if PropName.REF in response:
+ return self._get_model_name_byschema_ref(response[PropName.REF])
+ elif PropName.PROPERTIES in response:
+ ref = response[PropName.PROPERTIES][PropName.ITEMS][PropName.ITEMS][PropName.REF]
+ return self._get_model_name_byschema_ref(ref)
+ elif (PropName.TYPE in response) and response[PropName.TYPE] == PropType.FILE:
+ return FILE_MODEL_NAME
+ else:
+ return None
+
+ def _get_rest_params(self, params):
+ path = {}
+ query = {}
+ operation_param = {
+ OperationParams.PATH: path,
+ OperationParams.QUERY: query
+ }
+ for param in params:
+ in_param = param['in']
+ if in_param == OperationParams.QUERY:
+ query[param[PropName.NAME]] = self._simplify_param_def(param)
+ elif in_param == OperationParams.PATH:
+ path[param[PropName.NAME]] = self._simplify_param_def(param)
+ return operation_param
+
+ @staticmethod
+ def _simplify_param_def(param):
+ return {
+ PropName.TYPE: param[PropName.TYPE],
+ PropName.REQUIRED: param[PropName.REQUIRED]
+ }
+
+ def _get_model_name_byschema_ref(self, schema_ref):
+ model_name = _get_model_name_from_url(schema_ref)
+ model_def = self._definitions[model_name]
+ if PropName.ALL_OF in model_def:
+ return self._get_model_name_byschema_ref(model_def[PropName.ALL_OF][0][PropName.REF])
+ else:
+ return model_name
+
+
+class FdmSwaggerValidator:
+ def __init__(self, spec):
+ """
+ :param spec: dict
+ data from FdmSwaggerParser().parse_spec()
+ """
+ self._operations = spec[SpecProp.OPERATIONS]
+ self._models = spec[SpecProp.MODELS]
+
+ def validate_data(self, operation_name, data=None):
+ """
+ Validate data for the post|put requests
+ :param operation_name: string
+ The value must be non empty string.
+ The operation name is used to get a model specification
+ :param data: dict
+ The value must be in the format that the model(from operation) expects
+ :rtype: (bool, string|dict)
+ :return:
+ (True, None) - if data valid
+ Invalid:
+ (False, {
+ 'required': [ #list of the fields that are required but were not present in the data
+ 'field_name',
+ 'patent.field_name',# when the nested field is omitted
+ 'patent.list[2].field_name' # if data is array and one of the field is omitted
+ ],
+ 'invalid_type':[ #list of the fields with invalid data
+ {
+ 'path': 'objId', #field name or path to the field. Ex. objects[3].id, parent.name
+ 'expected_type': 'string',# expected type. Ex. 'object', 'array', 'string', 'integer',
+ # 'boolean', 'number'
+ 'actually_value': 1 # the value that user passed
+ }
+ ]
+ })
+ :raises IllegalArgumentException
+ 'The operation_name parameter must be a non-empty string' if operation_name is not valid
+ 'The data parameter must be a dict' if data neither dict or None
+ '{operation_name} operation does not support' if the spec does not contain the operation
+ """
+ if data is None:
+ data = {}
+
+ self._check_validate_data_params(data, operation_name)
+
+ operation = self._operations[operation_name]
+ model = self._models[operation[OperationField.MODEL_NAME]]
+ status = self._init_report()
+
+ self._validate_object(status, model, data, '')
+
+ if len(status[PropName.REQUIRED]) > 0 or len(status[PropName.INVALID_TYPE]) > 0:
+ return False, self._delete_empty_field_from_report(status)
+ return True, None
+
+ def _check_validate_data_params(self, data, operation_name):
+ if not operation_name or not isinstance(operation_name, string_types):
+ raise IllegalArgumentException("The operation_name parameter must be a non-empty string")
+ if not isinstance(data, dict):
+ raise IllegalArgumentException("The data parameter must be a dict")
+ if operation_name not in self._operations:
+ raise IllegalArgumentException("{0} operation does not support".format(operation_name))
+
+ def validate_query_params(self, operation_name, params):
+ """
+ Validate params for the get requests. Use this method for validating the query part of the url.
+ :param operation_name: string
+ The value must be non empty string.
+ The operation name is used to get a params specification
+ :param params: dict
+ should be in the format that the specification(from operation) expects
+ Ex.
+ {
+ 'objId': "string_value",
+ 'p_integer': 1,
+ 'p_boolean': True,
+ 'p_number': 2.3
+ }
+ :rtype:(Boolean, msg)
+ :return:
+ (True, None) - if params valid
+ Invalid:
+ (False, {
+ 'required': [ #list of the fields that are required but are not present in the params
+ 'field_name'
+ ],
+ 'invalid_type':[ #list of the fields with invalid data and expected type of the params
+ {
+ 'path': 'objId', #field name
+ 'expected_type': 'string',#expected type. Ex. 'string', 'integer', 'boolean', 'number'
+ 'actually_value': 1 # the value that user passed
+ }
+ ]
+ })
+ :raises IllegalArgumentException
+ 'The operation_name parameter must be a non-empty string' if operation_name is not valid
+ 'The params parameter must be a dict' if params neither dict or None
+ '{operation_name} operation does not support' if the spec does not contain the operation
+ """
+ return self._validate_url_params(operation_name, params, resource=OperationParams.QUERY)
+
+ def validate_path_params(self, operation_name, params):
+ """
+ Validate params for the get requests. Use this method for validating the path part of the url.
+ :param operation_name: string
+ The value must be non empty string.
+ The operation name is used to get a params specification
+ :param params: dict
+ should be in the format that the specification(from operation) expects
+
+ Ex.
+ {
+ 'objId': "string_value",
+ 'p_integer': 1,
+ 'p_boolean': True,
+ 'p_number': 2.3
+ }
+ :rtype:(Boolean, msg)
+ :return:
+ (True, None) - if params valid
+ Invalid:
+ (False, {
+ 'required': [ #list of the fields that are required but are not present in the params
+ 'field_name'
+ ],
+ 'invalid_type':[ #list of the fields with invalid data and expected type of the params
+ {
+ 'path': 'objId', #field name
+ 'expected_type': 'string',#expected type. Ex. 'string', 'integer', 'boolean', 'number'
+ 'actually_value': 1 # the value that user passed
+ }
+ ]
+ })
+ :raises IllegalArgumentException
+ 'The operation_name parameter must be a non-empty string' if operation_name is not valid
+ 'The params parameter must be a dict' if params neither dict or None
+ '{operation_name} operation does not support' if the spec does not contain the operation
+ """
+ return self._validate_url_params(operation_name, params, resource=OperationParams.PATH)
+
+ def _validate_url_params(self, operation, params, resource):
+ if params is None:
+ params = {}
+
+ self._check_validate_url_params(operation, params)
+
+ operation = self._operations[operation]
+ if OperationField.PARAMETERS in operation and resource in operation[OperationField.PARAMETERS]:
+ spec = operation[OperationField.PARAMETERS][resource]
+ status = self._init_report()
+ self._check_url_params(status, spec, params)
+
+ if len(status[PropName.REQUIRED]) > 0 or len(status[PropName.INVALID_TYPE]) > 0:
+ return False, self._delete_empty_field_from_report(status)
+ return True, None
+ else:
+ return True, None
+
+ def _check_validate_url_params(self, operation, params):
+ if not operation or not isinstance(operation, string_types):
+ raise IllegalArgumentException("The operation_name parameter must be a non-empty string")
+ if not isinstance(params, dict):
+ raise IllegalArgumentException("The params parameter must be a dict")
+ if operation not in self._operations:
+ raise IllegalArgumentException("{0} operation does not support".format(operation))
+
+ def _check_url_params(self, status, spec, params):
+ for prop_name in spec.keys():
+ prop = spec[prop_name]
+ if prop[PropName.REQUIRED] and prop_name not in params:
+ status[PropName.REQUIRED].append(prop_name)
+ continue
+ if prop_name in params:
+ expected_type = prop[PropName.TYPE]
+ value = params[prop_name]
+ if prop_name in params and not self._is_correct_simple_types(expected_type, value):
+ self._add_invalid_type_report(status, '', prop_name, expected_type, value)
+
+ def _validate_object(self, status, model, data, path):
+ if self._is_enum(model):
+ self._check_enum(status, model, data, path)
+ elif self._is_object(model):
+ self._check_object(status, model, data, path)
+
+ def _is_enum(self, model):
+ return self._is_string_type(model) and PropName.ENUM in model
+
+ def _check_enum(self, status, model, value, path):
+ if value not in model[PropName.ENUM]:
+ self._add_invalid_type_report(status, path, '', PropName.ENUM, value)
+
+ def _add_invalid_type_report(self, status, path, prop_name, expected_type, actually_value):
+ status[PropName.INVALID_TYPE].append({
+ 'path': self._create_path_to_field(path, prop_name),
+ 'expected_type': expected_type,
+ 'actually_value': actually_value
+ })
+
+ def _check_object(self, status, model, data, path):
+ if not isinstance(data, dict):
+ self._add_invalid_type_report(status, path, '', PropType.OBJECT, data)
+ return None
+
+ self._check_required_fields(status, model[PropName.REQUIRED], data, path)
+
+ model_properties = model[PropName.PROPERTIES]
+ for prop in model_properties.keys():
+ if prop in data:
+ model_prop_val = model_properties[prop]
+ expected_type = model_prop_val[PropName.TYPE]
+ actually_value = data[prop]
+ self._check_types(status, actually_value, expected_type, model_prop_val, path, prop)
+
+ def _check_types(self, status, actually_value, expected_type, model, path, prop_name):
+ if expected_type == PropType.OBJECT:
+ ref_model = self._get_model_by_ref(model)
+
+ self._validate_object(status, ref_model, actually_value,
+ path=self._create_path_to_field(path, prop_name))
+ elif expected_type == PropType.ARRAY:
+ self._check_array(status, model, actually_value,
+ path=self._create_path_to_field(path, prop_name))
+ elif not self._is_correct_simple_types(expected_type, actually_value):
+ self._add_invalid_type_report(status, path, prop_name, expected_type, actually_value)
+
+ def _get_model_by_ref(self, model_prop_val):
+ model = _get_model_name_from_url(model_prop_val[PropName.REF])
+ return self._models[model]
+
+ def _check_required_fields(self, status, required_fields, data, path):
+ missed_required_fields = [self._create_path_to_field(path, field) for field in
+ required_fields if field not in data.keys()]
+ if len(missed_required_fields) > 0:
+ status[PropName.REQUIRED] += missed_required_fields
+
+ def _check_array(self, status, model, data, path):
+ if not isinstance(data, list):
+ self._add_invalid_type_report(status, path, '', PropType.ARRAY, data)
+ else:
+ item_model = model[PropName.ITEMS]
+ for i, item_data in enumerate(data):
+ self._check_types(status, item_data, item_model[PropName.TYPE], item_model, "{0}[{1}]".format(path, i),
+ '')
+
+ @staticmethod
+ def _is_correct_simple_types(expected_type, value):
+ if expected_type == PropType.STRING:
+ return isinstance(value, string_types)
+ elif expected_type == PropType.BOOLEAN:
+ return isinstance(value, bool)
+ elif expected_type == PropType.INTEGER:
+ return isinstance(value, integer_types) and not isinstance(value, bool)
+ elif expected_type == PropType.NUMBER:
+ return isinstance(value, (integer_types, float)) and not isinstance(value, bool)
+ return False
+
+ @staticmethod
+ def _is_string_type(model):
+ return PropName.TYPE in model and model[PropName.TYPE] == PropType.STRING
+
+ @staticmethod
+ def _init_report():
+ return {
+ PropName.REQUIRED: [],
+ PropName.INVALID_TYPE: []
+ }
+
+ @staticmethod
+ def _delete_empty_field_from_report(status):
+ if not status[PropName.REQUIRED]:
+ del status[PropName.REQUIRED]
+ if not status[PropName.INVALID_TYPE]:
+ del status[PropName.INVALID_TYPE]
+ return status
+
+ @staticmethod
+ def _create_path_to_field(path='', field=''):
+ separator = ''
+ if path and field:
+ separator = '.'
+ return "{0}{1}{2}".format(path, separator, field)
+
+ @staticmethod
+ def _is_object(model):
+ return PropName.TYPE in model and model[PropName.TYPE] == PropType.OBJECT
diff --git a/lib/ansible/modules/network/ftd/__init__.py b/lib/ansible/modules/network/ftd/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/lib/ansible/modules/network/ftd/ftd_configuration.py b/lib/ansible/modules/network/ftd/ftd_configuration.py
new file mode 100644
index 00000000000..d21d541f5b8
--- /dev/null
+++ b/lib/ansible/modules/network/ftd/ftd_configuration.py
@@ -0,0 +1,219 @@
+#!/usr/bin/python
+
+# Copyright (c) 2018 Cisco and/or its affiliates.
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+#
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+
+ANSIBLE_METADATA = {'metadata_version': '1.1',
+ 'status': ['preview'],
+ 'supported_by': 'network'}
+
+DOCUMENTATION = """
+---
+module: ftd_configuration
+short_description: Manages configuration on Cisco FTD devices over REST API
+description:
+ - Manages configuration on Cisco FTD devices including creating, updating, removing configuration objects,
+ scheduling and staring jobs, deploying pending changes, etc. All operation are performed over REST API.
+version_added: "2.7"
+author: "Cisco Systems, Inc."
+options:
+ operation:
+ description:
+ - The name of the operation to execute. Commonly, the operation starts with 'add', 'edit', 'get'
+ or 'delete' verbs, but can have an arbitrary name too.
+ required: true
+ data:
+ description:
+ - Key-value pairs that should be sent as body parameters in a REST API call
+ query_params:
+ description:
+ - Key-value pairs that should be sent as query parameters in a REST API call.
+ path_params:
+ description:
+ - Key-value pairs that should be sent as path parameters in a REST API call.
+ register_as:
+ description:
+ - Specifies Ansible fact name that is used to register received response from the FTD device.
+ filters:
+ description:
+ - Key-value dict that represents equality filters. Every key is a property name and value is its desired value.
+ If multiple filters are present, they are combined with logical operator AND.
+"""
+
+EXAMPLES = """
+- name: Create a network object
+ ftd_configuration:
+ operation: "addNetworkObject"
+ data:
+ name: "Ansible-network-host"
+ description: "From Ansible with love"
+ subType: "HOST"
+ value: "192.168.2.0"
+ dnsResolution: "IPV4_AND_IPV6"
+ type: "networkobject"
+ isSystemDefined: false
+ register_as: "hostNetwork"
+
+- name: Delete the network object
+ ftd_configuration:
+ operation: "deleteNetworkObject"
+ path_params:
+ objId: "{{ hostNetwork['id'] }}"
+"""
+
+RETURN = """
+response:
+ description: HTTP response returned from the API call.
+ returned: success
+ type: dict
+"""
+from ansible.module_utils.basic import AnsibleModule
+from ansible.module_utils.connection import Connection
+from ansible.module_utils.network.ftd.common import HTTPMethod, construct_ansible_facts, FtdConfigurationError, \
+ FtdServerError
+from ansible.module_utils.network.ftd.configuration import BaseConfigurationResource
+from ansible.module_utils.network.ftd.fdm_swagger_client import OperationField, ValidationError
+
+
+def is_post_request(operation_spec):
+ return operation_spec[OperationField.METHOD] == HTTPMethod.POST
+
+
+def is_put_request(operation_spec):
+ return operation_spec[OperationField.METHOD] == HTTPMethod.PUT
+
+
+def is_add_operation(operation_name, operation_spec):
+ # Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
+ return operation_name.startswith('add') and is_post_request(operation_spec)
+
+
+def is_edit_operation(operation_name, operation_spec):
+ # Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
+ return operation_name.startswith('edit') and is_put_request(operation_spec)
+
+
+def is_delete_operation(operation_name, operation_spec):
+ # Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
+ return operation_name.startswith('delete') and operation_spec[OperationField.METHOD] == HTTPMethod.DELETE
+
+
+def validate_params(connection, op_name, query_params, path_params, data, op_spec):
+ report = {}
+
+ def validate(validation_method, field_name, params):
+ key = 'Invalid %s provided' % field_name
+ try:
+ is_valid, validation_report = validation_method(op_name, params)
+ if not is_valid:
+ report[key] = validation_report
+ except Exception as e:
+ report[key] = str(e)
+ return report
+
+ validate(connection.validate_query_params, 'query_params', query_params)
+ validate(connection.validate_path_params, 'path_params', path_params)
+ if is_post_request(op_spec) or is_post_request(op_spec):
+ validate(connection.validate_data, 'data', data)
+
+ if report:
+ raise ValidationError(report)
+
+
+def is_find_by_filter_operation(operation_name, operation_spec, params):
+ """
+ Checks whether the called operation is 'find by filter'. This operation fetches all objects and finds
+ the matching ones by the given filter. As filtering is done on the client side, this operation should be used
+ only when selected filters are not implemented on the server side.
+
+ :param operation_name: name of the operation being called by the user
+ :type operation_name: str
+ :param operation_spec: specification of the operation being called by the user
+ :type operation_spec: dict
+ :param params: module parameters
+ :return: True if called operation is find by filter, otherwise False
+ :rtype: bool
+ """
+ is_get_list_operation = operation_name.startswith('get') and operation_name.endswith('List')
+ is_get_method = operation_spec[OperationField.METHOD] == HTTPMethod.GET
+ return is_get_list_operation and is_get_method and params['filters']
+
+
+def main():
+ fields = dict(
+ operation=dict(type='str', required=True),
+ data=dict(type='dict'),
+ query_params=dict(type='dict'),
+ path_params=dict(type='dict'),
+ register_as=dict(type='str'),
+ filters=dict(type='dict')
+ )
+ module = AnsibleModule(argument_spec=fields,
+ supports_check_mode=True)
+ params = module.params
+
+ connection = Connection(module._socket_path)
+
+ op_name = params['operation']
+ op_spec = connection.get_operation_spec(op_name)
+ if op_spec is None:
+ module.fail_json(msg='Invalid operation name provided: %s' % op_name)
+
+ data, query_params, path_params = params['data'], params['query_params'], params['path_params']
+
+ try:
+ validate_params(connection, op_name, query_params, path_params, data, op_spec)
+ except ValidationError as e:
+ module.fail_json(msg=e.args[0])
+
+ try:
+ if module.check_mode:
+ module.exit_json(changed=False)
+
+ resource = BaseConfigurationResource(connection)
+ url = op_spec[OperationField.URL]
+
+ if is_add_operation(op_name, op_spec):
+ resp = resource.add_object(url, data, path_params, query_params)
+ elif is_edit_operation(op_name, op_spec):
+ resp = resource.edit_object(url, data, path_params, query_params)
+ elif is_delete_operation(op_name, op_spec):
+ resp = resource.delete_object(url, path_params)
+ elif is_find_by_filter_operation(op_name, op_spec, params):
+ resp = resource.get_objects_by_filter(url, params['filters'], path_params,
+ query_params)
+ else:
+ resp = resource.send_request(url, op_spec[OperationField.METHOD], data,
+ path_params,
+ query_params)
+
+ module.exit_json(changed=resource.config_changed, response=resp,
+ ansible_facts=construct_ansible_facts(resp, module.params))
+ except FtdConfigurationError as e:
+ module.fail_json(msg='Failed to execute %s operation because of the configuration error: %s' % (op_name, e))
+ except FtdServerError as e:
+ module.fail_json(msg='Server returned an error trying to execute %s operation. Status code: %s. '
+ 'Server response: %s' % (op_name, e.code, e.response))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/lib/ansible/modules/network/ftd/ftd_file_download.py b/lib/ansible/modules/network/ftd/ftd_file_download.py
new file mode 100644
index 00000000000..aafccb2b0de
--- /dev/null
+++ b/lib/ansible/modules/network/ftd/ftd_file_download.py
@@ -0,0 +1,128 @@
+#!/usr/bin/python
+
+# Copyright (c) 2018 Cisco and/or its affiliates.
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+#
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+
+ANSIBLE_METADATA = {'metadata_version': '1.1',
+ 'status': ['preview'],
+ 'supported_by': 'network'}
+
+DOCUMENTATION = """
+---
+module: ftd_file_download
+short_description: Downloads files from Cisco FTD devices over HTTP(S)
+description:
+ - Downloads files from Cisco FTD devices including pending changes, disk files, certificates,
+ troubleshoot reports, and backups.
+version_added: "2.7"
+author: "Cisco Systems, Inc."
+options:
+ operation:
+ description:
+ - The name of the operation to execute.
+ - Only operations that return a file can be used in this module.
+ required: true
+ path_params:
+ description:
+ - Key-value pairs that should be sent as path parameters in a REST API call.
+ destination:
+ description:
+ - Absolute path of where to download the file to.
+ - If destination is a directory, the module uses a filename from 'Content-Disposition' header specified by the server.
+ required: true
+"""
+
+EXAMPLES = """
+- name: Download pending changes
+ ftd_file_download:
+ operation: 'getdownload'
+ path_params:
+ objId: 'default'
+ destination: /tmp/
+"""
+
+RETURN = """
+msg:
+ description: the error message describing why the module failed
+ returned: error
+ type: string
+"""
+from ansible.module_utils.basic import AnsibleModule
+from ansible.module_utils.connection import Connection
+from ansible.module_utils.network.ftd.common import FtdServerError, HTTPMethod
+from ansible.module_utils.network.ftd.fdm_swagger_client import OperationField, ValidationError, FILE_MODEL_NAME
+
+
+def is_download_operation(op_spec):
+ return op_spec[OperationField.METHOD] == HTTPMethod.GET and op_spec[OperationField.MODEL_NAME] == FILE_MODEL_NAME
+
+
+def validate_params(connection, op_name, path_params):
+ field_name = 'Invalid path_params provided'
+ try:
+ is_valid, validation_report = connection.validate_path_params(op_name, path_params)
+ if not is_valid:
+ raise ValidationError({
+ field_name: validation_report
+ })
+ except Exception as e:
+ raise ValidationError({
+ field_name: str(e)
+ })
+
+
+def main():
+ fields = dict(
+ operation=dict(type='str', required=True),
+ path_params=dict(type='dict'),
+ destination=dict(type='path', required=True)
+ )
+ module = AnsibleModule(argument_spec=fields,
+ supports_check_mode=True)
+ params = module.params
+ connection = Connection(module._socket_path)
+
+ op_name = params['operation']
+ op_spec = connection.get_operation_spec(op_name)
+ if op_spec is None:
+ module.fail_json(msg='Operation with specified name is not found: %s' % op_name)
+ if not is_download_operation(op_spec):
+ module.fail_json(
+ msg='Invalid download operation: %s. The operation must make GET request and return a file.' %
+ op_name)
+
+ try:
+ path_params = params['path_params']
+ validate_params(connection, op_name, path_params)
+ if module.check_mode:
+ module.exit_json(changed=False)
+ connection.download_file(op_spec[OperationField.URL], params['destination'], path_params)
+ module.exit_json(changed=False)
+ except FtdServerError as e:
+ module.fail_json(msg='Download request for %s operation failed. Status code: %s. '
+ 'Server response: %s' % (op_name, e.code, e.response))
+ except ValidationError as e:
+ module.fail_json(msg=e.args[0])
+
+
+if __name__ == '__main__':
+ main()
diff --git a/lib/ansible/modules/network/ftd/ftd_file_upload.py b/lib/ansible/modules/network/ftd/ftd_file_upload.py
new file mode 100644
index 00000000000..a1187dc12d0
--- /dev/null
+++ b/lib/ansible/modules/network/ftd/ftd_file_upload.py
@@ -0,0 +1,105 @@
+#!/usr/bin/python
+
+# Copyright (c) 2018 Cisco and/or its affiliates.
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+#
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+
+ANSIBLE_METADATA = {'metadata_version': '1.1',
+ 'status': ['preview'],
+ 'supported_by': 'network'}
+
+DOCUMENTATION = """
+---
+module: ftd_file_upload
+short_description: Uploads files to Cisco FTD devices over HTTP(S)
+description:
+ - Uploads files to Cisco FTD devices including disk files, backups, and upgrades.
+version_added: "2.7"
+author: "Cisco Systems, Inc."
+options:
+ operation:
+ description:
+ - The name of the operation to execute.
+ - Only operations that upload file can be used in this module.
+ required: true
+ fileToUpload:
+ description:
+ - Absolute path to the file that should be uploaded.
+ required: true
+ register_as:
+ description:
+ - Specifies Ansible fact name that is used to register received response from the FTD device.
+"""
+
+EXAMPLES = """
+- name: Upload disk file
+ ftd_file_upload:
+ operation: 'postuploaddiskfile'
+ fileToUpload: /tmp/test1.txt
+"""
+
+RETURN = """
+msg:
+ description: the error message describing why the module failed
+ returned: error
+ type: string
+"""
+from ansible.module_utils.basic import AnsibleModule
+from ansible.module_utils.connection import Connection
+from ansible.module_utils.network.ftd.common import construct_ansible_facts, FtdServerError, HTTPMethod
+from ansible.module_utils.network.ftd.fdm_swagger_client import OperationField
+
+
+def is_upload_operation(op_spec):
+ return op_spec[OperationField.METHOD] == HTTPMethod.POST or 'UploadStatus' in op_spec[OperationField.MODEL_NAME]
+
+
+def main():
+ fields = dict(
+ operation=dict(type='str', required=True),
+ fileToUpload=dict(type='path', required=True),
+ register_as=dict(type='str'),
+ )
+ module = AnsibleModule(argument_spec=fields,
+ supports_check_mode=True)
+ params = module.params
+ connection = Connection(module._socket_path)
+
+ op_spec = connection.get_operation_spec(params['operation'])
+ if op_spec is None:
+ module.fail_json(msg='Operation with specified name is not found: %s' % params['operation'])
+ if not is_upload_operation(op_spec):
+ module.fail_json(
+ msg='Invalid upload operation: %s. The operation must make POST request and return UploadStatus model.' %
+ params['operation'])
+
+ try:
+ if module.check_mode:
+ module.exit_json()
+ resp = connection.upload_file(params['fileToUpload'], op_spec[OperationField.URL])
+ module.exit_json(changed=True, response=resp, ansible_facts=construct_ansible_facts(resp, module.params))
+ except FtdServerError as e:
+ module.fail_json(msg='Upload request for %s operation failed. Status code: %s. '
+ 'Server response: %s' % (params['operation'], e.code, e.response))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/lib/ansible/plugins/httpapi/ftd.py b/lib/ansible/plugins/httpapi/ftd.py
index f88c5e19acc..b0a623aeeee 100644
--- a/lib/ansible/plugins/httpapi/ftd.py
+++ b/lib/ansible/plugins/httpapi/ftd.py
@@ -1,36 +1,47 @@
-# Copyright Ansible Project
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+# Copyright (c) 2018 Cisco and/or its affiliates.
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+#
from __future__ import (absolute_import, division, print_function)
+
__metaclass__ = type
import json
import os
import re
-import shutil
-from ansible.module_utils._text import to_text
+from ansible.module_utils.basic import to_text
+from ansible.errors import AnsibleConnectionFailure
+from ansible.module_utils.network.ftd.fdm_swagger_client import FdmSwaggerParser, SpecProp, FdmSwaggerValidator
+from ansible.module_utils.network.ftd.common import HTTPMethod, ResponseParams
from ansible.module_utils.six.moves.urllib.error import HTTPError
from ansible.module_utils.six.moves.urllib.parse import urlencode
from ansible.plugins.httpapi import HttpApiBase
-from ansible.module_utils.six import wraps
from urllib3 import encode_multipart_formdata
from urllib3.fields import RequestField
from ansible.module_utils.connection import ConnectionError
-from ansible.errors import AnsibleConnectionFailure
-
-try:
- from __main__ import display
-except ImportError:
- from ansible.utils.display import Display
- display = Display()
BASE_HEADERS = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
-API_PREFIX = "/api/fdm/v2"
-API_TOKEN_PATH = "/fdm/token"
+API_TOKEN_PATH_ENV_VAR = 'FTD_API_TOKEN_PATH'
+DEFAULT_API_TOKEN_PATH = '/api/fdm/v2/fdm/token'
+API_SPEC_PATH = '/apispec/ngfw.json'
TOKEN_EXPIRATION_STATUS_CODE = 408
UNAUTHORIZED_STATUS_CODE = 401
@@ -39,54 +50,82 @@ UNAUTHORIZED_STATUS_CODE = 401
class HttpApi(HttpApiBase):
def __init__(self, connection):
self.connection = connection
- self.access_token = False
- self.refresh_token = False
+ self.access_token = None
+ self.refresh_token = None
+ self._api_spec = None
+ self._api_validator = None
- def login(self, username=None, password=None):
- # Clean any old auth if present in connection plugin
- self.connection._auth = None
+ def login(self, username, password):
+ def request_token_payload(username, password):
+ return {
+ 'grant_type': 'password',
+ 'username': username,
+ 'password': password
+ }
+
+ def refresh_token_payload(refresh_token):
+ return {
+ 'grant_type': 'refresh_token',
+ 'refresh_token': refresh_token
+ }
if self.refresh_token:
- payload = {
- 'grant_type': 'refresh_token',
- 'refresh_token': self.refresh_token
- }
+ payload = refresh_token_payload(self.refresh_token)
+ elif username and password:
+ payload = request_token_payload(username, password)
else:
- if username and password:
- payload = {
- 'grant_type': 'password',
- 'username': username,
- 'password': password
- }
- else:
- raise AnsibleConnectionFailure(
- 'username and password are required for login'
- 'in absence of refresh token'
- )
- response, response_data = self.connection.send(
- API_PREFIX + API_TOKEN_PATH,
- json.dumps(payload), method='POST', headers=BASE_HEADERS
+ raise AnsibleConnectionFailure('Username and password are required for login in absence of refresh token')
+
+ dummy, response_data = self.connection.send(
+ self._get_api_token_path(), json.dumps(payload), method=HTTPMethod.POST, headers=BASE_HEADERS
)
+ response = self._response_to_json(response_data.getvalue())
+
try:
- self._set_token_info(response_data)
- except ValueError as vexc:
- raise ConnectionError('Did not receive access_token during Auth got'
- '{0}'.format(to_text(vexc)))
+ self.refresh_token = response['refresh_token']
+ self.access_token = response['access_token']
+ except KeyError:
+ raise ConnectionError(
+ 'Server returned response without token info during connection authentication: %s' % response)
+
+ def logout(self):
+ auth_payload = {
+ 'grant_type': 'revoke_token',
+ 'access_token': self.access_token,
+ 'token_to_revoke': self.refresh_token
+ }
+ self.connection.send(
+ self._get_api_token_path(), json.dumps(auth_payload), method=HTTPMethod.POST,
+ headers=self._authorized_headers()
+ )
+ self.refresh_token = None
+ self.access_token = None
+
+ def update_auth(self, response, response_data):
+ # With tokens, authentication should not be checked and updated on each request
+ return None
def send_request(self, url_path, http_method, body_params=None, path_params=None, query_params=None):
url = construct_url_path(url_path, path_params, query_params)
data = json.dumps(body_params) if body_params else None
-
- response, response_data = self.connection.send(
- url, data, method=http_method,
- headers=self._authorized_headers()
- )
try:
- ret = json.loads(to_text(response_data.getvalue()))
- except:
- raise ConnectionError('Response was not valid JSON, got {0}'
- .format(response_data.getvalue()))
- return ret
+ response, response_data = self.connection.send(
+ url, data, method=http_method,
+ headers=self._authorized_headers()
+ )
+ return {
+ ResponseParams.SUCCESS: True,
+ ResponseParams.STATUS_CODE: response.getcode(),
+ ResponseParams.RESPONSE: self._response_to_json(response_data.getvalue())
+ }
+ # Being invoked via JSON-RPC, this method does not serialize and pass HTTPError correctly to the method caller.
+ # Thus, in order to handle non-200 responses, we need to wrap them into a simple structure and pass explicitly.
+ except HTTPError as e:
+ return {
+ ResponseParams.SUCCESS: False,
+ ResponseParams.STATUS_CODE: e.code,
+ ResponseParams.RESPONSE: self._response_to_json(e.read())
+ }
def upload_file(self, from_path, to_url):
url = construct_url_path(to_url)
@@ -94,83 +133,89 @@ class HttpApi(HttpApiBase):
rf = RequestField('fileToUpload', src_file.read(), os.path.basename(src_file.name))
rf.make_multipart()
body, content_type = encode_multipart_formdata([rf])
+
headers = self._authorized_headers()
headers['Content-Type'] = content_type
headers['Content-Length'] = len(body)
- response, response_data = self.connection.send(
- url, data=body, method='POST', headers=headers
- )
- try:
- ret = json.loads(to_text(response_data.getvalue()))
- except:
- raise ConnectionError('Response was not valid JSON, got {0}'
- .format(response_data.getvalue()))
- return ret
- def download_file(self, from_url, to_path):
- url = construct_url_path(from_url)
+ dummy, response_data = self.connection.send(url, data=body, method=HTTPMethod.POST, headers=headers)
+ return self._response_to_json(response_data.getvalue())
+
+ def download_file(self, from_url, to_path, path_params=None):
+ url = construct_url_path(from_url, path_params=path_params)
response, response_data = self.connection.send(
- url, data=None, method='GET',
+ url, data=None, method=HTTPMethod.GET,
headers=self._authorized_headers()
)
+
if os.path.isdir(to_path):
filename = extract_filename_from_headers(response.info())
to_path = os.path.join(to_path, filename)
with open(to_path, "wb") as output_file:
- output_file.write(to_text(response_data.getvalue()))
-
- def update_auth(self, response, response_data):
- return None
-
- def _set_token_info(self, response_data):
- try:
- token_info = json.loads(to_text(response_data.getvalue()))
- except ValueError:
- raise
- if 'refresh_token' in token_info:
- self.refresh_token = token_info['refresh_token']
- if 'access_token' in token_info:
- self.access_token = token_info['access_token']
+ output_file.write(response_data.getvalue())
def handle_httperror(self, exc):
- # Called by connection plugin when it gets HTTP Error for a request.
- # Connection plugin will resend this request if we return true here.
- if (exc.code == TOKEN_EXPIRATION_STATUS_CODE or
- exc.code == UNAUTHORIZED_STATUS_CODE):
- # Stored auth appears to be invalid, clear and retry
+ if exc.code == TOKEN_EXPIRATION_STATUS_CODE or exc.code == UNAUTHORIZED_STATUS_CODE:
self.connection._auth = None
- self.login(self.connection.get_option('remote_user'),
- self.connection.get_option('password'))
+ self.login(self.connection.get_option('remote_user'), self.connection.get_option('password'))
return True
-
- return False
+ # None means that the exception will be passed further to the caller
+ return None
def _authorized_headers(self):
headers = dict(BASE_HEADERS)
headers['Authorization'] = 'Bearer %s' % self.access_token
return headers
- def logout(self):
- # Revoke the tokens
- auth_payload = {
- 'grant_type': 'revoke_token',
- 'access_token': self.access_token,
- 'token_to_revoke': self.refresh_token
- }
- self.connection.send(
- API_PREFIX + API_TOKEN_PATH, json.dumps(auth_payload),
- method='POST', headers=self._authorized_headers()
- )
- # HTTP error would cause exception Connection failure in connection
- # plugin
- self.refresh_token = False
- self.access_token = False
- display.vvvv("logged out successfully")
+ @staticmethod
+ def _get_api_token_path():
+ return os.environ.get(API_TOKEN_PATH_ENV_VAR, DEFAULT_API_TOKEN_PATH)
+
+ @staticmethod
+ def _response_to_json(response_data):
+ response_text = to_text(response_data)
+ try:
+ return json.loads(response_text) if response_text else {}
+ # JSONDecodeError only available on Python 3.5+
+ except getattr(json.decoder, 'JSONDecodeError', ValueError):
+ raise ConnectionError('Invalid JSON response: %s' % response_text)
+
+ def get_operation_spec(self, operation_name):
+ return self.api_spec[SpecProp.OPERATIONS].get(operation_name, None)
+
+ def get_model_spec(self, model_name):
+ return self.api_spec[SpecProp.MODELS].get(model_name, None)
+
+ def validate_data(self, operation_name, data):
+ return self.api_validator.validate_data(operation_name, data)
+
+ def validate_query_params(self, operation_name, params):
+ return self.api_validator.validate_query_params(operation_name, params)
+
+ def validate_path_params(self, operation_name, params):
+ return self.api_validator.validate_path_params(operation_name, params)
+
+ @property
+ def api_spec(self):
+ if self._api_spec is None:
+ response = self.send_request(url_path=API_SPEC_PATH, http_method=HTTPMethod.GET)
+ if response[ResponseParams.SUCCESS]:
+ self._api_spec = FdmSwaggerParser().parse_spec(response[ResponseParams.RESPONSE])
+ else:
+ raise ConnectionError('Failed to download API specification. Status code: %s. Response: %s' % (
+ response[ResponseParams.STATUS_CODE], response[ResponseParams.RESPONSE]))
+ return self._api_spec
+
+ @property
+ def api_validator(self):
+ if self._api_validator is None:
+ self._api_validator = FdmSwaggerValidator(self.api_spec)
+ return self._api_validator
def construct_url_path(path, path_params=None, query_params=None):
- url = API_PREFIX + path
+ url = path
if path_params:
url = url.format(**path_params)
if query_params:
diff --git a/test/units/module_utils/network/ftd/__init__.py b/test/units/module_utils/network/ftd/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/test/units/module_utils/network/ftd/test_common.py b/test/units/module_utils/network/ftd/test_common.py
new file mode 100644
index 00000000000..b3b609993d9
--- /dev/null
+++ b/test/units/module_utils/network/ftd/test_common.py
@@ -0,0 +1,241 @@
+# Copyright (c) 2018 Cisco and/or its affiliates.
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+#
+
+from ansible.module_utils.network.ftd.common import equal_objects
+
+
+# simple objects
+
+def test_equal_objects_return_false_with_different_length():
+ assert not equal_objects(
+ {'foo': 1},
+ {'foo': 1, 'bar': 2}
+ )
+
+
+def test_equal_objects_return_false_with_different_fields():
+ assert not equal_objects(
+ {'foo': 1},
+ {'bar': 1}
+ )
+
+
+def test_equal_objects_return_false_with_different_value_types():
+ assert not equal_objects(
+ {'foo': 1},
+ {'foo': '1'}
+ )
+
+
+def test_equal_objects_return_false_with_different_values():
+ assert not equal_objects(
+ {'foo': 1},
+ {'foo': 2}
+ )
+
+
+def test_equal_objects_return_false_with_different_nested_values():
+ assert not equal_objects(
+ {'foo': {'bar': 1}},
+ {'foo': {'bar': 2}}
+ )
+
+
+def test_equal_objects_return_false_with_different_list_length():
+ assert not equal_objects(
+ {'foo': []},
+ {'foo': ['bar']}
+ )
+
+
+def test_equal_objects_return_true_with_equal_objects():
+ assert equal_objects(
+ {'foo': 1, 'bar': 2},
+ {'bar': 2, 'foo': 1}
+ )
+
+
+def test_equal_objects_return_true_with_equal_nested_dicts():
+ assert equal_objects(
+ {'foo': {'bar': 1, 'buz': 2}},
+ {'foo': {'buz': 2, 'bar': 1}}
+ )
+
+
+def test_equal_objects_return_true_with_equal_lists():
+ assert equal_objects(
+ {'foo': ['bar']},
+ {'foo': ['bar']}
+ )
+
+
+def test_equal_objects_return_true_with_ignored_fields():
+ assert equal_objects(
+ {'foo': 1, 'version': '123', 'id': '123123'},
+ {'foo': 1}
+ )
+
+
+# objects with object references
+
+def test_equal_objects_return_true_with_different_ref_ids():
+ assert not equal_objects(
+ {'foo': {'id': '1', 'type': 'network', 'ignored_field': 'foo'}},
+ {'foo': {'id': '2', 'type': 'network', 'ignored_field': 'bar'}}
+ )
+
+
+def test_equal_objects_return_true_with_different_ref_types():
+ assert not equal_objects(
+ {'foo': {'id': '1', 'type': 'network', 'ignored_field': 'foo'}},
+ {'foo': {'id': '1', 'type': 'accessRule', 'ignored_field': 'bar'}}
+ )
+
+
+def test_equal_objects_return_true_with_same_object_refs():
+ assert equal_objects(
+ {'foo': {'id': '1', 'type': 'network', 'ignored_field': 'foo'}},
+ {'foo': {'id': '1', 'type': 'network', 'ignored_field': 'bar'}}
+ )
+
+
+# objects with array of object references
+
+def test_equal_objects_return_false_with_different_array_length():
+ assert not equal_objects(
+ {'foo': [
+ {'id': '1', 'type': 'network', 'ignored_field': 'foo'}
+ ]},
+ {'foo': []}
+ )
+
+
+def test_equal_objects_return_false_with_different_array_order():
+ assert not equal_objects(
+ {'foo': [
+ {'id': '1', 'type': 'network', 'ignored_field': 'foo'},
+ {'id': '2', 'type': 'network', 'ignored_field': 'bar'}
+ ]},
+ {'foo': [
+ {'id': '2', 'type': 'network', 'ignored_field': 'foo'},
+ {'id': '1', 'type': 'network', 'ignored_field': 'bar'}
+ ]}
+ )
+
+
+def test_equal_objects_return_true_with_equal_ref_arrays():
+ assert equal_objects(
+ {'foo': [
+ {'id': '1', 'type': 'network', 'ignored_field': 'foo'}
+ ]},
+ {'foo': [
+ {'id': '1', 'type': 'network', 'ignored_field': 'bar'}
+ ]}
+ )
+
+
+# objects with nested structures and object references
+
+def test_equal_objects_return_true_with_equal_nested_object_references():
+ assert equal_objects(
+ {
+ 'name': 'foo',
+ 'config': {
+ 'version': '1',
+ 'port': {
+ 'name': 'oldPortName',
+ 'type': 'port',
+ 'id': '123'
+ }
+ }
+ },
+ {
+ 'name': 'foo',
+ 'config': {
+ 'version': '1',
+ 'port': {
+ 'name': 'newPortName',
+ 'type': 'port',
+ 'id': '123'
+ }
+ }
+ }
+ )
+
+
+def test_equal_objects_return_false_with_different_nested_object_references():
+ assert not equal_objects(
+ {
+ 'name': 'foo',
+ 'config': {
+ 'version': '1',
+ 'port': {
+ 'name': 'oldPortName',
+ 'type': 'port',
+ 'id': '123'
+ }
+ }
+ },
+ {
+ 'name': 'foo',
+ 'config': {
+ 'version': '1',
+ 'port': {
+ 'name': 'oldPortName',
+ 'type': 'port',
+ 'id': '234'
+ }
+ }
+ }
+ )
+
+
+def test_equal_objects_return_true_with_equal_nested_list_of_object_references():
+ assert equal_objects(
+ {
+ 'name': 'foo',
+ 'config': {
+ 'version': '1',
+ 'ports': [{
+ 'name': 'oldPortName',
+ 'type': 'port',
+ 'id': '123'
+ }, {
+ 'name': 'oldPortName2',
+ 'type': 'port',
+ 'id': '234'
+ }]
+ }
+ },
+ {
+ 'name': 'foo',
+ 'config': {
+ 'version': '1',
+ 'ports': [{
+ 'name': 'newPortName',
+ 'type': 'port',
+ 'id': '123'
+ }, {
+ 'name': 'newPortName2',
+ 'type': 'port',
+ 'id': '234',
+ 'extraField': 'foo'
+ }]
+ }
+ }
+ )
diff --git a/test/units/module_utils/network/ftd/test_configuration.py b/test/units/module_utils/network/ftd/test_configuration.py
new file mode 100644
index 00000000000..ff6f475e23d
--- /dev/null
+++ b/test/units/module_utils/network/ftd/test_configuration.py
@@ -0,0 +1,147 @@
+# Copyright (c) 2018 Cisco and/or its affiliates.
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+#
+
+from ansible.compat.tests import mock
+from ansible.compat.tests.mock import call, patch
+from ansible.module_utils.network.ftd.configuration import iterate_over_pageable_resource, BaseConfigurationResource
+
+
+class TestBaseConfigurationResource(object):
+
+ @patch.object(BaseConfigurationResource, 'send_request')
+ def test_get_objects_by_filter_with_multiple_filters(self, send_request_mock):
+ objects = [
+ {'name': 'obj1', 'type': 1, 'foo': {'bar': 'buzz'}},
+ {'name': 'obj2', 'type': 1, 'foo': {'bar': 'buz'}},
+ {'name': 'obj3', 'type': 2, 'foo': {'bar': 'buzz'}}
+ ]
+ resource = BaseConfigurationResource(None)
+
+ send_request_mock.side_effect = [{'items': objects}, {'items': []}]
+ assert objects == resource.get_objects_by_filter('/objects', {})
+
+ send_request_mock.side_effect = [{'items': objects}, {'items': []}]
+ assert [objects[0]] == resource.get_objects_by_filter('/objects', {'name': 'obj1'})
+
+ send_request_mock.side_effect = [{'items': objects}, {'items': []}]
+ assert [objects[1]] == resource.get_objects_by_filter('/objects',
+ {'type': 1, 'foo': {'bar': 'buz'}})
+
+ @patch.object(BaseConfigurationResource, 'send_request')
+ def test_get_objects_by_filter_with_multiple_responses(self, send_request_mock):
+ send_request_mock.side_effect = [
+ {'items': [
+ {'name': 'obj1', 'type': 'foo'},
+ {'name': 'obj2', 'type': 'bar'}
+ ]},
+ {'items': [
+ {'name': 'obj3', 'type': 'foo'}
+ ]},
+ {'items': []}
+ ]
+
+ resource = BaseConfigurationResource(None)
+
+ assert [{'name': 'obj1', 'type': 'foo'}, {'name': 'obj3', 'type': 'foo'}] == resource.get_objects_by_filter(
+ '/objects', {'type': 'foo'})
+
+
+class TestIterateOverPageableResource(object):
+
+ def test_iterate_over_pageable_resource_with_no_items(self):
+ resource_func = mock.Mock(return_value={'items': []})
+
+ items = iterate_over_pageable_resource(resource_func)
+
+ assert [] == list(items)
+
+ def test_iterate_over_pageable_resource_with_one_page(self):
+ resource_func = mock.Mock(side_effect=[
+ {'items': ['foo', 'bar']},
+ {'items': []},
+ ])
+
+ items = iterate_over_pageable_resource(resource_func)
+
+ assert ['foo', 'bar'] == list(items)
+ resource_func.assert_has_calls([
+ call(query_params={'offset': 0, 'limit': 10}),
+ call(query_params={'offset': 10, 'limit': 10})
+ ])
+
+ def test_iterate_over_pageable_resource_with_multiple_pages(self):
+ resource_func = mock.Mock(side_effect=[
+ {'items': ['foo']},
+ {'items': ['bar']},
+ {'items': ['buzz']},
+ {'items': []},
+ ])
+
+ items = iterate_over_pageable_resource(resource_func)
+
+ assert ['foo', 'bar', 'buzz'] == list(items)
+
+ def test_iterate_over_pageable_resource_should_preserve_query_params(self):
+ resource_func = mock.Mock(return_value={'items': []})
+
+ items = iterate_over_pageable_resource(resource_func, {'filter': 'name:123'})
+
+ assert [] == list(items)
+ resource_func.assert_called_once_with(query_params={'filter': 'name:123', 'offset': 0, 'limit': 10})
+
+ def test_iterate_over_pageable_resource_should_preserve_limit(self):
+ resource_func = mock.Mock(side_effect=[
+ {'items': ['foo']},
+ {'items': []},
+ ])
+
+ items = iterate_over_pageable_resource(resource_func, {'limit': 1})
+
+ assert ['foo'] == list(items)
+ resource_func.assert_has_calls([
+ call(query_params={'offset': 0, 'limit': 1}),
+ call(query_params={'offset': 1, 'limit': 1})
+ ])
+
+ def test_iterate_over_pageable_resource_should_preserve_offset(self):
+ resource_func = mock.Mock(side_effect=[
+ {'items': ['foo']},
+ {'items': []},
+ ])
+
+ items = iterate_over_pageable_resource(resource_func, {'offset': 3})
+
+ assert ['foo'] == list(items)
+ resource_func.assert_has_calls([
+ call(query_params={'offset': 3, 'limit': 10}),
+ call(query_params={'offset': 13, 'limit': 10})
+ ])
+
+ def test_iterate_over_pageable_resource_should_pass_with_string_offset_and_limit(self):
+ resource_func = mock.Mock(side_effect=[
+ {'items': ['foo']},
+ {'items': []},
+ ])
+
+ items = iterate_over_pageable_resource(resource_func, {'offset': '1', 'limit': '1'})
+
+ assert ['foo'] == list(items)
+ resource_func.assert_has_calls([
+ call(query_params={'offset': '1', 'limit': '1'}),
+ call(query_params={'offset': 2, 'limit': '1'})
+ ])
diff --git a/test/units/module_utils/network/ftd/test_fdm_swagger_parser.py b/test/units/module_utils/network/ftd/test_fdm_swagger_parser.py
new file mode 100644
index 00000000000..70b2eac63dd
--- /dev/null
+++ b/test/units/module_utils/network/ftd/test_fdm_swagger_parser.py
@@ -0,0 +1,196 @@
+# Copyright (c) 2018 Cisco and/or its affiliates.
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+#
+
+import copy
+import os
+import unittest
+
+from ansible.module_utils.network.ftd.common import HTTPMethod
+from ansible.module_utils.network.ftd.fdm_swagger_client import FdmSwaggerParser
+
+DIR_PATH = os.path.dirname(os.path.realpath(__file__))
+TEST_DATA_FOLDER = os.path.join(DIR_PATH, 'test_data')
+
+base = {
+ 'basePath': "/api/fdm/v2",
+ 'definitions': {"NetworkObject": {"type": "object",
+ "properties": {"version": {"type": "string"}, "name": {"type": "string"},
+ "description": {"type": "string"},
+ "subType": {"type": "object",
+ "$ref": "#/definitions/NetworkObjectType"},
+ "value": {"type": "string"},
+ "isSystemDefined": {"type": "boolean"},
+ "dnsResolution": {"type": "object",
+ "$ref": "#/definitions/FQDNDNSResolution"},
+ "id": {"type": "string"},
+ "type": {"type": "string", "default": "networkobject"}},
+ "required": ["subType", "type", "value"]},
+ "NetworkObjectWrapper": {
+ "allOf": [{"$ref": "#/definitions/NetworkObject"}, {"$ref": "#/definitions/LinksWrapper"}]}
+ },
+ 'paths': {
+ "/object/networks": {
+ "get": {"tags": ["NetworkObject"], "operationId": "getNetworkObjectList",
+ "responses": {"200": {"description": "", "schema": {"type": "object",
+ "title": "NetworkObjectList",
+ "properties": {"items": {
+ "type": "array",
+ "items": {
+ "$ref": "#/definitions/NetworkObjectWrapper"}},
+ "paging": {
+ "$ref": "#/definitions/Paging"}},
+ "required": ["items",
+ "paging"]}}},
+ "parameters": [
+ {"name": "offset", "in": "query", "required": False, "type": "integer"},
+ {"name": "limit", "in": "query", "required": False, "type": "integer"},
+ {"name": "sort", "in": "query", "required": False, "type": "string"},
+ {"name": "filter", "in": "query", "required": False, "type": "string"}]},
+ "post": {"tags": ["NetworkObject"], "operationId": "addNetworkObject",
+ "responses": {
+ "200": {"description": "",
+ "schema": {"type": "object",
+ "$ref": "#/definitions/NetworkObjectWrapper"}},
+ "422": {"description": "",
+ "schema": {"type": "object", "$ref": "#/definitions/ErrorWrapper"}}},
+ "parameters": [{"in": "body", "name": "body",
+ "required": True,
+ "schema": {"$ref": "#/definitions/NetworkObject"}}]}
+ },
+ "/object/networks/{objId}": {
+ "get": {"tags": ["NetworkObject"], "operationId": "getNetworkObject",
+ "responses": {"200": {"description": "",
+ "schema": {"type": "object",
+ "$ref": "#/definitions/NetworkObjectWrapper"}},
+ "404": {"description": "",
+ "schema": {"type": "object",
+ "$ref": "#/definitions/ErrorWrapper"}}},
+ "parameters": [{"name": "objId", "in": "path", "required": True,
+ "type": "string"}]},
+
+ "put": {"tags": ["NetworkObject"], "operationId": "editNetworkObject",
+ "responses": {"200": {"description": "",
+ "schema": {"type": "object",
+ "$ref": "#/definitions/NetworkObjectWrapper"}},
+ "422": {"description": "",
+ "schema": {"type": "object",
+ "$ref": "#/definitions/ErrorWrapper"}}},
+ "parameters": [{"name": "objId", "in": "path", "required": True,
+ "type": "string"},
+ {"in": "body", "name": "body", "required": True,
+ "schema": {"$ref": "#/definitions/NetworkObject"}}]},
+ "delete": {"tags": ["NetworkObject"], "operationId": "deleteNetworkObject",
+ "responses": {"204": {"description": ""},
+ "422": {"description": "",
+ "schema": {"type": "object",
+ "$ref": "#/definitions/ErrorWrapper"}}},
+ "parameters": [{"name": "objId", "in": "path", "required": True,
+ "type": "string"}]}}}
+}
+
+
+def _get_objects(base_object, key_names):
+ return dict((_key, base_object[_key]) for _key in key_names)
+
+
+class TestFdmSwaggerParser(unittest.TestCase):
+
+ def test_simple_object(self):
+ self._data = copy.deepcopy(base)
+
+ self.fdm_data = FdmSwaggerParser().parse_spec(self._data)
+
+ expected_operations = {
+ 'getNetworkObjectList': {
+ 'method': HTTPMethod.GET,
+ 'url': '/api/fdm/v2/object/networks',
+ 'modelName': 'NetworkObject',
+ 'parameters': {
+ 'path': {},
+ 'query': {
+ 'offset': {
+ 'required': False,
+ 'type': 'integer'
+ },
+ 'limit': {
+ 'required': False,
+ 'type': 'integer'
+ },
+ 'sort': {
+ 'required': False,
+ 'type': 'string'
+ },
+ 'filter': {
+ 'required': False,
+ 'type': 'string'
+ }
+ }
+ }
+ },
+ 'addNetworkObject': {
+ 'method': HTTPMethod.POST,
+ 'url': '/api/fdm/v2/object/networks',
+ 'modelName': 'NetworkObject',
+ 'parameters': {'path': {},
+ 'query': {}}
+ },
+ 'getNetworkObject': {
+ 'method': HTTPMethod.GET,
+ 'url': '/api/fdm/v2/object/networks/{objId}',
+ 'modelName': 'NetworkObject',
+ 'parameters': {
+ 'path': {
+ 'objId': {
+ 'required': True,
+ 'type': "string"
+ }
+ },
+ 'query': {}
+ }
+ },
+ 'editNetworkObject': {
+ 'method': HTTPMethod.PUT,
+ 'url': '/api/fdm/v2/object/networks/{objId}',
+ 'modelName': 'NetworkObject',
+ 'parameters': {
+ 'path': {
+ 'objId': {
+ 'required': True,
+ 'type': "string"
+ }
+ },
+ 'query': {}
+ }
+ },
+ 'deleteNetworkObject': {
+ 'method': HTTPMethod.DELETE,
+ 'url': '/api/fdm/v2/object/networks/{objId}',
+ 'modelName': None,
+ 'parameters': {
+ 'path': {
+ 'objId': {
+ 'required': True,
+ 'type': "string"
+ }
+ },
+ 'query': {}
+ }
+ }
+ }
+ assert sorted(['NetworkObject', 'NetworkObjectWrapper']) == sorted(self.fdm_data['models'].keys())
+ assert expected_operations == self.fdm_data['operations']
diff --git a/test/units/module_utils/network/ftd/test_fdm_swagger_validator.py b/test/units/module_utils/network/ftd/test_fdm_swagger_validator.py
new file mode 100644
index 00000000000..e597bd94385
--- /dev/null
+++ b/test/units/module_utils/network/ftd/test_fdm_swagger_validator.py
@@ -0,0 +1,1082 @@
+# Copyright (c) 2018 Cisco and/or its affiliates.
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+#
+
+import os
+import unittest
+
+import pytest
+from ansible.module_utils.network.ftd.fdm_swagger_client import FdmSwaggerValidator, IllegalArgumentException
+
+DIR_PATH = os.path.dirname(os.path.realpath(__file__))
+TEST_DATA_FOLDER = os.path.join(DIR_PATH, 'test_data')
+
+mock_data = {
+ 'models': {
+ 'ReferenceModel': {'type': 'object', 'required': ['id', 'type'],
+ 'properties': {'id': {'type': 'string'}, 'type': {'type': 'string'},
+ 'version': {'type': 'string'}, 'name': {'type': 'string'}}},
+ 'FQDNDNSResolution': {'type': 'string', 'enum': ['IPV4_ONLY', 'IPV6_ONLY', 'IPV4_AND_IPV6']},
+ 'NetworkObjectType': {'type': 'string', 'enum': ['HOST', 'NETWORK', 'IPRANGE', 'FQDN']},
+ 'NetworkObject': {'type': 'object',
+ 'properties': {'version': {'type': 'string'},
+ 'name': {'type': 'string'},
+ 'description': {'type': 'string'},
+ 'subType': {'type': 'object',
+ '$ref': '#/definitions/NetworkObjectType'},
+ 'value': {'type': 'string'},
+ 'isSystemDefined': {'type': 'boolean'},
+ 'dnsResolution': {'type': 'object',
+ '$ref': '#/definitions/FQDNDNSResolution'},
+ 'objects': {'type': 'array',
+ 'items': {'type': 'object',
+ '$ref': '#/definitions/ReferenceModel'}},
+ 'id': {'type': 'string'},
+ 'type': {'type': 'string',
+ 'default': 'networkobject'}},
+ 'required': ['subType', 'type', 'value']}
+ },
+ 'operations': {
+ 'getNetworkObjectList': {
+ 'method': 'get',
+ 'url': '/api/fdm/v2/object/networks',
+ 'modelName': 'NetworkObject',
+ 'parameters': {
+ 'path': {
+ 'objId': {
+ 'required': True,
+ 'type': "string"
+ }
+ },
+ 'query': {
+ 'offset': {
+ 'required': False,
+ 'type': 'integer'
+ },
+ 'limit': {
+ 'required': True,
+ 'type': 'integer'
+ },
+ 'sort': {
+ 'required': False,
+ 'type': 'string'
+ },
+ 'filter': {
+ 'required': False,
+ 'type': 'string'
+ }
+ }
+ }
+ }
+ }
+}
+
+nested_mock_data1 = {
+ 'models': {
+ 'model1': {
+ 'type': 'object',
+ 'properties': {
+ 'f_string': {'type': 'string'},
+ 'f_number': {'type': 'number'},
+ 'f_boolean': {'type': 'boolean'},
+ 'f_integer': {'type': 'integer'}
+ },
+ 'required': ['f_string']
+ },
+ 'TestModel': {
+ 'type': 'object',
+ 'properties': {
+ 'nested_model': {'type': 'object',
+ '$ref': '#/definitions/model1'},
+ 'f_integer': {'type': 'integer'}
+ },
+ 'required': ['nested_model']
+ }
+ },
+ 'operations': {
+ 'getdata': {
+ 'modelName': 'TestModel'
+ }
+ }
+}
+
+
+def sort_validator_rez(data):
+ if 'required' in data:
+ data['required'] = sorted(data['required'])
+ if 'invalid_type' in data:
+ data['invalid_type'] = sorted(data['invalid_type'],
+ key=lambda k: '{0}{1}{2}'.format(k['path'], ['expected_type'],
+ ['actually_value']))
+
+ return data
+
+
+class TestFdmSwaggerValidator(unittest.TestCase):
+
+ @staticmethod
+ def check_illegal_argument_exception(cb, msg):
+ with pytest.raises(IllegalArgumentException) as ctx:
+ cb()
+ assert msg == str(ctx.value)
+
+ def test_path_params_valid(self):
+ self.url_data_valid(method='validate_path_params', parameters_type='path')
+
+ def test_query_params_valid(self):
+ self.url_data_valid(method='validate_query_params', parameters_type='query')
+
+ @staticmethod
+ def url_data_valid(method, parameters_type):
+ local_mock_spec = {
+ 'models': {},
+ 'operations': {
+ 'getNetwork': {
+ 'method': 'get',
+ 'parameters': {
+ parameters_type: {
+ 'objId': {
+ 'required': True,
+ 'type': "string"
+ },
+ 'p_integer': {
+ 'required': False,
+ 'type': "integer"
+ },
+ 'p_boolean': {
+ 'required': False,
+ 'type': "boolean"
+ },
+ 'p_number': {
+ 'required': False,
+ 'type': "number"
+ }
+ }
+ }
+ }
+ }
+ }
+ data = {
+ 'objId': "value1",
+ 'p_integer': 1,
+ 'p_boolean': True,
+ 'p_number': 2.3
+ }
+ validator = FdmSwaggerValidator(local_mock_spec)
+ valid, rez = getattr(validator, method)('getNetwork', data)
+ assert valid
+ assert rez is None
+
+ def test_path_params_required_fields(self):
+ self.url_data_required_fields(method='validate_path_params', parameters_type='path')
+
+ def test_query_params_required_fields(self):
+ self.url_data_required_fields(method='validate_query_params', parameters_type='query')
+
+ @staticmethod
+ def url_data_required_fields(method, parameters_type):
+ local_mock_spec = {
+ 'models': {},
+ 'operations': {
+ 'getNetwork': {
+ 'method': 'get',
+ 'parameters': {
+ parameters_type: {
+ 'objId': {
+ 'required': True,
+ 'type': "string"
+ },
+ 'parentId': {
+ 'required': True,
+ 'type': "string"
+ },
+ 'someParam': {
+ 'required': False,
+ 'type': "string"
+ },
+ 'p_integer': {
+ 'required': False,
+ 'type': "integer"
+ },
+ 'p_boolean': {
+ 'required': False,
+ 'type': "boolean"
+ },
+ 'p_number': {
+ 'required': False,
+ 'type': "number"
+ }
+ }
+ }
+ }
+ }
+ }
+ validator = FdmSwaggerValidator(local_mock_spec)
+ valid, rez = getattr(validator, method)('getNetwork', None)
+ assert not valid
+ assert sort_validator_rez({
+ 'required': ['objId', 'parentId']
+ }) == sort_validator_rez(rez)
+ valid, rez = getattr(validator, method)('getNetwork', {})
+ assert not valid
+ assert sort_validator_rez({
+ 'required': ['objId', 'parentId']
+ }) == sort_validator_rez(rez)
+ data = {
+ 'someParam': "test"
+ }
+ valid, rez = getattr(validator, method)('getNetwork', data)
+ assert not valid
+ assert sort_validator_rez({
+ 'required': ['objId', 'parentId']
+ }) == sort_validator_rez(rez)
+
+ def test_path_params_invalid_params(self):
+ self.url_params_invalid_params(method='validate_path_params', parameters_type='path')
+
+ def test_path_params_invalid_params(self):
+ self.url_params_invalid_params(method='validate_query_params', parameters_type='query')
+
+ @staticmethod
+ def url_params_invalid_params(method, parameters_type):
+ local_mock_spec = {
+ 'models': {},
+ 'operations': {
+ 'getNetwork': {
+ 'method': 'get',
+ 'parameters': {
+ parameters_type: {
+ 'objId': {
+ 'required': True,
+ 'type': "string"
+ },
+ 'parentId': {
+ 'required': True,
+ 'type': "string"
+ },
+ 'someParam': {
+ 'required': False,
+ 'type': "string"
+ },
+ 'p_integer': {
+ 'required': False,
+ 'type': "integer"
+ },
+ 'p_boolean': {
+ 'required': False,
+ 'type': "boolean"
+ },
+ 'p_number': {
+ 'required': False,
+ 'type': "number"
+ }
+ }
+ }
+ }
+ }
+ }
+ validator = FdmSwaggerValidator(local_mock_spec)
+ data = {
+ 'objId': 1,
+ 'parentId': True,
+ 'someParam': [],
+ 'p_integer': 1.2,
+ 'p_boolean': 0,
+ 'p_number': False
+ }
+ valid, rez = getattr(validator, method)('getNetwork', data)
+ assert not valid
+ assert sort_validator_rez({
+ 'invalid_type': [
+ {
+ 'path': 'objId',
+ 'expected_type': 'string',
+ 'actually_value': 1
+ },
+ {
+ 'path': 'parentId',
+ 'expected_type': 'string',
+ 'actually_value': True
+ },
+ {
+ 'path': 'someParam',
+ 'expected_type': 'string',
+ 'actually_value': []
+ },
+ {
+ 'path': 'p_integer',
+ 'expected_type': 'integer',
+ 'actually_value': 1.2
+ },
+ {
+ 'path': 'p_boolean',
+ 'expected_type': 'boolean',
+ 'actually_value': 0
+ },
+ {
+ 'path': 'p_number',
+ 'expected_type': 'number',
+ 'actually_value': False
+ }
+ ]
+ }) == sort_validator_rez(rez)
+ data = {
+ 'objId': {},
+ 'parentId': 0,
+ 'someParam': 1.2,
+ 'p_integer': True,
+ 'p_boolean': 1,
+ 'p_number': True
+ }
+ valid, rez = getattr(validator, method)('getNetwork', data)
+ assert not valid
+ assert sort_validator_rez({
+ 'invalid_type': [
+ {
+ 'path': 'objId',
+ 'expected_type': 'string',
+ 'actually_value': {}
+ },
+ {
+ 'path': 'parentId',
+ 'expected_type': 'string',
+ 'actually_value': 0
+ },
+ {
+ 'path': 'someParam',
+ 'expected_type': 'string',
+ 'actually_value': 1.2
+ },
+ {
+ 'path': 'p_integer',
+ 'expected_type': 'integer',
+ 'actually_value': True
+ },
+ {
+ 'path': 'p_boolean',
+ 'expected_type': 'boolean',
+ 'actually_value': 1
+ },
+ {
+ 'path': 'p_number',
+ 'expected_type': 'number',
+ 'actually_value': True
+ }
+ ]
+ }) == sort_validator_rez(rez)
+ data = {
+ 'objId': {},
+ 'parentId': 0,
+ 'someParam': 1.2,
+ 'p_integer': "1",
+ 'p_boolean': "",
+ 'p_number': "2"
+ }
+ valid, rez = getattr(validator, method)('getNetwork', data)
+ assert not valid
+ assert sort_validator_rez({
+ 'invalid_type': [
+ {
+ 'path': 'objId',
+ 'expected_type': 'string',
+ 'actually_value': {}
+ },
+ {
+ 'path': 'parentId',
+ 'expected_type': 'string',
+ 'actually_value': 0
+ },
+ {
+ 'path': 'someParam',
+ 'expected_type': 'string',
+ 'actually_value': 1.2
+ },
+ {
+ 'path': 'p_integer',
+ 'expected_type': 'integer',
+ 'actually_value': "1"
+ },
+ {
+ 'path': 'p_boolean',
+ 'expected_type': 'boolean',
+ 'actually_value': ""
+ },
+ {
+ 'path': 'p_number',
+ 'expected_type': 'number',
+ 'actually_value': "2"
+ }
+ ]
+ }) == sort_validator_rez(rez)
+
+ def test_validate_path_params_method_with_empty_data(self):
+ self.validate_url_data_with_empty_data(method='validate_path_params', parameters_type='path')
+
+ def test_validate_query_params_method_with_empty_data(self):
+ self.validate_url_data_with_empty_data(method='validate_query_params', parameters_type='query')
+
+ def validate_url_data_with_empty_data(self, method, parameters_type):
+ local_mock_spec = {
+ 'models': {},
+ 'operations': {
+ 'getNetwork': {
+ 'method': 'get',
+ 'parameters': {
+ parameters_type: {
+ 'objId': {
+ 'required': True,
+ 'type': "string"
+ }
+ }
+ }
+ }
+ }
+ }
+ validator = FdmSwaggerValidator(local_mock_spec)
+ valid, rez = getattr(validator, method)('getNetwork', None)
+ assert not valid
+ assert {'required': ['objId']} == rez
+
+ self.check_illegal_argument_exception(lambda: getattr(validator, method)('getNetwork', ''),
+ "The params parameter must be a dict")
+
+ self.check_illegal_argument_exception(lambda: getattr(validator, method)('getNetwork', []),
+ "The params parameter must be a dict")
+
+ valid, rez = getattr(validator, method)('getNetwork', {})
+ assert not valid
+ assert {'required': ['objId']} == rez
+
+ self.check_illegal_argument_exception(lambda: getattr(validator, method)(None, {'name': 'test'}),
+ "The operation_name parameter must be a non-empty string")
+
+ self.check_illegal_argument_exception(lambda: getattr(validator, method)('', {'name': 'test'}),
+ "The operation_name parameter must be a non-empty string")
+
+ self.check_illegal_argument_exception(lambda: getattr(validator, method)([], {'name': 'test'}),
+ "The operation_name parameter must be a non-empty string")
+
+ self.check_illegal_argument_exception(lambda: getattr(validator, method)({}, {'name': 'test'}),
+ "The operation_name parameter must be a non-empty string")
+
+ self.check_illegal_argument_exception(
+ lambda: getattr(validator, method)('operation_does_not_exist', {'name': 'test'}),
+ "operation_does_not_exist operation does not support")
+
+ def test_validate_data_method_with_empty_data(self):
+ validator = FdmSwaggerValidator(mock_data)
+ valid, rez = validator.validate_data('getNetworkObjectList', None)
+ assert not valid
+ assert sort_validator_rez({
+ 'required': ['subType', 'type', 'value']
+ }) == sort_validator_rez(rez)
+
+ self.check_illegal_argument_exception(lambda: validator.validate_data('getNetworkObjectList', ''),
+ "The data parameter must be a dict")
+
+ self.check_illegal_argument_exception(lambda: validator.validate_data('getNetworkObjectList', []),
+ "The data parameter must be a dict")
+ valid, rez = validator.validate_data('getNetworkObjectList', {})
+ assert not valid
+ assert sort_validator_rez({
+ 'required': ['subType', 'type', 'value']
+ }) == sort_validator_rez(rez)
+
+ self.check_illegal_argument_exception(lambda: validator.validate_data(None, {'name': 'test'}),
+ "The operation_name parameter must be a non-empty string")
+
+ self.check_illegal_argument_exception(lambda: validator.validate_data('', {'name': 'test'}),
+ "The operation_name parameter must be a non-empty string")
+
+ self.check_illegal_argument_exception(lambda: validator.validate_data([], {'name': 'test'}),
+ "The operation_name parameter must be a non-empty string")
+
+ self.check_illegal_argument_exception(lambda: validator.validate_data({}, {'name': 'test'}),
+ "The operation_name parameter must be a non-empty string")
+
+ self.check_illegal_argument_exception(
+ lambda: validator.validate_data('operation_does_not_exist', {'name': 'test'}),
+ "operation_does_not_exist operation does not support")
+
+ def test_errors_for_required_fields(self):
+ data = {
+ 'name': 'test'
+ }
+ valid, rez = FdmSwaggerValidator(mock_data).validate_data('getNetworkObjectList', data)
+ assert not valid
+ assert sort_validator_rez({
+ 'required': ['subType', 'type', 'value']
+ }) == sort_validator_rez(rez)
+
+ def test_errors_if_no_data_was_passed(self):
+ data = {}
+ valid, rez = FdmSwaggerValidator(mock_data).validate_data('getNetworkObjectList', data)
+ assert not valid
+ assert sort_validator_rez({
+ 'required': ['subType', 'type', 'value']
+ }) == sort_validator_rez(rez)
+
+ def test_errors_if_one_required_field_is_empty(self):
+ data = {
+ 'subType': 'NETWORK',
+ 'value': '1.1.1.1'
+ }
+ valid, rez = FdmSwaggerValidator(mock_data).validate_data('getNetworkObjectList', data)
+ assert not valid
+ assert {'required': ['type']} == rez
+
+ def test_types_of_required_fields_are_incorrect(self):
+ data = {
+ 'subType': True,
+ 'type': 1,
+ 'value': False
+ }
+ valid, rez = FdmSwaggerValidator(mock_data).validate_data('getNetworkObjectList', data)
+ assert not valid
+ assert sort_validator_rez({
+ 'invalid_type': [
+ {
+ 'path': 'subType',
+ 'expected_type': 'enum',
+ 'actually_value': True
+ },
+ {
+ 'path': 'value',
+ 'expected_type': 'string',
+ 'actually_value': False
+ },
+ {
+ 'path': 'type',
+ 'expected_type': 'string',
+ 'actually_value': 1
+ }
+ ]
+ }) == sort_validator_rez(rez)
+ data = {
+ 'subType': {},
+ 'type': [],
+ 'value': {}
+ }
+ valid, rez = FdmSwaggerValidator(mock_data).validate_data('getNetworkObjectList', data)
+ assert not valid
+ assert sort_validator_rez({
+ 'invalid_type': [
+ {
+ 'path': 'subType',
+ 'expected_type': 'enum',
+ 'actually_value': {}
+ },
+ {
+ 'path': 'value',
+ 'expected_type': 'string',
+ 'actually_value': {}
+ },
+ {
+ 'path': 'type',
+ 'expected_type': 'string',
+ 'actually_value': []
+ }
+ ]
+ }) == sort_validator_rez(rez)
+
+ def test_pass_only_required_fields(self):
+ data = {
+ 'subType': 'NETWORK',
+ 'type': 'networkobject',
+ 'value': '1.1.1.1'
+ }
+ valid, rez = FdmSwaggerValidator(mock_data).validate_data('getNetworkObjectList', data)
+ assert valid
+ assert rez is None
+
+ def test_pass_all_fields_with_correct_data(self):
+ data = {
+ 'id': 'id-di',
+ 'version': 'v',
+ 'name': 'test_name',
+ 'subType': 'NETWORK',
+ 'type': 'networkobject',
+ 'value': '1.1.1.1',
+ 'description': 'des',
+ 'isSystemDefined': False,
+ 'dnsResolution': 'IPV4_ONLY',
+ 'objects': [{
+ 'type': 'port',
+ 'id': 'fs-sf'
+ }]
+ }
+ valid, rez = FdmSwaggerValidator(mock_data).validate_data('getNetworkObjectList', data)
+ assert valid
+ assert rez is None
+
+ def test_array_data_is_not_correct(self):
+ data = {
+ 'name': 'test_name',
+ 'subType': 'NETWORK',
+ 'type': 'networkobject',
+ 'value': '1.1.1.1',
+ 'objects': [
+ {
+ 'id': 'fs-sf'
+ },
+ {
+ 'type': 'type'
+ },
+ {},
+ {
+ 'id': 1,
+ 'type': True
+ },
+ [],
+ 'test'
+ ]
+ }
+ valid, rez = FdmSwaggerValidator(mock_data).validate_data('getNetworkObjectList', data)
+ assert not valid
+ assert sort_validator_rez({
+ 'required': ['objects[0].type', 'objects[1].id', 'objects[2].id', 'objects[2].type'],
+ 'invalid_type': [
+ {
+ 'path': 'objects[3].id',
+ 'expected_type': 'string',
+ 'actually_value': 1
+ },
+ {
+ 'path': 'objects[3].type',
+ 'expected_type': 'string',
+ 'actually_value': True
+ },
+ {
+ 'path': 'objects[4]',
+ 'expected_type': 'object',
+ 'actually_value': []
+ },
+ {
+ 'path': 'objects[5]',
+ 'expected_type': 'object',
+ 'actually_value': 'test'
+ }
+ ]
+ }) == sort_validator_rez(rez)
+
+ def test_simple_types(self):
+ local_mock_data = {
+ 'models': {
+ 'TestModel': {
+ 'type': 'object',
+ 'properties': {
+ 'f_string': {'type': 'string'},
+ 'f_number': {'type': 'number'},
+ 'f_boolean': {'type': 'boolean'},
+ 'f_integer': {'type': 'integer'}
+ },
+ 'required': []
+ }
+ },
+ 'operations': {
+ 'getdata': {
+ 'modelName': 'TestModel'
+ }
+ }
+ }
+ valid_data = {
+ "f_string": "test",
+ "f_number": 2.2,
+ "f_boolean": False,
+ "f_integer": 1
+ }
+
+ valid, rez = FdmSwaggerValidator(local_mock_data).validate_data('getdata', valid_data)
+ assert valid
+ assert rez is None
+
+ valid_data = {
+ "f_string": "",
+ "f_number": 0,
+ "f_boolean": True,
+ "f_integer": 0
+ }
+
+ valid, rez = FdmSwaggerValidator(local_mock_data).validate_data('getdata', valid_data)
+ assert valid
+ assert rez is None
+
+ valid_data = {
+ "f_string": "0",
+ "f_number": 100,
+ "f_boolean": True,
+ "f_integer": 2
+ }
+
+ valid, rez = FdmSwaggerValidator(local_mock_data).validate_data('getdata', valid_data)
+ assert valid
+ assert rez is None
+
+ def test_invalid_simple_types(self):
+ local_mock_data = {
+ 'models': {
+ 'TestModel': {
+ 'type': 'object',
+ 'properties': {
+ 'f_string': {'type': 'string'},
+ 'f_number': {'type': 'number'},
+ 'f_boolean': {'type': 'boolean'},
+ 'f_integer': {'type': 'integer'}
+ },
+ 'required': []
+ }
+ },
+ 'operations': {
+ 'getdata': {
+ 'modelName': 'TestModel'
+ }
+ }
+ }
+ invalid_data = {
+ "f_string": True,
+ "f_number": True,
+ "f_boolean": 1,
+ "f_integer": True
+ }
+
+ valid, rez = FdmSwaggerValidator(local_mock_data).validate_data('getdata', invalid_data)
+ assert not valid
+ assert sort_validator_rez({
+ 'invalid_type': [
+ {
+ 'path': 'f_string',
+ 'expected_type': 'string',
+ 'actually_value': True
+ },
+ {
+ 'path': 'f_number',
+ 'expected_type': 'number',
+ 'actually_value': True
+ },
+ {
+ 'path': 'f_boolean',
+ 'expected_type': 'boolean',
+ 'actually_value': 1
+ },
+ {
+ 'path': 'f_integer',
+ 'expected_type': 'integer',
+ 'actually_value': True
+ }
+ ]
+ }) == sort_validator_rez(rez)
+
+ invalid_data = {
+ "f_string": 1,
+ "f_number": False,
+ "f_boolean": 0,
+ "f_integer": "test"
+ }
+
+ valid, rez = FdmSwaggerValidator(local_mock_data).validate_data('getdata', invalid_data)
+ assert not valid
+ assert sort_validator_rez({
+ 'invalid_type': [
+ {
+ 'path': 'f_string',
+ 'expected_type': 'string',
+ 'actually_value': 1
+ },
+ {
+ 'path': 'f_number',
+ 'expected_type': 'number',
+ 'actually_value': False
+ },
+ {
+ 'path': 'f_boolean',
+ 'expected_type': 'boolean',
+ 'actually_value': 0
+ },
+ {
+ 'path': 'f_integer',
+ 'expected_type': 'integer',
+ 'actually_value': "test"
+ }
+ ]
+ }) == sort_validator_rez(rez)
+
+ invalid_data = {
+ "f_string": False,
+ "f_number": "1",
+ "f_boolean": "",
+ "f_integer": 1.2
+ }
+
+ valid, rez = FdmSwaggerValidator(local_mock_data).validate_data('getdata', invalid_data)
+ assert not valid
+ assert sort_validator_rez({
+ 'invalid_type': [
+ {
+ 'path': 'f_string',
+ 'expected_type': 'string',
+ 'actually_value': False
+ },
+ {
+ 'path': 'f_number',
+ 'expected_type': 'number',
+ 'actually_value': "1"
+ },
+ {
+ 'path': 'f_boolean',
+ 'expected_type': 'boolean',
+ 'actually_value': ""
+ },
+ {
+ 'path': 'f_integer',
+ 'expected_type': 'integer',
+ 'actually_value': 1.2
+ }
+ ]
+ }) == sort_validator_rez(rez)
+
+ def test_nested_required_fields(self):
+ valid_data = {
+ 'nested_model': {
+ 'f_string': "test"
+ }
+ }
+
+ valid, rez = FdmSwaggerValidator(nested_mock_data1).validate_data('getdata', valid_data)
+ assert valid
+ assert rez is None
+
+ def test_invalid_nested_required_fields(self):
+ invalid_data = {
+ 'f_integer': 2
+ }
+
+ valid, rez = FdmSwaggerValidator(nested_mock_data1).validate_data('getdata', invalid_data)
+ assert not valid
+ assert {'required': ['nested_model']} == rez
+
+ invalid_data = {
+ 'nested_model': {
+ 'f_number': 1.2
+ }
+ }
+
+ valid, rez = FdmSwaggerValidator(nested_mock_data1).validate_data('getdata', invalid_data)
+ assert not valid
+ assert {'required': ['nested_model.f_string']} == rez
+
+ def test_invalid_type_in_nested_fields(self):
+ invalid_data = {
+ 'nested_model': {
+ "f_string": 1,
+ "f_number": "ds",
+ "f_boolean": 1.3,
+ "f_integer": True
+ }
+ }
+
+ valid, rez = FdmSwaggerValidator(nested_mock_data1).validate_data('getdata', invalid_data)
+ assert not valid
+ assert sort_validator_rez({
+ 'invalid_type': [
+ {
+ 'path': 'nested_model.f_string',
+ 'expected_type': 'string',
+ 'actually_value': 1
+ },
+ {
+ 'path': 'nested_model.f_number',
+ 'expected_type': 'number',
+ 'actually_value': "ds"
+ },
+ {
+ 'path': 'nested_model.f_boolean',
+ 'expected_type': 'boolean',
+ 'actually_value': 1.3
+ },
+ {
+ 'path': 'nested_model.f_integer',
+ 'expected_type': 'integer',
+ 'actually_value': True
+ }
+ ]
+
+ }) == sort_validator_rez(rez)
+
+ def test_few_levels_nested_fields(self):
+ local_mock_data = {
+ 'models': {
+ 'Model2': {
+ 'type': 'object',
+ 'required': ['ms', 'ts'],
+ 'properties': {
+ 'ms': {'type': 'array',
+ 'items': {
+ 'type': 'object',
+ '$ref': '#/definitions/ReferenceModel'}},
+ 'ts': {'type': 'array',
+ 'items': {
+ 'type': 'object',
+ '$ref': '#/definitions/ReferenceModel'}}
+ }
+ },
+ 'NetworkObjectType': {'type': 'string', 'enum': ['HOST', 'NETWORK', 'IPRANGE', 'FQDN']},
+ 'Fragment': {'type': 'object',
+ 'required': ['type', 'objects', 'subType', 'object'],
+ 'properties': {
+ 'objects': {'type': 'array',
+ 'items': {
+ 'type': 'object',
+ '$ref': '#/definitions/ReferenceModel'}},
+ 'object': {'type': 'object',
+ '$ref': '#/definitions/Model2'},
+ 'subType': {'type': 'object',
+ '$ref': '#/definitions/NetworkObjectType'},
+ 'type': {'type': 'string'},
+ 'value': {'type': 'number'},
+ 'name': {'type': 'string'}}},
+ 'ReferenceModel': {'type': 'object', 'required': ['id', 'type'],
+ 'properties': {
+ 'id': {'type': 'string'},
+ 'type': {'type': 'string'},
+ 'version': {'type': 'string'},
+ 'name': {'type': 'string'}}},
+ 'model1': {
+ 'type': 'object',
+ 'properties': {
+ 'f_string': {'type': 'string'},
+ 'f_number': {'type': 'number'},
+ 'f_boolean': {'type': 'boolean'},
+ 'f_integer': {'type': 'integer'},
+ 'objects': {'type': 'array',
+ 'items': {
+ 'type': 'object',
+ '$ref': '#/definitions/ReferenceModel'}},
+ 'fragments': {'type': 'array',
+ 'items': {
+ 'type': 'object',
+ '$ref': '#/definitions/Fragment'}}
+ },
+ 'required': ['f_string', 'objects', 'fragments']
+ },
+ 'TestModel': {
+ 'type': 'object',
+ 'properties': {
+ 'nested_model': {'type': 'object',
+ '$ref': '#/definitions/model1'},
+ 'f_integer': {'type': 'integer'}
+ },
+ 'required': ['nested_model']
+ }
+ },
+ 'operations': {
+ 'getdata': {
+ 'modelName': 'TestModel'
+ }
+ }
+ }
+
+ valid_data = {
+ "nested_model": {
+ 'objects': [{
+ 'type': 't1',
+ 'id': 'id1'
+ }],
+ 'fragments': [{
+ 'type': "test",
+ 'subType': 'NETWORK',
+ 'object': {
+ 'ts': [],
+ 'ms': [{
+ 'type': "tt",
+ 'id': 'id'
+ }]
+ },
+ 'objects': [{
+ 'type': 't',
+ 'id': 'id'
+ }]
+ }],
+ 'f_string': '1'
+ }
+ }
+
+ valid, rez = FdmSwaggerValidator(local_mock_data).validate_data('getdata', valid_data)
+ assert valid
+ assert rez is None
+
+ valid_data = {
+ "nested_model": {
+ 'objects': [{
+ 'type': 't1',
+ 'id': 'id1'
+ }],
+ 'fragments': [{
+ 'type': "test",
+ 'subType': 'NETWORK',
+ 'object': {
+ 'ms': {}
+ },
+ 'objects': [{
+ 'type': 't',
+ 'id': 'id'
+ }]
+ }],
+ 'f_string': '1'
+ }
+ }
+
+ valid, rez = FdmSwaggerValidator(local_mock_data).validate_data('getdata', valid_data)
+ assert not valid
+ assert sort_validator_rez({
+ 'required': ['nested_model.fragments[0].object.ts'],
+ 'invalid_type': [{
+ 'path': 'nested_model.fragments[0].object.ms',
+ 'expected_type': 'array',
+ 'actually_value': {}
+ }]
+ }) == sort_validator_rez(rez)
+
+ valid_data = {
+ "nested_model": {
+ 'objects': [{
+ 'type': 't1',
+ 'id': 'id1'
+ }],
+ 'fragments': [{
+ 'type': "test",
+ 'subType': 'NETWORK',
+ 'object': [],
+ 'objects': {}
+ }],
+ 'f_string': '1'
+ }
+ }
+
+ valid, rez = FdmSwaggerValidator(local_mock_data).validate_data('getdata', valid_data)
+ assert not valid
+ assert sort_validator_rez({
+ 'invalid_type': [
+ {
+ 'path': 'nested_model.fragments[0].objects',
+ 'expected_type': 'array',
+ 'actually_value': {}
+ },
+ {
+ 'path': 'nested_model.fragments[0].object',
+ 'expected_type': 'object',
+ 'actually_value': []}
+ ]}) == sort_validator_rez(rez)
diff --git a/test/units/modules/network/ftd/__init__.py b/test/units/modules/network/ftd/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/test/units/modules/network/ftd/test_ftd_configuration.py b/test/units/modules/network/ftd/test_ftd_configuration.py
new file mode 100644
index 00000000000..95358921190
--- /dev/null
+++ b/test/units/modules/network/ftd/test_ftd_configuration.py
@@ -0,0 +1,345 @@
+# Copyright (c) 2018 Cisco and/or its affiliates.
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+#
+
+from __future__ import absolute_import
+
+import json
+
+import pytest
+
+from ansible.module_utils import basic
+from ansible.module_utils.network.ftd.common import HTTPMethod, FtdConfigurationError, FtdServerError
+from ansible.modules.network.ftd import ftd_configuration
+from units.modules.utils import set_module_args, exit_json, fail_json, AnsibleFailJson, AnsibleExitJson
+
+ADD_RESPONSE = {'status': 'Object added'}
+EDIT_RESPONSE = {'status': 'Object edited'}
+DELETE_RESPONSE = {'status': 'Object deleted'}
+GET_BY_FILTER_RESPONSE = [{'name': 'foo', 'description': 'bar'}]
+ARBITRARY_RESPONSE = {'status': 'Arbitrary request sent'}
+
+
+class TestFtdConfiguration(object):
+ module = ftd_configuration
+
+ @pytest.fixture(autouse=True)
+ def module_mock(self, mocker):
+ return mocker.patch.multiple(basic.AnsibleModule, exit_json=exit_json, fail_json=fail_json)
+
+ @pytest.fixture
+ def connection_mock(self, mocker):
+ connection_class_mock = mocker.patch('ansible.modules.network.ftd.ftd_configuration.Connection')
+ connection_instance = connection_class_mock.return_value
+ connection_instance.validate_data.return_value = True, None
+ connection_instance.validate_query_params.return_value = True, None
+ connection_instance.validate_path_params.return_value = True, None
+
+ return connection_instance
+
+ @pytest.fixture
+ def resource_mock(self, mocker):
+ resource_class_mock = mocker.patch('ansible.modules.network.ftd.ftd_configuration.BaseConfigurationResource')
+ resource_instance = resource_class_mock.return_value
+ resource_instance.add_object.return_value = ADD_RESPONSE
+ resource_instance.edit_object.return_value = EDIT_RESPONSE
+ resource_instance.delete_object.return_value = DELETE_RESPONSE
+ resource_instance.send_request.return_value = ARBITRARY_RESPONSE
+ resource_instance.get_objects_by_filter.return_value = GET_BY_FILTER_RESPONSE
+ return resource_instance
+
+ def test_module_should_fail_without_operation_arg(self):
+ set_module_args({})
+
+ with pytest.raises(AnsibleFailJson) as ex:
+ self.module.main()
+
+ assert 'missing required arguments: operation' in str(ex)
+
+ def test_module_should_fail_when_no_operation_spec_found(self, connection_mock):
+ connection_mock.get_operation_spec.return_value = None
+ set_module_args({'operation': 'nonExistingOperation'})
+
+ with pytest.raises(AnsibleFailJson) as ex:
+ self.module.main()
+
+ assert 'Invalid operation name provided: nonExistingOperation' in str(ex)
+
+ def test_module_should_add_object_when_add_operation(self, connection_mock, resource_mock):
+ connection_mock.get_operation_spec.return_value = {
+ 'method': HTTPMethod.POST,
+ 'url': '/object'
+ }
+
+ params = {
+ 'operation': 'addObject',
+ 'data': {'name': 'testObject', 'type': 'object'}
+ }
+ result = self._run_module(params)
+
+ assert ADD_RESPONSE == result['response']
+ resource_mock.add_object.assert_called_with(connection_mock.get_operation_spec.return_value['url'],
+ params['data'], None, None)
+
+ def test_module_should_edit_object_when_edit_operation(self, connection_mock, resource_mock):
+ connection_mock.get_operation_spec.return_value = {
+ 'method': HTTPMethod.PUT,
+ 'url': '/object/{objId}'
+ }
+
+ params = {
+ 'operation': 'editObject',
+ 'data': {'id': '123', 'name': 'testObject', 'type': 'object'},
+ 'path_params': {'objId': '123'}
+ }
+ result = self._run_module(params)
+
+ assert EDIT_RESPONSE == result['response']
+ resource_mock.edit_object.assert_called_with(connection_mock.get_operation_spec.return_value['url'],
+ params['data'],
+ params['path_params'], None)
+
+ def test_module_should_delete_object_when_delete_operation(self, connection_mock, resource_mock):
+ connection_mock.get_operation_spec.return_value = {
+ 'method': HTTPMethod.DELETE,
+ 'url': '/object/{objId}'
+ }
+
+ params = {
+ 'operation': 'deleteObject',
+ 'path_params': {'objId': '123'}
+ }
+ result = self._run_module(params)
+
+ assert DELETE_RESPONSE == result['response']
+ resource_mock.delete_object.assert_called_with(connection_mock.get_operation_spec.return_value['url'],
+ params['path_params'])
+
+ def test_module_should_get_objects_by_filter_when_find_by_filter_operation(self, connection_mock, resource_mock):
+ connection_mock.get_operation_spec.return_value = {
+ 'method': HTTPMethod.GET,
+ 'url': '/objects'
+ }
+
+ params = {
+ 'operation': 'getObjectList',
+ 'filters': {'name': 'foo'}
+ }
+ result = self._run_module(params)
+
+ assert GET_BY_FILTER_RESPONSE == result['response']
+ resource_mock.get_objects_by_filter.assert_called_with(connection_mock.get_operation_spec.return_value['url'],
+ params['filters'],
+ None, None)
+
+ def test_module_should_send_request_when_arbitrary_operation(self, connection_mock, resource_mock):
+ connection_mock.get_operation_spec.return_value = {
+ 'method': HTTPMethod.GET,
+ 'url': '/object/status/{objId}'
+ }
+
+ params = {
+ 'operation': 'checkStatus',
+ 'path_params': {'objId': '123'}
+ }
+ result = self._run_module(params)
+
+ assert ARBITRARY_RESPONSE == result['response']
+ resource_mock.send_request.assert_called_with(connection_mock.get_operation_spec.return_value['url'],
+ HTTPMethod.GET, None,
+ params['path_params'], None)
+
+ def test_module_should_fail_when_operation_raises_configuration_error(self, connection_mock, resource_mock):
+ connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.GET, 'url': '/test'}
+ resource_mock.send_request.side_effect = FtdConfigurationError('Foo error.')
+
+ result = self._run_module_with_fail_json({'operation': 'failure'})
+ assert result['failed']
+ assert 'Failed to execute failure operation because of the configuration error: Foo error.' == result['msg']
+
+ def test_module_should_fail_when_operation_raises_server_error(self, connection_mock, resource_mock):
+ connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.GET, 'url': '/test'}
+ resource_mock.send_request.side_effect = FtdServerError({'error': 'foo'}, 500)
+
+ result = self._run_module_with_fail_json({'operation': 'failure'})
+ assert result['failed']
+ assert 'Server returned an error trying to execute failure operation. Status code: 500. ' \
+ 'Server response: {\'error\': \'foo\'}' == result['msg']
+
+ def test_module_should_fail_if_validation_error_in_data(self, connection_mock):
+ connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.POST, 'url': '/test'}
+ report = {
+ 'required': ['objects[0].type'],
+ 'invalid_type': [
+ {
+ 'path': 'objects[3].id',
+ 'expected_type': 'string',
+ 'actually_value': 1
+ }
+ ]
+ }
+ connection_mock.validate_data.return_value = (False, json.dumps(report, sort_keys=True, indent=4))
+
+ result = self._run_module_with_fail_json({
+ 'operation': 'test',
+ 'data': {}
+ })
+ key = 'Invalid data provided'
+ assert result['msg'][key]
+ result['msg'][key] = json.loads(result['msg'][key])
+ assert result == {
+ 'msg':
+ {key: {
+ 'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
+ 'required': ['objects[0].type']
+ }},
+ 'failed': True}
+
+ def test_module_should_fail_if_validation_error_in_query_params(self, connection_mock):
+ connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.GET, 'url': '/test'}
+ report = {
+ 'required': ['objects[0].type'],
+ 'invalid_type': [
+ {
+ 'path': 'objects[3].id',
+ 'expected_type': 'string',
+ 'actually_value': 1
+ }
+ ]
+ }
+ connection_mock.validate_query_params.return_value = (False, json.dumps(report, sort_keys=True, indent=4))
+
+ result = self._run_module_with_fail_json({
+ 'operation': 'test',
+ 'data': {}
+ })
+ key = 'Invalid query_params provided'
+ assert result['msg'][key]
+ result['msg'][key] = json.loads(result['msg'][key])
+
+ assert result == {'msg': {key: {
+ 'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
+ 'required': ['objects[0].type']}}, 'failed': True}
+
+ def test_module_should_fail_if_validation_error_in_path_params(self, connection_mock):
+ connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.GET, 'url': '/test'}
+ report = {
+ 'path_params': {
+ 'required': ['objects[0].type'],
+ 'invalid_type': [
+ {
+ 'path': 'objects[3].id',
+ 'expected_type': 'string',
+ 'actually_value': 1
+ }
+ ]
+ }
+ }
+ connection_mock.validate_path_params.return_value = (False, json.dumps(report, sort_keys=True, indent=4))
+
+ result = self._run_module_with_fail_json({
+ 'operation': 'test',
+ 'data': {}
+ })
+ key = 'Invalid path_params provided'
+ assert result['msg'][key]
+ result['msg'][key] = json.loads(result['msg'][key])
+
+ assert result == {'msg': {key: {
+ 'path_params': {
+ 'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
+ 'required': ['objects[0].type']}}}, 'failed': True}
+
+ def test_module_should_fail_if_validation_error_in_all_params(self, connection_mock):
+ connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.POST, 'url': '/test'}
+ report = {
+ 'data': {
+ 'required': ['objects[0].type'],
+ 'invalid_type': [
+ {
+ 'path': 'objects[3].id',
+ 'expected_type': 'string',
+ 'actually_value': 1
+ }
+ ]
+ },
+ 'path_params': {
+ 'required': ['some_param'],
+ 'invalid_type': [
+ {
+ 'path': 'name',
+ 'expected_type': 'string',
+ 'actually_value': True
+ }
+ ]
+ },
+ 'query_params': {
+ 'required': ['other_param'],
+ 'invalid_type': [
+ {
+ 'path': 'f_integer',
+ 'expected_type': 'integer',
+ 'actually_value': "test"
+ }
+ ]
+ }
+ }
+ connection_mock.validate_data.return_value = (False, json.dumps(report['data'], sort_keys=True, indent=4))
+ connection_mock.validate_query_params.return_value = (False,
+ json.dumps(report['query_params'], sort_keys=True,
+ indent=4))
+ connection_mock.validate_path_params.return_value = (False,
+ json.dumps(report['path_params'], sort_keys=True,
+ indent=4))
+
+ result = self._run_module_with_fail_json({
+ 'operation': 'test',
+ 'data': {}
+ })
+ key_data = 'Invalid data provided'
+ assert result['msg'][key_data]
+ result['msg'][key_data] = json.loads(result['msg'][key_data])
+
+ key_path_params = 'Invalid path_params provided'
+ assert result['msg'][key_path_params]
+ result['msg'][key_path_params] = json.loads(result['msg'][key_path_params])
+
+ key_query_params = 'Invalid query_params provided'
+ assert result['msg'][key_query_params]
+ result['msg'][key_query_params] = json.loads(result['msg'][key_query_params])
+
+ assert result == {'msg': {
+ key_data: {'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
+ 'required': ['objects[0].type']},
+ key_path_params: {'invalid_type': [{'actually_value': True, 'expected_type': 'string', 'path': 'name'}],
+ 'required': ['some_param']},
+ key_query_params: {
+ 'invalid_type': [{'actually_value': 'test', 'expected_type': 'integer', 'path': 'f_integer'}],
+ 'required': ['other_param']}}, 'failed': True}
+
+ def _run_module(self, module_args):
+ set_module_args(module_args)
+ with pytest.raises(AnsibleExitJson) as ex:
+ self.module.main()
+ return ex.value.args[0]
+
+ def _run_module_with_fail_json(self, module_args):
+ set_module_args(module_args)
+ with pytest.raises(AnsibleFailJson) as exc:
+ self.module.main()
+ result = exc.value.args[0]
+ return result
diff --git a/test/units/modules/network/ftd/test_ftd_file_download.py b/test/units/modules/network/ftd/test_ftd_file_download.py
new file mode 100644
index 00000000000..f11085e43de
--- /dev/null
+++ b/test/units/modules/network/ftd/test_ftd_file_download.py
@@ -0,0 +1,98 @@
+# Copyright (c) 2018 Cisco and/or its affiliates.
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+#
+
+from __future__ import absolute_import
+
+import pytest
+
+from ansible.module_utils import basic
+from ansible.module_utils.network.ftd.common import HTTPMethod
+from ansible.module_utils.network.ftd.fdm_swagger_client import FILE_MODEL_NAME, OperationField
+from ansible.modules.network.ftd import ftd_file_download
+from units.modules.utils import set_module_args, exit_json, fail_json, AnsibleFailJson, AnsibleExitJson
+
+
+class TestFtdFileDownload(object):
+ module = ftd_file_download
+
+ @pytest.fixture(autouse=True)
+ def module_mock(self, mocker):
+ return mocker.patch.multiple(basic.AnsibleModule, exit_json=exit_json, fail_json=fail_json)
+
+ @pytest.fixture
+ def connection_mock(self, mocker):
+ connection_class_mock = mocker.patch('ansible.modules.network.ftd.ftd_file_download.Connection')
+ return connection_class_mock.return_value
+
+ @pytest.mark.parametrize("missing_arg", ['operation', 'destination'])
+ def test_module_should_fail_without_required_args(self, missing_arg):
+ module_args = {'operation': 'downloadFile', 'destination': '/tmp'}
+ del module_args[missing_arg]
+ set_module_args(module_args)
+
+ with pytest.raises(AnsibleFailJson) as ex:
+ self.module.main()
+
+ assert 'missing required arguments: %s' % missing_arg in str(ex)
+
+ def test_module_should_fail_when_no_operation_spec_found(self, connection_mock):
+ connection_mock.get_operation_spec.return_value = None
+ set_module_args({'operation': 'nonExistingDownloadOperation', 'destination': '/tmp'})
+
+ with pytest.raises(AnsibleFailJson) as ex:
+ self.module.main()
+
+ result = ex.value.args[0]
+ assert result['failed']
+ assert result['msg'] == 'Operation with specified name is not found: nonExistingDownloadOperation'
+
+ def test_module_should_fail_when_not_download_operation_specified(self, connection_mock):
+ connection_mock.get_operation_spec.return_value = {
+ OperationField.METHOD: HTTPMethod.GET,
+ OperationField.URL: '/object',
+ OperationField.MODEL_NAME: 'NetworkObject'
+ }
+ set_module_args({'operation': 'nonDownloadOperation', 'destination': '/tmp'})
+
+ with pytest.raises(AnsibleFailJson) as ex:
+ self.module.main()
+
+ result = ex.value.args[0]
+ assert result['failed']
+ assert result['msg'] == 'Invalid download operation: nonDownloadOperation. ' \
+ 'The operation must make GET request and return a file.'
+
+ def test_module_should_call_download_and_return(self, connection_mock):
+ connection_mock.validate_path_params.return_value = (True, None)
+ connection_mock.get_operation_spec.return_value = {
+ OperationField.METHOD: HTTPMethod.GET,
+ OperationField.URL: '/file/{objId}',
+ OperationField.MODEL_NAME: FILE_MODEL_NAME
+ }
+
+ set_module_args({
+ 'operation': 'downloadFile',
+ 'path_params': {'objId': '12'},
+ 'destination': '/tmp'
+ })
+ with pytest.raises(AnsibleExitJson) as ex:
+ self.module.main()
+
+ result = ex.value.args[0]
+ assert not result['changed']
+ connection_mock.download_file.assert_called_once_with('/file/{objId}', '/tmp', {'objId': '12'})
diff --git a/test/units/modules/network/ftd/test_ftd_file_upload.py b/test/units/modules/network/ftd/test_ftd_file_upload.py
new file mode 100644
index 00000000000..f05b6f787cf
--- /dev/null
+++ b/test/units/modules/network/ftd/test_ftd_file_upload.py
@@ -0,0 +1,98 @@
+# Copyright (c) 2018 Cisco and/or its affiliates.
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+#
+
+from __future__ import absolute_import
+
+import pytest
+
+from ansible.module_utils import basic
+from ansible.module_utils.network.ftd.common import HTTPMethod
+from ansible.module_utils.network.ftd.fdm_swagger_client import OperationField
+from ansible.modules.network.ftd import ftd_file_upload
+from units.modules.utils import set_module_args, exit_json, fail_json, AnsibleFailJson, AnsibleExitJson
+
+
+class TestFtdFileUpload(object):
+ module = ftd_file_upload
+
+ @pytest.fixture(autouse=True)
+ def module_mock(self, mocker):
+ return mocker.patch.multiple(basic.AnsibleModule, exit_json=exit_json, fail_json=fail_json)
+
+ @pytest.fixture
+ def connection_mock(self, mocker):
+ connection_class_mock = mocker.patch('ansible.modules.network.ftd.ftd_file_upload.Connection')
+ return connection_class_mock.return_value
+
+ @pytest.mark.parametrize("missing_arg", ['operation', 'fileToUpload'])
+ def test_module_should_fail_without_required_args(self, missing_arg):
+ module_args = {'operation': 'uploadFile', 'fileToUpload': '/tmp/test.txt'}
+ del module_args[missing_arg]
+ set_module_args(module_args)
+
+ with pytest.raises(AnsibleFailJson) as ex:
+ self.module.main()
+
+ assert 'missing required arguments: %s' % missing_arg in str(ex)
+
+ def test_module_should_fail_when_no_operation_spec_found(self, connection_mock):
+ connection_mock.get_operation_spec.return_value = None
+ set_module_args({'operation': 'nonExistingUploadOperation', 'fileToUpload': '/tmp/test.txt'})
+
+ with pytest.raises(AnsibleFailJson) as ex:
+ self.module.main()
+
+ result = ex.value.args[0]
+ assert result['failed']
+ assert result['msg'] == 'Operation with specified name is not found: nonExistingUploadOperation'
+
+ def test_module_should_fail_when_not_upload_operation_specified(self, connection_mock):
+ connection_mock.get_operation_spec.return_value = {
+ OperationField.METHOD: HTTPMethod.GET,
+ OperationField.URL: '/object/network',
+ OperationField.MODEL_NAME: 'NetworkObject'
+ }
+ set_module_args({'operation': 'nonUploadOperation', 'fileToUpload': '/tmp/test.txt'})
+
+ with pytest.raises(AnsibleFailJson) as ex:
+ self.module.main()
+
+ result = ex.value.args[0]
+ assert result['failed']
+ assert result['msg'] == 'Invalid upload operation: nonUploadOperation. ' \
+ 'The operation must make POST request and return UploadStatus model.'
+
+ def test_module_should_call_upload_and_return_response(self, connection_mock):
+ connection_mock.get_operation_spec.return_value = {
+ OperationField.METHOD: HTTPMethod.POST,
+ OperationField.URL: '/uploadFile',
+ OperationField.MODEL_NAME: 'FileUploadStatus'
+ }
+ connection_mock.upload_file.return_value = {'id': '123'}
+
+ set_module_args({
+ 'operation': 'uploadFile',
+ 'fileToUpload': '/tmp/test.txt'
+ })
+ with pytest.raises(AnsibleExitJson) as ex:
+ self.module.main()
+
+ result = ex.value.args[0]
+ assert result['changed']
+ assert {'id': '123'} == result['response']
+ connection_mock.upload_file.assert_called_once_with('/tmp/test.txt', '/uploadFile')
diff --git a/test/units/plugins/httpapi/__init__.py b/test/units/plugins/httpapi/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/test/units/plugins/httpapi/test_ftd.py b/test/units/plugins/httpapi/test_ftd.py
new file mode 100644
index 00000000000..d3585871e79
--- /dev/null
+++ b/test/units/plugins/httpapi/test_ftd.py
@@ -0,0 +1,255 @@
+# Copyright (c) 2018 Cisco and/or its affiliates.
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+#
+
+import json
+import os
+
+from ansible.module_utils.six.moves.urllib.error import HTTPError
+
+from ansible.compat.tests import mock
+from ansible.compat.tests import unittest
+from ansible.compat.tests.mock import mock_open, patch
+from ansible.errors import AnsibleConnectionFailure
+from ansible.module_utils.connection import ConnectionError
+from ansible.module_utils.network.ftd.common import HTTPMethod, ResponseParams
+from ansible.module_utils.network.ftd.fdm_swagger_client import SpecProp, FdmSwaggerParser
+from ansible.module_utils.six import BytesIO, PY3, StringIO
+from ansible.plugins.httpapi.ftd import HttpApi, API_TOKEN_PATH_ENV_VAR
+
+if PY3:
+ BUILTINS_NAME = 'builtins'
+else:
+ BUILTINS_NAME = '__builtin__'
+
+
+class TestFtdHttpApi(unittest.TestCase):
+
+ def setUp(self):
+ self.connection_mock = mock.Mock()
+ self.ftd_plugin = HttpApi(self.connection_mock)
+ self.ftd_plugin.access_token = 'ACCESS_TOKEN'
+
+ def test_login_should_request_tokens_when_no_refresh_token(self):
+ self.connection_mock.send.return_value = self._connection_response(
+ {'access_token': 'ACCESS_TOKEN', 'refresh_token': 'REFRESH_TOKEN'}
+ )
+
+ self.ftd_plugin.login('foo', 'bar')
+
+ assert 'ACCESS_TOKEN' == self.ftd_plugin.access_token
+ assert 'REFRESH_TOKEN' == self.ftd_plugin.refresh_token
+ expected_body = json.dumps({'grant_type': 'password', 'username': 'foo', 'password': 'bar'})
+ self.connection_mock.send.assert_called_once_with(mock.ANY, expected_body, headers=mock.ANY, method=mock.ANY)
+
+ def test_login_should_update_tokens_when_refresh_token_exists(self):
+ self.ftd_plugin.refresh_token = 'REFRESH_TOKEN'
+ self.connection_mock.send.return_value = self._connection_response(
+ {'access_token': 'NEW_ACCESS_TOKEN', 'refresh_token': 'NEW_REFRESH_TOKEN'}
+ )
+
+ self.ftd_plugin.login('foo', 'bar')
+
+ assert 'NEW_ACCESS_TOKEN' == self.ftd_plugin.access_token
+ assert 'NEW_REFRESH_TOKEN' == self.ftd_plugin.refresh_token
+ expected_body = json.dumps({'grant_type': 'refresh_token', 'refresh_token': 'REFRESH_TOKEN'})
+ self.connection_mock.send.assert_called_once_with(mock.ANY, expected_body, headers=mock.ANY, method=mock.ANY)
+
+ @patch.dict(os.environ, {API_TOKEN_PATH_ENV_VAR: '/testLoginUrl'})
+ def test_login_should_use_env_variable_when_set(self):
+ self.connection_mock.send.return_value = self._connection_response(
+ {'access_token': 'ACCESS_TOKEN', 'refresh_token': 'REFRESH_TOKEN'}
+ )
+
+ self.ftd_plugin.login('foo', 'bar')
+
+ self.connection_mock.send.assert_called_once_with('/testLoginUrl', mock.ANY, headers=mock.ANY, method=mock.ANY)
+
+ def test_login_raises_exception_when_no_refresh_token_and_no_credentials(self):
+ with self.assertRaises(AnsibleConnectionFailure) as res:
+ self.ftd_plugin.login(None, None)
+ assert 'Username and password are required' in str(res.exception)
+
+ def test_login_raises_exception_when_invalid_response(self):
+ self.connection_mock.send.return_value = self._connection_response(
+ {'no_access_token': 'ACCESS_TOKEN'}
+ )
+
+ with self.assertRaises(ConnectionError) as res:
+ self.ftd_plugin.login('foo', 'bar')
+
+ assert 'Server returned response without token info during connection authentication' in str(res.exception)
+
+ def test_logout_should_revoke_tokens(self):
+ self.ftd_plugin.access_token = 'ACCESS_TOKEN_TO_REVOKE'
+ self.ftd_plugin.refresh_token = 'REFRESH_TOKEN_TO_REVOKE'
+ self.connection_mock.send.return_value = self._connection_response(None)
+
+ self.ftd_plugin.logout()
+
+ assert self.ftd_plugin.access_token is None
+ assert self.ftd_plugin.refresh_token is None
+ expected_body = json.dumps({'grant_type': 'revoke_token', 'access_token': 'ACCESS_TOKEN_TO_REVOKE',
+ 'token_to_revoke': 'REFRESH_TOKEN_TO_REVOKE'})
+ self.connection_mock.send.assert_called_once_with(mock.ANY, expected_body, headers=mock.ANY, method=mock.ANY)
+
+ def test_send_request_should_send_correct_request(self):
+ exp_resp = {'id': '123', 'name': 'foo'}
+ self.connection_mock.send.return_value = self._connection_response(exp_resp)
+
+ resp = self.ftd_plugin.send_request('/test/{objId}', HTTPMethod.PUT,
+ body_params={'name': 'foo'},
+ path_params={'objId': '123'},
+ query_params={'at': 0})
+
+ assert {ResponseParams.SUCCESS: True, ResponseParams.STATUS_CODE: 200,
+ ResponseParams.RESPONSE: exp_resp} == resp
+ self.connection_mock.send.assert_called_once_with('/test/123?at=0', '{"name": "foo"}', method=HTTPMethod.PUT,
+ headers=self._expected_headers())
+
+ def test_send_request_should_return_empty_dict_when_no_response_data(self):
+ self.connection_mock.send.return_value = self._connection_response(None)
+
+ resp = self.ftd_plugin.send_request('/test', HTTPMethod.GET)
+
+ assert {ResponseParams.SUCCESS: True, ResponseParams.STATUS_CODE: 200, ResponseParams.RESPONSE: {}} == resp
+ self.connection_mock.send.assert_called_once_with('/test', None, method=HTTPMethod.GET,
+ headers=self._expected_headers())
+
+ def test_send_request_should_return_error_info_when_http_error_raises(self):
+ self.connection_mock.send.side_effect = HTTPError('http://testhost.com', 500, '', {},
+ StringIO('{"errorMessage": "ERROR"}'))
+
+ resp = self.ftd_plugin.send_request('/test', HTTPMethod.GET)
+
+ assert {ResponseParams.SUCCESS: False, ResponseParams.STATUS_CODE: 500,
+ ResponseParams.RESPONSE: {'errorMessage': 'ERROR'}} == resp
+
+ def test_send_request_raises_exception_when_invalid_response(self):
+ self.connection_mock.send.return_value = self._connection_response('nonValidJson')
+
+ with self.assertRaises(ConnectionError) as res:
+ self.ftd_plugin.send_request('/test', HTTPMethod.GET)
+
+ assert 'Invalid JSON response' in str(res.exception)
+
+ def test_handle_httperror_should_update_tokens_and_retry_on_auth_errors(self):
+ self.ftd_plugin.refresh_token = 'REFRESH_TOKEN'
+ self.connection_mock.send.return_value = self._connection_response(
+ {'access_token': 'NEW_ACCESS_TOKEN', 'refresh_token': 'NEW_REFRESH_TOKEN'}
+ )
+
+ retry = self.ftd_plugin.handle_httperror(HTTPError('http://testhost.com', 401, '', {}, None))
+
+ assert retry
+ assert 'NEW_ACCESS_TOKEN' == self.ftd_plugin.access_token
+ assert 'NEW_REFRESH_TOKEN' == self.ftd_plugin.refresh_token
+
+ def test_handle_httperror_should_not_retry_on_non_auth_errors(self):
+ assert not self.ftd_plugin.handle_httperror(HTTPError('http://testhost.com', 500, '', {}, None))
+
+ @patch('os.path.isdir', mock.Mock(return_value=False))
+ def test_download_file(self):
+ self.connection_mock.send.return_value = self._connection_response('File content')
+
+ open_mock = mock_open()
+ with patch('%s.open' % BUILTINS_NAME, open_mock):
+ self.ftd_plugin.download_file('/files/1', '/tmp/test.txt')
+
+ open_mock.assert_called_once_with('/tmp/test.txt', 'wb')
+ open_mock().write.assert_called_once_with(b'File content')
+
+ @patch('os.path.isdir', mock.Mock(return_value=True))
+ def test_download_file_should_extract_filename_from_headers(self):
+ filename = 'test_file.txt'
+ response = mock.Mock()
+ response.info.return_value = {'Content-Disposition': 'attachment; filename="%s"' % filename}
+ dummy, response_data = self._connection_response('File content')
+ self.connection_mock.send.return_value = response, response_data
+
+ open_mock = mock_open()
+ with patch('%s.open' % BUILTINS_NAME, open_mock):
+ self.ftd_plugin.download_file('/files/1', '/tmp/')
+
+ open_mock.assert_called_once_with('/tmp/%s' % filename, 'wb')
+ open_mock().write.assert_called_once_with(b'File content')
+
+ @patch('os.path.basename', mock.Mock(return_value='test.txt'))
+ @patch('ansible.plugins.httpapi.ftd.encode_multipart_formdata',
+ mock.Mock(return_value=('--Encoded data--', 'multipart/form-data')))
+ def test_upload_file(self):
+ self.connection_mock.send.return_value = self._connection_response({'id': '123'})
+
+ open_mock = mock_open()
+ with patch('%s.open' % BUILTINS_NAME, open_mock):
+ resp = self.ftd_plugin.upload_file('/tmp/test.txt', '/files')
+
+ assert {'id': '123'} == resp
+ exp_headers = self._expected_headers()
+ exp_headers['Content-Length'] = len('--Encoded data--')
+ exp_headers['Content-Type'] = 'multipart/form-data'
+ self.connection_mock.send.assert_called_once_with('/files', data='--Encoded data--',
+ headers=exp_headers, method=HTTPMethod.POST)
+ open_mock.assert_called_once_with('/tmp/test.txt', 'rb')
+
+ @patch('os.path.basename', mock.Mock(return_value='test.txt'))
+ @patch('ansible.plugins.httpapi.ftd.encode_multipart_formdata',
+ mock.Mock(return_value=('--Encoded data--', 'multipart/form-data')))
+ def test_upload_file_raises_exception_when_invalid_response(self):
+ self.connection_mock.send.return_value = self._connection_response('invalidJsonResponse')
+
+ open_mock = mock_open()
+ with patch('%s.open' % BUILTINS_NAME, open_mock):
+ with self.assertRaises(ConnectionError) as res:
+ self.ftd_plugin.upload_file('/tmp/test.txt', '/files')
+
+ assert 'Invalid JSON response' in str(res.exception)
+
+ @patch.object(FdmSwaggerParser, 'parse_spec')
+ def test_get_operation_spec(self, parse_spec_mock):
+ self.connection_mock.send.return_value = self._connection_response(None)
+ parse_spec_mock.return_value = {
+ SpecProp.OPERATIONS: {'testOp': 'Specification for testOp'}
+ }
+
+ assert 'Specification for testOp' == self.ftd_plugin.get_operation_spec('testOp')
+ assert self.ftd_plugin.get_operation_spec('nonExistingTestOp') is None
+
+ @patch.object(FdmSwaggerParser, 'parse_spec')
+ def test_get_model_spec(self, parse_spec_mock):
+ self.connection_mock.send.return_value = self._connection_response(None)
+ parse_spec_mock.return_value = {
+ SpecProp.MODELS: {'TestModel': 'Specification for TestModel'}
+ }
+
+ assert 'Specification for TestModel' == self.ftd_plugin.get_model_spec('TestModel')
+ assert self.ftd_plugin.get_model_spec('NonExistingTestModel') is None
+
+ @staticmethod
+ def _connection_response(response, status=200):
+ response_mock = mock.Mock()
+ response_mock.getcode.return_value = status
+ response_text = json.dumps(response) if type(response) is dict else response
+ response_data = BytesIO(response_text.encode() if response_text else ''.encode())
+ return response_mock, response_data
+
+ def _expected_headers(self):
+ return {
+ 'Accept': 'application/json',
+ 'Authorization': 'Bearer %s' % self.ftd_plugin.access_token,
+ 'Content-Type': 'application/json'
+ }