diff --git a/lib/ansible/module_utils/network/ftd/common.py b/lib/ansible/module_utils/network/ftd/common.py
index 1125fb36c30..4c24b3fa3d6 100644
--- a/lib/ansible/module_utils/network/ftd/common.py
+++ b/lib/ansible/module_utils/network/ftd/common.py
@@ -18,6 +18,9 @@
import re
+from ansible.module_utils._text import to_text
+from ansible.module_utils.common.collections import is_string
+
INVALID_IDENTIFIER_SYMBOLS = r'[^a-zA-Z0-9_]'
IDENTITY_PROPERTIES = ['id', 'version', 'ruleId']
@@ -38,7 +41,10 @@ class ResponseParams:
class FtdConfigurationError(Exception):
- pass
+ def __init__(self, msg, obj=None):
+ super(FtdConfigurationError, self).__init__(msg)
+ self.msg = msg
+ self.obj = obj
class FtdServerError(Exception):
@@ -48,6 +54,11 @@ class FtdServerError(Exception):
self.code = code
+class FtdUnexpectedResponse(Exception):
+ """The exception to be raised in case of unexpected responses from 3d parties."""
+ pass
+
+
def construct_ansible_facts(response, params):
facts = dict()
if response:
@@ -149,6 +160,11 @@ def equal_values(v1, v2):
:return: True if types and content of passed values are equal. Otherwise, returns False.
:rtype: bool
"""
+
+ # string-like values might have same text but different types, so checking them separately
+ if is_string(v1) and is_string(v2):
+ return to_text(v1) == to_text(v2)
+
if type(v1) != type(v2):
return False
value_type = type(v1)
diff --git a/lib/ansible/module_utils/network/ftd/configuration.py b/lib/ansible/module_utils/network/ftd/configuration.py
index c594a98da8a..aba2615f66e 100644
--- a/lib/ansible/module_utils/network/ftd/configuration.py
+++ b/lib/ansible/module_utils/network/ftd/configuration.py
@@ -15,11 +15,13 @@
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see .
#
-
+import copy
from functools import partial
-from ansible.module_utils.network.ftd.common import HTTPMethod, equal_objects, copy_identity_properties, \
- FtdConfigurationError, FtdServerError, ResponseParams
+from ansible.module_utils.network.ftd.common import HTTPMethod, equal_objects, FtdConfigurationError, \
+ FtdServerError, ResponseParams, copy_identity_properties, FtdUnexpectedResponse
+from ansible.module_utils.network.ftd.fdm_swagger_client import OperationField, ValidationError
+from ansible.module_utils.six import iteritems
DEFAULT_PAGE_SIZE = 10
DEFAULT_OFFSET = 0
@@ -28,86 +30,358 @@ UNPROCESSABLE_ENTITY_STATUS = 422
INVALID_UUID_ERROR_MESSAGE = "Validation failed due to an invalid UUID"
DUPLICATE_NAME_ERROR_MESSAGE = "Validation failed due to a duplicate name"
+MULTIPLE_DUPLICATES_FOUND_ERROR = (
+ "Cannot add a new object. An object(s) with the same attributes exists."
+ "Multiple objects returned according to filters being specified. "
+ "Please specify more specific filters which can find exact object that caused duplication error")
+
+
+class OperationNamePrefix:
+ ADD = 'add'
+ EDIT = 'edit'
+ GET = 'get'
+ DELETE = 'delete'
+ UPSERT = 'upsert'
+
+
+class QueryParams:
+ FILTER = 'filter'
+
+
+class ParamName:
+ QUERY_PARAMS = 'query_params'
+ PATH_PARAMS = 'path_params'
+ DATA = 'data'
+ FILTERS = 'filters'
+
+
+class CheckModeException(Exception):
+ pass
+
+
+class FtdInvalidOperationNameError(Exception):
+ def __init__(self, operation_name):
+ super(FtdInvalidOperationNameError, self).__init__(operation_name)
+ self.operation_name = operation_name
+
+
+class OperationChecker(object):
+
+ @classmethod
+ def is_add_operation(cls, operation_name, operation_spec):
+ """
+ Check if operation defined with 'operation_name' is add object operation according to 'operation_spec'.
+
+ :param operation_name: name of the operation being called by the user
+ :type operation_name: str
+ :param operation_spec: specification of the operation being called by the user
+ :type operation_spec: dict
+ :return: True if the called operation is add object operation, otherwise False
+ :rtype: bool
+ """
+ # Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
+ return operation_name.startswith(OperationNamePrefix.ADD) and is_post_request(operation_spec)
+
+ @classmethod
+ def is_edit_operation(cls, operation_name, operation_spec):
+ """
+ Check if operation defined with 'operation_name' is edit object operation according to 'operation_spec'.
+
+ :param operation_name: name of the operation being called by the user
+ :type operation_name: str
+ :param operation_spec: specification of the operation being called by the user
+ :type operation_spec: dict
+ :return: True if the called operation is edit object operation, otherwise False
+ :rtype: bool
+ """
+ # Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
+ return operation_name.startswith(OperationNamePrefix.EDIT) and is_put_request(operation_spec)
+
+ @classmethod
+ def is_delete_operation(cls, operation_name, operation_spec):
+ """
+ Check if operation defined with 'operation_name' is delete object operation according to 'operation_spec'.
+
+ :param operation_name: name of the operation being called by the user
+ :type operation_name: str
+ :param operation_spec: specification of the operation being called by the user
+ :type operation_spec: dict
+ :return: True if the called operation is delete object operation, otherwise False
+ :rtype: bool
+ """
+ # Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
+ return operation_name.startswith(OperationNamePrefix.DELETE) \
+ and operation_spec[OperationField.METHOD] == HTTPMethod.DELETE
+
+ @classmethod
+ def is_get_list_operation(cls, operation_name, operation_spec):
+ """
+ Check if operation defined with 'operation_name' is get list of objects operation according to 'operation_spec'.
+
+ :param operation_name: name of the operation being called by the user
+ :type operation_name: str
+ :param operation_spec: specification of the operation being called by the user
+ :type operation_spec: dict
+ :return: True if the called operation is get a list of objects operation, otherwise False
+ :rtype: bool
+ """
+ return operation_spec[OperationField.METHOD] == HTTPMethod.GET \
+ and operation_spec[OperationField.RETURN_MULTIPLE_ITEMS]
+
+ @classmethod
+ def is_get_operation(cls, operation_name, operation_spec):
+ """
+ Check if operation defined with 'operation_name' is get objects operation according to 'operation_spec'.
+
+ :param operation_name: name of the operation being called by the user
+ :type operation_name: str
+ :param operation_spec: specification of the operation being called by the user
+ :type operation_spec: dict
+ :return: True if the called operation is get object operation, otherwise False
+ :rtype: bool
+ """
+ return operation_spec[OperationField.METHOD] == HTTPMethod.GET \
+ and not operation_spec[OperationField.RETURN_MULTIPLE_ITEMS]
+
+ @classmethod
+ def is_upsert_operation(cls, operation_name):
+ """
+ Check if operation defined with 'operation_name' is upsert objects operation according to 'operation_name'.
+
+ :param operation_name: name of the operation being called by the user
+ :type operation_name: str
+ :return: True if the called operation is upsert object operation, otherwise False
+ :rtype: bool
+ """
+ return operation_name.startswith(OperationNamePrefix.UPSERT)
+
+ @classmethod
+ def is_find_by_filter_operation(cls, operation_name, params, operation_spec):
+ """
+ Checks whether the called operation is 'find by filter'. This operation fetches all objects and finds
+ the matching ones by the given filter. As filtering is done on the client side, this operation should be used
+ only when selected filters are not implemented on the server side.
+
+ :param operation_name: name of the operation being called by the user
+ :type operation_name: str
+ :param operation_spec: specification of the operation being called by the user
+ :type operation_spec: dict
+ :param params: params - params should contain 'filters'
+ :return: True if the called operation is find by filter, otherwise False
+ :rtype: bool
+ """
+ is_get_list = cls.is_get_list_operation(operation_name, operation_spec)
+ return is_get_list and ParamName.FILTERS in params and params[ParamName.FILTERS]
+
+ @classmethod
+ def is_upsert_operation_supported(cls, operations):
+ """
+ Checks if all operations required for upsert object operation are defined in 'operations'.
+
+ :param operations: specification of the operations supported by model
+ :type operations: dict
+ :return: True if all criteria required to provide requested called operation are satisfied, otherwise False
+ :rtype: bool
+ """
+ amount_operations_need_for_upsert_operation = 3
+ amount_supported_operations = 0
+ for operation_name, operation_spec in operations.items():
+ if cls.is_add_operation(operation_name, operation_spec) \
+ or cls.is_edit_operation(operation_name, operation_spec) \
+ or cls.is_get_list_operation(operation_name, operation_spec):
+ amount_supported_operations += 1
+
+ return amount_supported_operations == amount_operations_need_for_upsert_operation
+
class BaseConfigurationResource(object):
- def __init__(self, conn):
+
+ def __init__(self, conn, check_mode=False):
self._conn = conn
self.config_changed = False
+ self._operation_spec_cache = {}
+ self._models_operations_specs_cache = {}
+ self._check_mode = check_mode
+ self._operation_checker = OperationChecker
- def get_object_by_name(self, url_path, name, path_params=None):
- item_generator = iterate_over_pageable_resource(
- partial(self.send_request, url_path=url_path, http_method=HTTPMethod.GET, path_params=path_params),
- {'filter': 'name:%s' % name}
- )
- # not all endpoints support filtering so checking name explicitly
- return next((item for item in item_generator if item['name'] == name), None)
+ def execute_operation(self, op_name, params):
+ """
+ Allow user request execution of simple operations(natively supported by API provider) as well as complex
+ operations(operations that are implemented as a set of simple operations).
- def get_objects_by_filter(self, url_path, filters, path_params=None, query_params=None):
- def match_filters(obj):
- for k, v in filters.items():
+ :param op_name: name of the operation being called by the user
+ :type op_name: str
+ :param params: definition of the params that operation should be executed with
+ :type params: dict
+ :return: Result of the operation being executed
+ :rtype: dict
+ """
+ if self._operation_checker.is_upsert_operation(op_name):
+ return self.upsert_object(op_name, params)
+ else:
+ return self.crud_operation(op_name, params)
+
+ def crud_operation(self, op_name, params):
+ """
+ Allow user request execution of simple operations(natively supported by API provider) only.
+
+ :param op_name: name of the operation being called by the user
+ :type op_name: str
+ :param params: definition of the params that operation should be executed with
+ :type params: dict
+ :return: Result of the operation being executed
+ :rtype: dict
+ """
+ op_spec = self.get_operation_spec(op_name)
+ if op_spec is None:
+ raise FtdInvalidOperationNameError(op_name)
+
+ if self._operation_checker.is_add_operation(op_name, op_spec):
+ resp = self.add_object(op_name, params)
+ elif self._operation_checker.is_edit_operation(op_name, op_spec):
+ resp = self.edit_object(op_name, params)
+ elif self._operation_checker.is_delete_operation(op_name, op_spec):
+ resp = self.delete_object(op_name, params)
+ elif self._operation_checker.is_find_by_filter_operation(op_name, params, op_spec):
+ resp = list(self.get_objects_by_filter(op_name, params))
+ else:
+ resp = self.send_general_request(op_name, params)
+ return resp
+
+ def get_operation_spec(self, operation_name):
+ if operation_name not in self._operation_spec_cache:
+ self._operation_spec_cache[operation_name] = self._conn.get_operation_spec(operation_name)
+ return self._operation_spec_cache[operation_name]
+
+ def get_operation_specs_by_model_name(self, model_name):
+ if model_name not in self._models_operations_specs_cache:
+ model_op_specs = self._conn.get_operation_specs_by_model_name(model_name)
+ self._models_operations_specs_cache[model_name] = model_op_specs
+ for op_name, op_spec in iteritems(model_op_specs):
+ self._operation_spec_cache.setdefault(op_name, op_spec)
+ return self._models_operations_specs_cache[model_name]
+
+ def get_objects_by_filter(self, operation_name, params):
+ def transform_filters_to_query_param(filter_params):
+ return ';'.join(['%s:%s' % (key, val) for key, val in sorted(iteritems(filter_params))])
+
+ def match_filters(filter_params, obj):
+ for k, v in iteritems(filter_params):
if k not in obj or obj[k] != v:
return False
return True
- item_generator = iterate_over_pageable_resource(
- partial(self.send_request, url_path=url_path, http_method=HTTPMethod.GET, path_params=path_params),
- query_params
- )
- return [i for i in item_generator if match_filters(i)]
+ dummy, query_params, path_params = _get_user_params(params)
+ # copy required params to avoid mutation of passed `params` dict
+ get_list_params = {ParamName.QUERY_PARAMS: dict(query_params), ParamName.PATH_PARAMS: dict(path_params)}
- def add_object(self, url_path, body_params, path_params=None, query_params=None, update_if_exists=False):
+ filters = params.get(ParamName.FILTERS) or {}
+ if filters:
+ get_list_params[ParamName.QUERY_PARAMS][QueryParams.FILTER] = transform_filters_to_query_param(filters)
+
+ item_generator = iterate_over_pageable_resource(
+ partial(self.send_general_request, operation_name=operation_name), get_list_params
+ )
+ return (i for i in item_generator if match_filters(filters, i))
+
+ def add_object(self, operation_name, params):
def is_duplicate_name_error(err):
return err.code == UNPROCESSABLE_ENTITY_STATUS and DUPLICATE_NAME_ERROR_MESSAGE in str(err)
- def update_existing_object(obj):
- new_path_params = {} if path_params is None else path_params
- new_path_params['objId'] = obj['id']
- return self.send_request(url_path=url_path + '/{objId}',
- http_method=HTTPMethod.PUT,
- body_params=copy_identity_properties(obj, body_params),
- path_params=new_path_params,
- query_params=query_params)
-
try:
- return self.send_request(url_path=url_path, http_method=HTTPMethod.POST, body_params=body_params,
- path_params=path_params, query_params=query_params)
+ return self.send_general_request(operation_name, params)
except FtdServerError as e:
if is_duplicate_name_error(e):
- existing_obj = self.get_object_by_name(url_path, body_params['name'], path_params)
- if equal_objects(existing_obj, body_params):
- return existing_obj
- elif update_if_exists:
- return update_existing_object(existing_obj)
- else:
- raise FtdConfigurationError(
- 'Cannot add new object. An object with the same name but different parameters already exists.')
+ return self._check_if_the_same_object(operation_name, params, e)
else:
raise e
- def delete_object(self, url_path, path_params):
+ def _check_if_the_same_object(self, operation_name, params, e):
+ """
+ Special check used in the scope of 'add_object' operation, which can be requested as a standalone operation or
+ in the scope of 'upsert_object' operation. This method executed in case 'add_object' failed and should try to
+ find the object that caused "object duplicate" error. In case single object found and it's equal to one we are
+ trying to create - the existing object will be returned (attempt to have kind of idempotency for add action).
+ In the case when we got more than one object returned as a result of the request to API - it will be hard to
+ find exact duplicate so the exception will be raised.
+ """
+ model_name = self.get_operation_spec(operation_name)[OperationField.MODEL_NAME]
+ get_list_operation = self._find_get_list_operation(model_name)
+ if get_list_operation:
+ data = params[ParamName.DATA]
+ if not params.get(ParamName.FILTERS):
+ params[ParamName.FILTERS] = {'name': data['name']}
+
+ existing_obj = None
+ existing_objs = self.get_objects_by_filter(get_list_operation, params)
+
+ for i, obj in enumerate(existing_objs):
+ if i > 0:
+ raise FtdConfigurationError(MULTIPLE_DUPLICATES_FOUND_ERROR)
+ existing_obj = obj
+
+ if existing_obj is not None:
+ if equal_objects(existing_obj, data):
+ return existing_obj
+ else:
+ raise FtdConfigurationError(
+ 'Cannot add new object. '
+ 'An object with the same name but different parameters already exists.',
+ existing_obj)
+
+ raise e
+
+ def _find_get_list_operation(self, model_name):
+ operations = self.get_operation_specs_by_model_name(model_name) or {}
+ return next((
+ op for op, op_spec in operations.items()
+ if self._operation_checker.is_get_list_operation(op, op_spec)), None)
+
+ def _find_get_operation(self, model_name):
+ operations = self.get_operation_specs_by_model_name(model_name) or {}
+ return next((
+ op for op, op_spec in operations.items()
+ if self._operation_checker.is_get_operation(op, op_spec)), None)
+
+ def delete_object(self, operation_name, params):
def is_invalid_uuid_error(err):
return err.code == UNPROCESSABLE_ENTITY_STATUS and INVALID_UUID_ERROR_MESSAGE in str(err)
try:
- return self.send_request(url_path=url_path, http_method=HTTPMethod.DELETE, path_params=path_params)
+ return self.send_general_request(operation_name, params)
except FtdServerError as e:
if is_invalid_uuid_error(e):
return {'status': 'Referenced object does not exist'}
else:
raise e
- def edit_object(self, url_path, body_params, path_params=None, query_params=None):
- existing_object = self.send_request(url_path=url_path, http_method=HTTPMethod.GET, path_params=path_params)
+ def edit_object(self, operation_name, params):
+ data, dummy, path_params = _get_user_params(params)
- if not existing_object:
- raise FtdConfigurationError('Referenced object does not exist')
- elif equal_objects(existing_object, body_params):
- return existing_object
- else:
- return self.send_request(url_path=url_path, http_method=HTTPMethod.PUT, body_params=body_params,
- path_params=path_params, query_params=query_params)
+ model_name = self.get_operation_spec(operation_name)[OperationField.MODEL_NAME]
+ get_operation = self._find_get_operation(model_name)
- def send_request(self, url_path, http_method, body_params=None, path_params=None, query_params=None):
+ if get_operation:
+ existing_object = self.send_general_request(get_operation, {ParamName.PATH_PARAMS: path_params})
+ if not existing_object:
+ raise FtdConfigurationError('Referenced object does not exist')
+ elif equal_objects(existing_object, data):
+ return existing_object
+
+ return self.send_general_request(operation_name, params)
+
+ def send_general_request(self, operation_name, params):
+ self.validate_params(operation_name, params)
+ if self._check_mode:
+ raise CheckModeException()
+
+ data, query_params, path_params = _get_user_params(params)
+ op_spec = self.get_operation_spec(operation_name)
+ url, method = op_spec[OperationField.URL], op_spec[OperationField.METHOD]
+
+ return self._send_request(url, method, data, path_params, query_params)
+
+ def _send_request(self, url_path, http_method, body_params=None, path_params=None, query_params=None):
def raise_for_failure(resp):
if not resp[ResponseParams.SUCCESS]:
raise FtdServerError(resp[ResponseParams.RESPONSE], resp[ResponseParams.STATUS_CODE])
@@ -119,28 +393,152 @@ class BaseConfigurationResource(object):
self.config_changed = True
return response[ResponseParams.RESPONSE]
+ def validate_params(self, operation_name, params):
+ report = {}
+ op_spec = self.get_operation_spec(operation_name)
+ data, query_params, path_params = _get_user_params(params)
-def iterate_over_pageable_resource(resource_func, query_params=None):
+ def validate(validation_method, field_name, user_params):
+ key = 'Invalid %s provided' % field_name
+ try:
+ is_valid, validation_report = validation_method(operation_name, user_params)
+ if not is_valid:
+ report[key] = validation_report
+ except Exception as e:
+ report[key] = str(e)
+ return report
+
+ validate(self._conn.validate_query_params, ParamName.QUERY_PARAMS, query_params)
+ validate(self._conn.validate_path_params, ParamName.PATH_PARAMS, path_params)
+ if is_post_request(op_spec) or is_put_request(op_spec):
+ validate(self._conn.validate_data, ParamName.DATA, data)
+
+ if report:
+ raise ValidationError(report)
+
+ def is_upsert_operation_supported(self, op_name):
+ """
+ Checks if all operations required for upsert object operation are defined in 'operations'.
+
+ :param op_name: upsert operation name
+ :type op_name: str
+ :return: True if all criteria required to provide requested called operation are satisfied, otherwise False
+ :rtype: bool
+ """
+ model_name = _extract_model_from_upsert_operation(op_name)
+ operations = self.get_operation_specs_by_model_name(model_name)
+ return self._operation_checker.is_upsert_operation_supported(operations)
+
+ @staticmethod
+ def _get_operation_name(checker, operations):
+ for operation_name, op_spec in operations.items():
+ if checker(operation_name, op_spec):
+ return operation_name
+ raise FtdConfigurationError("Operation is not supported")
+
+ def _add_upserted_object(self, model_operations, params):
+ add_op_name = self._get_operation_name(self._operation_checker.is_add_operation, model_operations)
+ return self.add_object(add_op_name, params)
+
+ def _edit_upserted_object(self, model_operations, existing_object, params):
+ edit_op_name = self._get_operation_name(self._operation_checker.is_edit_operation, model_operations)
+ _set_default(params, 'path_params', {})
+ _set_default(params, 'data', {})
+
+ params['path_params']['objId'] = existing_object['id']
+ copy_identity_properties(existing_object, params['data'])
+ return self.edit_object(edit_op_name, params)
+
+ def upsert_object(self, op_name, params):
+ """
+ The wrapper on top of add object operation, get a list of objects and edit object operations that implement
+ upsert object operation. As a result, the object will be created if the object does not exist, if a single
+ object exists with requested 'params' this object will be updated otherwise, Exception will be raised.
+
+ :param op_name: upsert operation name
+ :type op_name: str
+ :param params: params that upsert operation should be executed with
+ :type params: dict
+ :return: upserted object representation
+ :rtype: dict
+ """
+ if not self.is_upsert_operation_supported(op_name):
+ raise FtdInvalidOperationNameError(op_name)
+
+ model_name = _extract_model_from_upsert_operation(op_name)
+ model_operations = self.get_operation_specs_by_model_name(model_name)
+
+ try:
+ return self._add_upserted_object(model_operations, params)
+ except FtdConfigurationError as e:
+ if e.obj:
+ return self._edit_upserted_object(model_operations, e.obj, params)
+ raise e
+
+
+def _set_default(params, field_name, value):
+ if field_name not in params or params[field_name] is None:
+ params[field_name] = value
+
+
+def is_post_request(operation_spec):
+ return operation_spec[OperationField.METHOD] == HTTPMethod.POST
+
+
+def is_put_request(operation_spec):
+ return operation_spec[OperationField.METHOD] == HTTPMethod.PUT
+
+
+def _extract_model_from_upsert_operation(op_name):
+ return op_name[len(OperationNamePrefix.UPSERT):]
+
+
+def _get_user_params(params):
+ return params.get(ParamName.DATA) or {}, params.get(ParamName.QUERY_PARAMS) or {}, params.get(
+ ParamName.PATH_PARAMS) or {}
+
+
+def iterate_over_pageable_resource(resource_func, params):
"""
A generator function that iterates over a resource that supports pagination and lazily returns present items
one by one.
- :param resource_func: function that receives `query_params` named argument and returns a page of objects
+ :param resource_func: function that receives `params` argument and returns a page of objects
:type resource_func: callable
- :param query_params: initial dictionary of query parameters that will be passed to the resource_func
- :type query_params: dict
+ :param params: initial dictionary of parameters that will be passed to the resource_func.
+ Should contain `query_params` inside.
+ :type params: dict
:return: an iterator containing returned items
:rtype: iterator of dict
"""
- query_params = {} if query_params is None else dict(query_params)
- query_params.setdefault('limit', DEFAULT_PAGE_SIZE)
- query_params.setdefault('offset', DEFAULT_OFFSET)
+ # creating a copy not to mutate passed dict
+ params = copy.deepcopy(params)
+ params[ParamName.QUERY_PARAMS].setdefault('limit', DEFAULT_PAGE_SIZE)
+ params[ParamName.QUERY_PARAMS].setdefault('offset', DEFAULT_OFFSET)
+ limit = int(params[ParamName.QUERY_PARAMS]['limit'])
+
+ def received_less_items_than_requested(items_in_response, items_expected):
+ if items_in_response == items_expected:
+ return False
+ elif items_in_response < items_expected:
+ return True
+
+ raise FtdUnexpectedResponse(
+ "Get List of Objects Response from the server contains more objects than requested. "
+ "There are {0} item(s) in the response while {1} was(ere) requested".format(items_in_response,
+ items_expected)
+ )
+
+ while True:
+ result = resource_func(params=params)
- result = resource_func(query_params=query_params)
- while result['items']:
for item in result['items']:
yield item
+
+ if received_less_items_than_requested(len(result['items']), limit):
+ break
+
# creating a copy not to mutate existing dict
- query_params = dict(query_params)
- query_params['offset'] = int(query_params['offset']) + int(query_params['limit'])
- result = resource_func(query_params=query_params)
+ params = copy.deepcopy(params)
+ query_params = params[ParamName.QUERY_PARAMS]
+ query_params['offset'] = int(query_params['offset']) + limit
diff --git a/lib/ansible/module_utils/network/ftd/fdm_swagger_client.py b/lib/ansible/module_utils/network/ftd/fdm_swagger_client.py
index 5d8becb077d..b826ff83488 100644
--- a/lib/ansible/module_utils/network/ftd/fdm_swagger_client.py
+++ b/lib/ansible/module_utils/network/ftd/fdm_swagger_client.py
@@ -17,10 +17,11 @@
#
from ansible.module_utils.network.ftd.common import HTTPMethod
-from ansible.module_utils.six import integer_types, string_types
+from ansible.module_utils.six import integer_types, string_types, iteritems
FILE_MODEL_NAME = '_File'
SUCCESS_RESPONSE_CODE = '200'
+DELETE_PREFIX = 'delete'
class OperationField:
@@ -28,12 +29,15 @@ class OperationField:
METHOD = 'method'
PARAMETERS = 'parameters'
MODEL_NAME = 'modelName'
+ DESCRIPTION = 'description'
+ RETURN_MULTIPLE_ITEMS = 'returnMultipleItems'
class SpecProp:
DEFINITIONS = 'definitions'
OPERATIONS = 'operations'
MODELS = 'models'
+ MODEL_OPERATIONS = 'model_operations'
class PropName:
@@ -51,6 +55,7 @@ class PropName:
PROPERTIES = 'properties'
RESPONSES = 'responses'
NAME = 'name'
+ DESCRIPTION = 'description'
class PropType:
@@ -68,6 +73,10 @@ class OperationParams:
QUERY = 'query'
+class QueryParams:
+ FILTER = 'filter'
+
+
def _get_model_name_from_url(schema_ref):
path = schema_ref.split('/')
return path[len(path) - 1]
@@ -89,13 +98,18 @@ class ValidationError(ValueError):
class FdmSwaggerParser:
_definitions = None
+ _base_path = None
- def parse_spec(self, spec):
+ def parse_spec(self, spec, docs=None):
"""
- This method simplifies a swagger format and also resolves a model name for each operation
- :param spec: dict
- expect data in the swagger format see
- :rtype: (bool, string|dict)
+ This method simplifies a swagger format, resolves a model name for each operation, and adds documentation for
+ each operation and model if it is provided.
+
+ :param spec: An API specification in the swagger format, see
+ :type spec: dict
+ :param spec: A documentation map containing descriptions for models, operations and operation parameters.
+ :type docs: dict
+ :rtype: dict
:return:
Ex.
The models field contains model definition from swagger see <#https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md#definitions>
@@ -111,6 +125,7 @@ class FdmSwaggerParser:
'modelName': 'NetworkObject', # it is a link to the model from 'models'
# None - for a delete operation or we don't have information
# '_File' - if an endpoint works with files
+ 'returnMultipleItems': False, # shows if the operation returns a single item or an item list
'parameters': {
'path':{
'param_name':{
@@ -129,26 +144,49 @@ class FdmSwaggerParser:
}
},
...
+ },
+ 'model_operations':{
+ 'model_name':{ # a list of operations available for the current model
+ 'operation_name':{
+ ... # the same as in the operations section
+ },
+ ...
+ },
+ ...
}
}
"""
self._definitions = spec[SpecProp.DEFINITIONS]
- config = {
+ self._base_path = spec[PropName.BASE_PATH]
+ operations = self._get_operations(spec)
+
+ if docs:
+ operations = self._enrich_operations_with_docs(operations, docs)
+ self._definitions = self._enrich_definitions_with_docs(self._definitions, docs)
+
+ return {
SpecProp.MODELS: self._definitions,
- SpecProp.OPERATIONS: self._get_operations(spec)
+ SpecProp.OPERATIONS: operations,
+ SpecProp.MODEL_OPERATIONS: self._get_model_operations(operations)
}
- return config
+
+ def _get_model_operations(self, operations):
+ model_operations = {}
+ for operations_name, params in iteritems(operations):
+ model_name = params[OperationField.MODEL_NAME]
+ model_operations.setdefault(model_name, {})[operations_name] = params
+ return model_operations
def _get_operations(self, spec):
- base_path = spec[PropName.BASE_PATH]
paths_dict = spec[PropName.PATHS]
operations_dict = {}
- for url, operation_params in paths_dict.items():
- for method, params in operation_params.items():
+ for url, operation_params in iteritems(paths_dict):
+ for method, params in iteritems(operation_params):
operation = {
OperationField.METHOD: method,
- OperationField.URL: base_path + url,
- OperationField.MODEL_NAME: self._get_model_name(method, params)
+ OperationField.URL: self._base_path + url,
+ OperationField.MODEL_NAME: self._get_model_name(method, params),
+ OperationField.RETURN_MULTIPLE_ITEMS: self._return_multiple_items(params)
}
if OperationField.PARAMETERS in params:
operation[OperationField.PARAMETERS] = self._get_rest_params(params[OperationField.PARAMETERS])
@@ -157,14 +195,68 @@ class FdmSwaggerParser:
operations_dict[operation_id] = operation
return operations_dict
+ def _enrich_operations_with_docs(self, operations, docs):
+ def get_operation_docs(op):
+ op_url = op[OperationField.URL][len(self._base_path):]
+ return docs[PropName.PATHS].get(op_url, {}).get(op[OperationField.METHOD], {})
+
+ for operation in operations.values():
+ operation_docs = get_operation_docs(operation)
+ operation[OperationField.DESCRIPTION] = operation_docs.get(PropName.DESCRIPTION, '')
+
+ if OperationField.PARAMETERS in operation:
+ param_descriptions = dict((p[PropName.NAME], p[PropName.DESCRIPTION])
+ for p in operation_docs.get(OperationField.PARAMETERS, {}))
+
+ for param_name, params_spec in operation[OperationField.PARAMETERS][OperationParams.PATH].items():
+ params_spec[OperationField.DESCRIPTION] = param_descriptions.get(param_name, '')
+
+ for param_name, params_spec in operation[OperationField.PARAMETERS][OperationParams.QUERY].items():
+ params_spec[OperationField.DESCRIPTION] = param_descriptions.get(param_name, '')
+
+ return operations
+
+ def _enrich_definitions_with_docs(self, definitions, docs):
+ for model_name, model_def in definitions.items():
+ model_docs = docs[SpecProp.DEFINITIONS].get(model_name, {})
+ model_def[PropName.DESCRIPTION] = model_docs.get(PropName.DESCRIPTION, '')
+ for prop_name, prop_spec in model_def.get(PropName.PROPERTIES, {}).items():
+ prop_spec[PropName.DESCRIPTION] = model_docs.get(PropName.PROPERTIES, {}).get(prop_name, '')
+ prop_spec[PropName.REQUIRED] = prop_name in model_def.get(PropName.REQUIRED, [])
+ return definitions
+
def _get_model_name(self, method, params):
if method == HTTPMethod.GET:
return self._get_model_name_from_responses(params)
elif method == HTTPMethod.POST or method == HTTPMethod.PUT:
return self._get_model_name_for_post_put_requests(params)
+ elif method == HTTPMethod.DELETE:
+ return self._get_model_name_from_delete_operation(params)
else:
return None
+ @staticmethod
+ def _return_multiple_items(op_params):
+ """
+ Defines if the operation returns one item or a list of items.
+
+ :param op_params: operation specification
+ :return: True if the operation returns a list of items, otherwise False
+ """
+ try:
+ schema = op_params[PropName.RESPONSES][SUCCESS_RESPONSE_CODE][PropName.SCHEMA]
+ return PropName.ITEMS in schema[PropName.PROPERTIES]
+ except KeyError:
+ return False
+
+ def _get_model_name_from_delete_operation(self, params):
+ operation_id = params[PropName.OPERATION_ID]
+ if operation_id.startswith(DELETE_PREFIX):
+ model_name = operation_id[len(DELETE_PREFIX):]
+ if model_name in self._definitions:
+ return model_name
+ return None
+
def _get_model_name_for_post_put_requests(self, params):
model_name = None
if OperationField.PARAMETERS in params:
@@ -429,7 +521,8 @@ class FdmSwaggerValidator:
self._add_invalid_type_report(status, path, '', PropType.OBJECT, data)
return None
- self._check_required_fields(status, model[PropName.REQUIRED], data, path)
+ if PropName.REQUIRED in model:
+ self._check_required_fields(status, model[PropName.REQUIRED], data, path)
model_properties = model[PropName.PROPERTIES]
for prop in model_properties.keys():
@@ -472,14 +565,25 @@ class FdmSwaggerValidator:
@staticmethod
def _is_correct_simple_types(expected_type, value):
+ def is_numeric_string(s):
+ try:
+ float(s)
+ return True
+ except ValueError:
+ return False
+
if expected_type == PropType.STRING:
return isinstance(value, string_types)
elif expected_type == PropType.BOOLEAN:
return isinstance(value, bool)
elif expected_type == PropType.INTEGER:
- return isinstance(value, integer_types) and not isinstance(value, bool)
+ is_integer = isinstance(value, integer_types) and not isinstance(value, bool)
+ is_digit_string = isinstance(value, string_types) and value.isdigit()
+ return is_integer or is_digit_string
elif expected_type == PropType.NUMBER:
- return isinstance(value, (integer_types, float)) and not isinstance(value, bool)
+ is_number = isinstance(value, (integer_types, float)) and not isinstance(value, bool)
+ is_numeric_string = isinstance(value, string_types) and is_numeric_string(value)
+ return is_number or is_numeric_string
return False
@staticmethod
diff --git a/lib/ansible/modules/network/ftd/ftd_configuration.py b/lib/ansible/modules/network/ftd/ftd_configuration.py
index 464fabdbec3..d763c71e5f4 100644
--- a/lib/ansible/modules/network/ftd/ftd_configuration.py
+++ b/lib/ansible/modules/network/ftd/ftd_configuration.py
@@ -19,8 +19,8 @@
#
from __future__ import absolute_import, division, print_function
-__metaclass__ = type
+__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
@@ -38,25 +38,31 @@ author: "Cisco Systems, Inc."
options:
operation:
description:
- - The name of the operation to execute. Commonly, the operation starts with 'add', 'edit', 'get'
+ - The name of the operation to execute. Commonly, the operation starts with 'add', 'edit', 'get', 'upsert'
or 'delete' verbs, but can have an arbitrary name too.
required: true
+ type: str
data:
description:
- Key-value pairs that should be sent as body parameters in a REST API call
+ type: dict
query_params:
description:
- Key-value pairs that should be sent as query parameters in a REST API call.
+ type: dict
path_params:
description:
- Key-value pairs that should be sent as path parameters in a REST API call.
+ type: dict
register_as:
description:
- Specifies Ansible fact name that is used to register received response from the FTD device.
+ type: str
filters:
description:
- Key-value dict that represents equality filters. Every key is a property name and value is its desired value.
If multiple filters are present, they are combined with logical operator AND.
+ type: dict
"""
EXAMPLES = """
@@ -88,74 +94,11 @@ response:
"""
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.connection import Connection
-from ansible.module_utils.network.ftd.common import HTTPMethod, construct_ansible_facts, FtdConfigurationError, \
- FtdServerError
-from ansible.module_utils.network.ftd.configuration import BaseConfigurationResource
-from ansible.module_utils.network.ftd.fdm_swagger_client import OperationField, ValidationError
-
-
-def is_post_request(operation_spec):
- return operation_spec[OperationField.METHOD] == HTTPMethod.POST
-
-
-def is_put_request(operation_spec):
- return operation_spec[OperationField.METHOD] == HTTPMethod.PUT
-
-
-def is_add_operation(operation_name, operation_spec):
- # Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
- return operation_name.startswith('add') and is_post_request(operation_spec)
-
-
-def is_edit_operation(operation_name, operation_spec):
- # Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
- return operation_name.startswith('edit') and is_put_request(operation_spec)
-
-
-def is_delete_operation(operation_name, operation_spec):
- # Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
- return operation_name.startswith('delete') and operation_spec[OperationField.METHOD] == HTTPMethod.DELETE
-
-
-def validate_params(connection, op_name, query_params, path_params, data, op_spec):
- report = {}
-
- def validate(validation_method, field_name, params):
- key = 'Invalid %s provided' % field_name
- try:
- is_valid, validation_report = validation_method(op_name, params)
- if not is_valid:
- report[key] = validation_report
- except Exception as e:
- report[key] = str(e)
- return report
-
- validate(connection.validate_query_params, 'query_params', query_params)
- validate(connection.validate_path_params, 'path_params', path_params)
- if is_post_request(op_spec) or is_post_request(op_spec):
- validate(connection.validate_data, 'data', data)
-
- if report:
- raise ValidationError(report)
-
-
-def is_find_by_filter_operation(operation_name, operation_spec, params):
- """
- Checks whether the called operation is 'find by filter'. This operation fetches all objects and finds
- the matching ones by the given filter. As filtering is done on the client side, this operation should be used
- only when selected filters are not implemented on the server side.
-
- :param operation_name: name of the operation being called by the user
- :type operation_name: str
- :param operation_spec: specification of the operation being called by the user
- :type operation_spec: dict
- :param params: module parameters
- :return: True if called operation is find by filter, otherwise False
- :rtype: bool
- """
- is_get_list_operation = operation_name.startswith('get') and operation_name.endswith('List')
- is_get_method = operation_spec[OperationField.METHOD] == HTTPMethod.GET
- return is_get_list_operation and is_get_method and params['filters']
+from ansible.module_utils.network.ftd.configuration import BaseConfigurationResource, CheckModeException, \
+ FtdInvalidOperationNameError
+from ansible.module_utils.network.ftd.fdm_swagger_client import ValidationError
+from ansible.module_utils.network.ftd.common import construct_ansible_facts, FtdConfigurationError, \
+ FtdServerError, FtdUnexpectedResponse
def main():
@@ -172,47 +115,25 @@ def main():
params = module.params
connection = Connection(module._socket_path)
-
+ resource = BaseConfigurationResource(connection, module.check_mode)
op_name = params['operation']
- op_spec = connection.get_operation_spec(op_name)
- if op_spec is None:
- module.fail_json(msg='Invalid operation name provided: %s' % op_name)
-
- data, query_params, path_params = params['data'], params['query_params'], params['path_params']
-
try:
- validate_params(connection, op_name, query_params, path_params, data, op_spec)
- except ValidationError as e:
- module.fail_json(msg=e.args[0])
-
- try:
- if module.check_mode:
- module.exit_json(changed=False)
-
- resource = BaseConfigurationResource(connection)
- url = op_spec[OperationField.URL]
-
- if is_add_operation(op_name, op_spec):
- resp = resource.add_object(url, data, path_params, query_params)
- elif is_edit_operation(op_name, op_spec):
- resp = resource.edit_object(url, data, path_params, query_params)
- elif is_delete_operation(op_name, op_spec):
- resp = resource.delete_object(url, path_params)
- elif is_find_by_filter_operation(op_name, op_spec, params):
- resp = resource.get_objects_by_filter(url, params['filters'], path_params,
- query_params)
- else:
- resp = resource.send_request(url, op_spec[OperationField.METHOD], data,
- path_params,
- query_params)
-
+ resp = resource.execute_operation(op_name, params)
module.exit_json(changed=resource.config_changed, response=resp,
ansible_facts=construct_ansible_facts(resp, module.params))
+ except FtdInvalidOperationNameError as e:
+ module.fail_json(msg='Invalid operation name provided: %s' % e.operation_name)
except FtdConfigurationError as e:
- module.fail_json(msg='Failed to execute %s operation because of the configuration error: %s' % (op_name, e))
+ module.fail_json(msg='Failed to execute %s operation because of the configuration error: %s' % (op_name, e.msg))
except FtdServerError as e:
module.fail_json(msg='Server returned an error trying to execute %s operation. Status code: %s. '
'Server response: %s' % (op_name, e.code, e.response))
+ except FtdUnexpectedResponse as e:
+ module.fail_json(msg=e.args[0])
+ except ValidationError as e:
+ module.fail_json(msg=e.args[0])
+ except CheckModeException:
+ module.exit_json(changed=False)
if __name__ == '__main__':
diff --git a/lib/ansible/modules/network/ftd/ftd_file_download.py b/lib/ansible/modules/network/ftd/ftd_file_download.py
index aafccb2b0de..ca0b2a0467f 100644
--- a/lib/ansible/modules/network/ftd/ftd_file_download.py
+++ b/lib/ansible/modules/network/ftd/ftd_file_download.py
@@ -41,14 +41,17 @@ options:
- The name of the operation to execute.
- Only operations that return a file can be used in this module.
required: true
+ type: str
path_params:
description:
- Key-value pairs that should be sent as path parameters in a REST API call.
+ type: dict
destination:
description:
- Absolute path of where to download the file to.
- If destination is a directory, the module uses a filename from 'Content-Disposition' header specified by the server.
required: true
+ type: path
"""
EXAMPLES = """
@@ -62,7 +65,7 @@ EXAMPLES = """
RETURN = """
msg:
- description: the error message describing why the module failed
+ description: The error message describing why the module failed.
returned: error
type: string
"""
diff --git a/lib/ansible/modules/network/ftd/ftd_file_upload.py b/lib/ansible/modules/network/ftd/ftd_file_upload.py
index a1187dc12d0..90cb7a749d3 100644
--- a/lib/ansible/modules/network/ftd/ftd_file_upload.py
+++ b/lib/ansible/modules/network/ftd/ftd_file_upload.py
@@ -40,25 +40,29 @@ options:
- The name of the operation to execute.
- Only operations that upload file can be used in this module.
required: true
- fileToUpload:
+ type: str
+ file_to_upload:
description:
- Absolute path to the file that should be uploaded.
required: true
+ type: path
+ version_added: "2.8"
register_as:
description:
- Specifies Ansible fact name that is used to register received response from the FTD device.
+ type: str
"""
EXAMPLES = """
- name: Upload disk file
ftd_file_upload:
operation: 'postuploaddiskfile'
- fileToUpload: /tmp/test1.txt
+ file_to_upload: /tmp/test1.txt
"""
RETURN = """
msg:
- description: the error message describing why the module failed
+ description: The error message describing why the module failed.
returned: error
type: string
"""
@@ -75,7 +79,7 @@ def is_upload_operation(op_spec):
def main():
fields = dict(
operation=dict(type='str', required=True),
- fileToUpload=dict(type='path', required=True),
+ file_to_upload=dict(type='path', required=True),
register_as=dict(type='str'),
)
module = AnsibleModule(argument_spec=fields,
@@ -94,7 +98,7 @@ def main():
try:
if module.check_mode:
module.exit_json()
- resp = connection.upload_file(params['fileToUpload'], op_spec[OperationField.URL])
+ resp = connection.upload_file(params['file_to_upload'], op_spec[OperationField.URL])
module.exit_json(changed=True, response=resp, ansible_facts=construct_ansible_facts(resp, module.params))
except FtdServerError as e:
module.fail_json(msg='Upload request for %s operation failed. Status code: %s. '
diff --git a/lib/ansible/plugins/httpapi/ftd.py b/lib/ansible/plugins/httpapi/ftd.py
index 33366232ef9..f154b351312 100644
--- a/lib/ansible/plugins/httpapi/ftd.py
+++ b/lib/ansible/plugins/httpapi/ftd.py
@@ -37,7 +37,6 @@ options:
default: '/api/fdm/v2/fdm/token'
vars:
- name: ansible_httpapi_ftd_token_path
-
spec_path:
type: str
description:
@@ -70,6 +69,13 @@ BASE_HEADERS = {
TOKEN_EXPIRATION_STATUS_CODE = 408
UNAUTHORIZED_STATUS_CODE = 401
+try:
+ from __main__ import display
+except ImportError:
+ from ansible.utils.display import Display
+
+ display = Display()
+
class HttpApi(HttpApiBase):
def __init__(self, connection):
@@ -79,6 +85,7 @@ class HttpApi(HttpApiBase):
self.refresh_token = None
self._api_spec = None
self._api_validator = None
+ self._ignore_http_errors = False
def login(self, username, password):
def request_token_payload(username, password):
@@ -101,10 +108,15 @@ class HttpApi(HttpApiBase):
else:
raise AnsibleConnectionFailure('Username and password are required for login in absence of refresh token')
- dummy, response_data = self.connection.send(
- self._get_api_token_path(), json.dumps(payload), method=HTTPMethod.POST, headers=BASE_HEADERS
+ url = self._get_api_token_path()
+ self._display(HTTPMethod.POST, 'login', url)
+
+ response, response_data = self._send_auth_request(
+ url, json.dumps(payload), method=HTTPMethod.POST, headers=BASE_HEADERS
)
- response = self._response_to_json(response_data.getvalue())
+ self._display(HTTPMethod.POST, 'login:status_code', response.getcode())
+
+ response = self._response_to_json(self._get_response_value(response_data))
try:
self.refresh_token = response['refresh_token']
@@ -120,13 +132,29 @@ class HttpApi(HttpApiBase):
'access_token': self.access_token,
'token_to_revoke': self.refresh_token
}
- self.connection.send(
- self._get_api_token_path(), json.dumps(auth_payload), method=HTTPMethod.POST,
- headers=BASE_HEADERS
- )
+
+ url = self._get_api_token_path()
+
+ self._display(HTTPMethod.POST, 'logout', url)
+ response, dummy = self._send_auth_request(url, json.dumps(auth_payload), method=HTTPMethod.POST,
+ headers=BASE_HEADERS)
+ self._display(HTTPMethod.POST, 'logout:status_code', response.getcode())
+
self.refresh_token = None
self.access_token = None
+ def _send_auth_request(self, path, data, **kwargs):
+ try:
+ self._ignore_http_errors = True
+ return self.connection.send(path, data, **kwargs)
+ except HTTPError as e:
+ # HttpApi connection does not read the error response from HTTPError, so we do it here and wrap it up in
+ # ConnectionError, so the actual error message is displayed to the user.
+ error_msg = self._response_to_json(to_text(e.read()))
+ raise ConnectionError('Server returned an error during authentication request: %s' % error_msg)
+ finally:
+ self._ignore_http_errors = False
+
def update_auth(self, response, response_data):
# With tokens, authentication should not be checked and updated on each request
return None
@@ -135,23 +163,34 @@ class HttpApi(HttpApiBase):
url = construct_url_path(url_path, path_params, query_params)
data = json.dumps(body_params) if body_params else None
try:
+ self._display(http_method, 'url', url)
+ if data:
+ self._display(http_method, 'data', data)
+
response, response_data = self.connection.send(url, data, method=http_method, headers=BASE_HEADERS)
+
+ value = self._get_response_value(response_data)
+ self._display(http_method, 'response', value)
+
return {
ResponseParams.SUCCESS: True,
ResponseParams.STATUS_CODE: response.getcode(),
- ResponseParams.RESPONSE: self._response_to_json(response_data.getvalue())
+ ResponseParams.RESPONSE: self._response_to_json(value)
}
# Being invoked via JSON-RPC, this method does not serialize and pass HTTPError correctly to the method caller.
# Thus, in order to handle non-200 responses, we need to wrap them into a simple structure and pass explicitly.
except HTTPError as e:
+ error_msg = to_text(e.read())
+ self._display(http_method, 'error', error_msg)
return {
ResponseParams.SUCCESS: False,
ResponseParams.STATUS_CODE: e.code,
- ResponseParams.RESPONSE: self._response_to_json(e.read())
+ ResponseParams.RESPONSE: self._response_to_json(error_msg)
}
def upload_file(self, from_path, to_url):
url = construct_url_path(to_url)
+ self._display(HTTPMethod.POST, 'upload', url)
with open(from_path, 'rb') as src_file:
rf = RequestField('fileToUpload', src_file.read(), os.path.basename(src_file.name))
rf.make_multipart()
@@ -162,10 +201,13 @@ class HttpApi(HttpApiBase):
headers['Content-Length'] = len(body)
dummy, response_data = self.connection.send(url, data=body, method=HTTPMethod.POST, headers=headers)
- return self._response_to_json(response_data.getvalue())
+ value = self._get_response_value(response_data)
+ self._display(HTTPMethod.POST, 'upload:response', value)
+ return self._response_to_json(value)
def download_file(self, from_url, to_path, path_params=None):
url = construct_url_path(from_url, path_params=path_params)
+ self._display(HTTPMethod.GET, 'download', url)
response, response_data = self.connection.send(url, data=None, method=HTTPMethod.GET, headers=BASE_HEADERS)
if os.path.isdir(to_path):
@@ -174,15 +216,24 @@ class HttpApi(HttpApiBase):
with open(to_path, "wb") as output_file:
output_file.write(response_data.getvalue())
+ self._display(HTTPMethod.GET, 'downloaded', to_path)
def handle_httperror(self, exc):
- if exc.code == TOKEN_EXPIRATION_STATUS_CODE or exc.code == UNAUTHORIZED_STATUS_CODE:
+ is_auth_related_code = exc.code == TOKEN_EXPIRATION_STATUS_CODE or exc.code == UNAUTHORIZED_STATUS_CODE
+ if not self._ignore_http_errors and is_auth_related_code:
self.connection._auth = None
self.login(self.connection.get_option('remote_user'), self.connection.get_option('password'))
return True
# None means that the exception will be passed further to the caller
return None
+ def _display(self, http_method, title, msg=''):
+ display.vvvv('REST:%s:%s:%s\n%s' % (http_method, self.connection._url, title, msg))
+
+ @staticmethod
+ def _get_response_value(response_data):
+ return to_text(response_data.getvalue())
+
def _get_api_spec_path(self):
return self.get_option('spec_path')
@@ -190,8 +241,7 @@ class HttpApi(HttpApiBase):
return self.get_option('token_path')
@staticmethod
- def _response_to_json(response_data):
- response_text = to_text(response_data)
+ def _response_to_json(response_text):
try:
return json.loads(response_text) if response_text else {}
# JSONDecodeError only available on Python 3.5+
@@ -201,6 +251,12 @@ class HttpApi(HttpApiBase):
def get_operation_spec(self, operation_name):
return self.api_spec[SpecProp.OPERATIONS].get(operation_name, None)
+ def get_operation_specs_by_model_name(self, model_name):
+ if model_name:
+ return self.api_spec[SpecProp.MODEL_OPERATIONS].get(model_name, None)
+ else:
+ return None
+
def get_model_spec(self, model_name):
return self.api_spec[SpecProp.MODELS].get(model_name, None)
diff --git a/test/units/module_utils/network/ftd/test_common.py b/test/units/module_utils/network/ftd/test_common.py
index b3b609993d9..9a3bd63cfdf 100644
--- a/test/units/module_utils/network/ftd/test_common.py
+++ b/test/units/module_utils/network/ftd/test_common.py
@@ -70,6 +70,13 @@ def test_equal_objects_return_true_with_equal_objects():
)
+def test_equal_objects_return_true_with_equal_str_like_values():
+ assert equal_objects(
+ {'foo': b'bar'},
+ {'foo': u'bar'}
+ )
+
+
def test_equal_objects_return_true_with_equal_nested_dicts():
assert equal_objects(
{'foo': {'bar': 1, 'buz': 2}},
diff --git a/test/units/module_utils/network/ftd/test_configuration.py b/test/units/module_utils/network/ftd/test_configuration.py
index 9a58afd09eb..9fdd1947936 100644
--- a/test/units/module_utils/network/ftd/test_configuration.py
+++ b/test/units/module_utils/network/ftd/test_configuration.py
@@ -16,34 +16,81 @@
# along with Ansible. If not, see .
#
+import json
+import unittest
+
+import pytest
from units.compat import mock
from units.compat.mock import call, patch
-from ansible.module_utils.network.ftd.configuration import iterate_over_pageable_resource, BaseConfigurationResource
+
+from ansible.module_utils.network.ftd.common import HTTPMethod, FtdUnexpectedResponse
+from ansible.module_utils.network.ftd.configuration import iterate_over_pageable_resource, BaseConfigurationResource, \
+ OperationChecker, OperationNamePrefix, ParamName, QueryParams
+from ansible.module_utils.network.ftd.fdm_swagger_client import ValidationError, OperationField
class TestBaseConfigurationResource(object):
+ @pytest.fixture
+ def connection_mock(self, mocker):
+ connection_class_mock = mocker.patch('ansible.modules.network.ftd.ftd_configuration.Connection')
+ connection_instance = connection_class_mock.return_value
+ connection_instance.validate_data.return_value = True, None
+ connection_instance.validate_query_params.return_value = True, None
+ connection_instance.validate_path_params.return_value = True, None
- @patch.object(BaseConfigurationResource, 'send_request')
- def test_get_objects_by_filter_with_multiple_filters(self, send_request_mock):
+ return connection_instance
+
+ @patch.object(BaseConfigurationResource, '_send_request')
+ def test_get_objects_by_filter_with_multiple_filters(self, send_request_mock, connection_mock):
objects = [
{'name': 'obj1', 'type': 1, 'foo': {'bar': 'buzz'}},
{'name': 'obj2', 'type': 1, 'foo': {'bar': 'buz'}},
{'name': 'obj3', 'type': 2, 'foo': {'bar': 'buzz'}}
]
- resource = BaseConfigurationResource(None)
+ connection_mock.get_operation_spec.return_value = {
+ 'method': HTTPMethod.GET,
+ 'url': '/object/'
+ }
+ resource = BaseConfigurationResource(connection_mock, False)
send_request_mock.side_effect = [{'items': objects}, {'items': []}]
- assert objects == resource.get_objects_by_filter('/objects', {})
+ # resource.get_objects_by_filter returns generator so to be able compare generated list with expected list
+ # we need evaluate it.
+ assert objects == list(resource.get_objects_by_filter('test', {}))
+ send_request_mock.assert_has_calls(
+ [
+ mock.call('/object/', 'get', {}, {}, {'limit': 10, 'offset': 0})
+ ]
+ )
+ send_request_mock.reset_mock()
send_request_mock.side_effect = [{'items': objects}, {'items': []}]
- assert [objects[0]] == resource.get_objects_by_filter('/objects', {'name': 'obj1'})
+ # resource.get_objects_by_filter returns generator so to be able compare generated list with expected list
+ # we need evaluate it.
+ assert [objects[0]] == list(resource.get_objects_by_filter('test', {ParamName.FILTERS: {'name': 'obj1'}}))
+ send_request_mock.assert_has_calls(
+ [
+ mock.call('/object/', 'get', {}, {}, {QueryParams.FILTER: 'name:obj1', 'limit': 10, 'offset': 0})
+ ]
+ )
+ send_request_mock.reset_mock()
send_request_mock.side_effect = [{'items': objects}, {'items': []}]
- assert [objects[1]] == resource.get_objects_by_filter('/objects',
- {'type': 1, 'foo': {'bar': 'buz'}})
+ # resource.get_objects_by_filter returns generator so to be able compare generated list with expected list
+ # we need evaluate it.
+ assert [objects[1]] == list(resource.get_objects_by_filter(
+ 'test',
+ {ParamName.FILTERS: {'type': 1, 'foo': {'bar': 'buz'}}}))
- @patch.object(BaseConfigurationResource, 'send_request')
- def test_get_objects_by_filter_with_multiple_responses(self, send_request_mock):
+ send_request_mock.assert_has_calls(
+ [
+ mock.call('/object/', 'get', {}, {},
+ {QueryParams.FILTER: "foo:{'bar': 'buz'};type:1", 'limit': 10, 'offset': 0})
+ ]
+ )
+
+ @patch.object(BaseConfigurationResource, '_send_request')
+ def test_get_objects_by_filter_with_multiple_responses(self, send_request_mock, connection_mock):
send_request_mock.side_effect = [
{'items': [
{'name': 'obj1', 'type': 'foo'},
@@ -54,11 +101,204 @@ class TestBaseConfigurationResource(object):
]},
{'items': []}
]
+ connection_mock.get_operation_spec.return_value = {
+ 'method': HTTPMethod.GET,
+ 'url': '/object/'
+ }
+ resource = BaseConfigurationResource(connection_mock, False)
+ assert [{'name': 'obj1', 'type': 'foo'}] == list(resource.get_objects_by_filter(
+ 'test',
+ {ParamName.FILTERS: {'type': 'foo'}}))
+ send_request_mock.assert_has_calls(
+ [
+ mock.call('/object/', 'get', {}, {},
+ {QueryParams.FILTER: "type:foo", 'limit': 10, 'offset': 0})
+ ]
+ )
- resource = BaseConfigurationResource(None)
+ send_request_mock.reset_mock()
+ send_request_mock.side_effect = [
+ {'items': [
+ {'name': 'obj1', 'type': 'foo'},
+ {'name': 'obj2', 'type': 'bar'}
+ ]},
+ {'items': [
+ {'name': 'obj3', 'type': 'foo'}
+ ]},
+ {'items': []}
+ ]
+ resp = list(resource.get_objects_by_filter(
+ 'test',
+ {
+ ParamName.FILTERS: {'type': 'foo'},
+ ParamName.QUERY_PARAMS: {'limit': 2}
+ }))
+ assert [{'name': 'obj1', 'type': 'foo'}, {'name': 'obj3', 'type': 'foo'}] == resp
+ send_request_mock.assert_has_calls(
+ [
+ mock.call('/object/', 'get', {}, {},
+ {QueryParams.FILTER: "type:foo", 'limit': 2, 'offset': 0}),
+ mock.call('/object/', 'get', {}, {},
+ {QueryParams.FILTER: "type:foo", 'limit': 2, 'offset': 2})
+ ]
+ )
- assert [{'name': 'obj1', 'type': 'foo'}, {'name': 'obj3', 'type': 'foo'}] == resource.get_objects_by_filter(
- '/objects', {'type': 'foo'})
+ def test_module_should_fail_if_validation_error_in_data(self, connection_mock):
+ connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.POST, 'url': '/test'}
+ report = {
+ 'required': ['objects[0].type'],
+ 'invalid_type': [
+ {
+ 'path': 'objects[3].id',
+ 'expected_type': 'string',
+ 'actually_value': 1
+ }
+ ]
+ }
+ connection_mock.validate_data.return_value = (False, json.dumps(report, sort_keys=True, indent=4))
+
+ with pytest.raises(ValidationError) as e_info:
+ resource = BaseConfigurationResource(connection_mock, False)
+ resource.crud_operation('addTest', {'data': {}})
+
+ result = e_info.value.args[0]
+ key = 'Invalid data provided'
+ assert result[key]
+ result[key] = json.loads(result[key])
+ assert result == {key: {
+ 'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
+ 'required': ['objects[0].type']
+ }}
+
+ def test_module_should_fail_if_validation_error_in_query_params(self, connection_mock):
+ connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.GET, 'url': '/test',
+ 'returnMultipleItems': False}
+ report = {
+ 'required': ['objects[0].type'],
+ 'invalid_type': [
+ {
+ 'path': 'objects[3].id',
+ 'expected_type': 'string',
+ 'actually_value': 1
+ }
+ ]
+ }
+ connection_mock.validate_query_params.return_value = (False, json.dumps(report, sort_keys=True, indent=4))
+
+ with pytest.raises(ValidationError) as e_info:
+ resource = BaseConfigurationResource(connection_mock, False)
+ resource.crud_operation('getTestList', {'data': {}})
+
+ result = e_info.value.args[0]
+
+ key = 'Invalid query_params provided'
+ assert result[key]
+ result[key] = json.loads(result[key])
+
+ assert result == {key: {
+ 'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
+ 'required': ['objects[0].type']}}
+
+ def test_module_should_fail_if_validation_error_in_path_params(self, connection_mock):
+ connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.GET, 'url': '/test',
+ 'returnMultipleItems': False}
+ report = {
+ 'path_params': {
+ 'required': ['objects[0].type'],
+ 'invalid_type': [
+ {
+ 'path': 'objects[3].id',
+ 'expected_type': 'string',
+ 'actually_value': 1
+ }
+ ]
+ }
+ }
+ connection_mock.validate_path_params.return_value = (False, json.dumps(report, sort_keys=True, indent=4))
+
+ with pytest.raises(ValidationError) as e_info:
+ resource = BaseConfigurationResource(connection_mock, False)
+ resource.crud_operation('putTest', {'data': {}})
+
+ result = e_info.value.args[0]
+
+ key = 'Invalid path_params provided'
+ assert result[key]
+ result[key] = json.loads(result[key])
+
+ assert result == {key: {
+ 'path_params': {
+ 'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
+ 'required': ['objects[0].type']}}}
+
+ def test_module_should_fail_if_validation_error_in_all_params(self, connection_mock):
+ connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.POST, 'url': '/test'}
+ report = {
+ 'data': {
+ 'required': ['objects[0].type'],
+ 'invalid_type': [
+ {
+ 'path': 'objects[3].id',
+ 'expected_type': 'string',
+ 'actually_value': 1
+ }
+ ]
+ },
+ 'path_params': {
+ 'required': ['some_param'],
+ 'invalid_type': [
+ {
+ 'path': 'name',
+ 'expected_type': 'string',
+ 'actually_value': True
+ }
+ ]
+ },
+ 'query_params': {
+ 'required': ['other_param'],
+ 'invalid_type': [
+ {
+ 'path': 'f_integer',
+ 'expected_type': 'integer',
+ 'actually_value': "test"
+ }
+ ]
+ }
+ }
+ connection_mock.validate_data.return_value = (False, json.dumps(report['data'], sort_keys=True, indent=4))
+ connection_mock.validate_query_params.return_value = (False,
+ json.dumps(report['query_params'], sort_keys=True,
+ indent=4))
+ connection_mock.validate_path_params.return_value = (False,
+ json.dumps(report['path_params'], sort_keys=True,
+ indent=4))
+
+ with pytest.raises(ValidationError) as e_info:
+ resource = BaseConfigurationResource(connection_mock, False)
+ resource.crud_operation('putTest', {'data': {}})
+
+ result = e_info.value.args[0]
+
+ key_data = 'Invalid data provided'
+ assert result[key_data]
+ result[key_data] = json.loads(result[key_data])
+
+ key_path_params = 'Invalid path_params provided'
+ assert result[key_path_params]
+ result[key_path_params] = json.loads(result[key_path_params])
+
+ key_query_params = 'Invalid query_params provided'
+ assert result[key_query_params]
+ result[key_query_params] = json.loads(result[key_query_params])
+
+ assert result == {
+ key_data: {'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
+ 'required': ['objects[0].type']},
+ key_path_params: {'invalid_type': [{'actually_value': True, 'expected_type': 'string', 'path': 'name'}],
+ 'required': ['some_param']},
+ key_query_params: {
+ 'invalid_type': [{'actually_value': 'test', 'expected_type': 'integer', 'path': 'f_integer'}],
+ 'required': ['other_param']}}
class TestIterateOverPageableResource(object):
@@ -66,7 +306,7 @@ class TestIterateOverPageableResource(object):
def test_iterate_over_pageable_resource_with_no_items(self):
resource_func = mock.Mock(return_value={'items': []})
- items = iterate_over_pageable_resource(resource_func)
+ items = iterate_over_pageable_resource(resource_func, {'query_params': {}})
assert [] == list(items)
@@ -76,33 +316,37 @@ class TestIterateOverPageableResource(object):
{'items': []},
])
- items = iterate_over_pageable_resource(resource_func)
+ items = iterate_over_pageable_resource(resource_func, {'query_params': {}})
assert ['foo', 'bar'] == list(items)
resource_func.assert_has_calls([
- call(query_params={'offset': 0, 'limit': 10}),
- call(query_params={'offset': 10, 'limit': 10})
+ call(params={'query_params': {'offset': 0, 'limit': 10}})
])
def test_iterate_over_pageable_resource_with_multiple_pages(self):
- resource_func = mock.Mock(side_effect=[
+ objects = [
{'items': ['foo']},
{'items': ['bar']},
{'items': ['buzz']},
{'items': []},
- ])
+ ]
+ resource_func = mock.Mock(side_effect=objects)
- items = iterate_over_pageable_resource(resource_func)
+ items = iterate_over_pageable_resource(resource_func, {'query_params': {}})
+ assert ['foo'] == list(items)
+ resource_func.reset_mock()
+ resource_func = mock.Mock(side_effect=objects)
+ items = iterate_over_pageable_resource(resource_func, {'query_params': {'limit': 1}})
assert ['foo', 'bar', 'buzz'] == list(items)
def test_iterate_over_pageable_resource_should_preserve_query_params(self):
resource_func = mock.Mock(return_value={'items': []})
- items = iterate_over_pageable_resource(resource_func, {'filter': 'name:123'})
+ items = iterate_over_pageable_resource(resource_func, {'query_params': {'filter': 'name:123'}})
assert [] == list(items)
- resource_func.assert_called_once_with(query_params={'filter': 'name:123', 'offset': 0, 'limit': 10})
+ resource_func.assert_called_once_with(params={'query_params': {'filter': 'name:123', 'offset': 0, 'limit': 10}})
def test_iterate_over_pageable_resource_should_preserve_limit(self):
resource_func = mock.Mock(side_effect=[
@@ -110,12 +354,11 @@ class TestIterateOverPageableResource(object):
{'items': []},
])
- items = iterate_over_pageable_resource(resource_func, {'limit': 1})
+ items = iterate_over_pageable_resource(resource_func, {'query_params': {'limit': 1}})
assert ['foo'] == list(items)
resource_func.assert_has_calls([
- call(query_params={'offset': 0, 'limit': 1}),
- call(query_params={'offset': 1, 'limit': 1})
+ call(params={'query_params': {'offset': 0, 'limit': 1}})
])
def test_iterate_over_pageable_resource_should_preserve_offset(self):
@@ -124,12 +367,11 @@ class TestIterateOverPageableResource(object):
{'items': []},
])
- items = iterate_over_pageable_resource(resource_func, {'offset': 3})
+ items = iterate_over_pageable_resource(resource_func, {'query_params': {'offset': 3}})
assert ['foo'] == list(items)
resource_func.assert_has_calls([
- call(query_params={'offset': 3, 'limit': 10}),
- call(query_params={'offset': 13, 'limit': 10})
+ call(params={'query_params': {'offset': 3, 'limit': 10}}),
])
def test_iterate_over_pageable_resource_should_pass_with_string_offset_and_limit(self):
@@ -138,10 +380,191 @@ class TestIterateOverPageableResource(object):
{'items': []},
])
- items = iterate_over_pageable_resource(resource_func, {'offset': '1', 'limit': '1'})
+ items = iterate_over_pageable_resource(resource_func, {'query_params': {'offset': '1', 'limit': '1'}})
assert ['foo'] == list(items)
resource_func.assert_has_calls([
- call(query_params={'offset': '1', 'limit': '1'}),
- call(query_params={'offset': 2, 'limit': '1'})
+ call(params={'query_params': {'offset': '1', 'limit': '1'}}),
+ call(params={'query_params': {'offset': 2, 'limit': '1'}})
])
+
+ def test_iterate_over_pageable_resource_raises_exception_when_server_returned_more_items_than_requested(self):
+ resource_func = mock.Mock(side_effect=[
+ {'items': ['foo', 'redundant_bar']},
+ {'items': []},
+ ])
+
+ with pytest.raises(FtdUnexpectedResponse):
+ list(iterate_over_pageable_resource(resource_func, {'query_params': {'offset': '1', 'limit': '1'}}))
+
+ resource_func.assert_has_calls([
+ call(params={'query_params': {'offset': '1', 'limit': '1'}})
+ ])
+
+
+class TestOperationCheckerClass(unittest.TestCase):
+ def setUp(self):
+ self._checker = OperationChecker
+
+ def test_is_add_operation_positive(self):
+ operation_name = OperationNamePrefix.ADD + "Object"
+ operation_spec = {OperationField.METHOD: HTTPMethod.POST}
+ assert self._checker.is_add_operation(operation_name, operation_spec)
+
+ def test_is_add_operation_wrong_method_in_spec(self):
+ operation_name = OperationNamePrefix.ADD + "Object"
+ operation_spec = {OperationField.METHOD: HTTPMethod.GET}
+ assert not self._checker.is_add_operation(operation_name, operation_spec)
+
+ def test_is_add_operation_negative_wrong_operation_name(self):
+ operation_name = OperationNamePrefix.GET + "Object"
+ operation_spec = {OperationField.METHOD: HTTPMethod.POST}
+ assert not self._checker.is_add_operation(operation_name, operation_spec)
+
+ def test_is_edit_operation_positive(self):
+ operation_name = OperationNamePrefix.EDIT + "Object"
+ operation_spec = {OperationField.METHOD: HTTPMethod.PUT}
+ assert self._checker.is_edit_operation(operation_name, operation_spec)
+
+ def test_is_edit_operation_wrong_method_in_spec(self):
+ operation_name = OperationNamePrefix.EDIT + "Object"
+ operation_spec = {OperationField.METHOD: HTTPMethod.GET}
+ assert not self._checker.is_edit_operation(operation_name, operation_spec)
+
+ def test_is_edit_operation_negative_wrong_operation_name(self):
+ operation_name = OperationNamePrefix.GET + "Object"
+ operation_spec = {OperationField.METHOD: HTTPMethod.PUT}
+ assert not self._checker.is_edit_operation(operation_name, operation_spec)
+
+ def test_is_delete_operation_positive(self):
+ operation_name = OperationNamePrefix.DELETE + "Object"
+ operation_spec = {OperationField.METHOD: HTTPMethod.DELETE}
+ self.assertTrue(
+ self._checker.is_delete_operation(operation_name, operation_spec)
+ )
+
+ def test_is_delete_operation_wrong_method_in_spec(self):
+ operation_name = OperationNamePrefix.DELETE + "Object"
+ operation_spec = {OperationField.METHOD: HTTPMethod.GET}
+ assert not self._checker.is_delete_operation(operation_name, operation_spec)
+
+ def test_is_delete_operation_negative_wrong_operation_name(self):
+ operation_name = OperationNamePrefix.GET + "Object"
+ operation_spec = {OperationField.METHOD: HTTPMethod.DELETE}
+ assert not self._checker.is_delete_operation(operation_name, operation_spec)
+
+ def test_is_get_list_operation_positive(self):
+ operation_name = OperationNamePrefix.GET + "Object"
+ operation_spec = {
+ OperationField.METHOD: HTTPMethod.GET,
+ OperationField.RETURN_MULTIPLE_ITEMS: True
+ }
+ assert self._checker.is_get_list_operation(operation_name, operation_spec)
+
+ def test_is_get_list_operation_wrong_method_in_spec(self):
+ operation_name = OperationNamePrefix.GET + "Object"
+ operation_spec = {
+ OperationField.METHOD: HTTPMethod.POST,
+ OperationField.RETURN_MULTIPLE_ITEMS: True
+ }
+ assert not self._checker.is_get_list_operation(operation_name, operation_spec)
+
+ def test_is_get_list_operation_does_not_return_list(self):
+ operation_name = OperationNamePrefix.GET + "Object"
+ operation_spec = {
+ OperationField.METHOD: HTTPMethod.GET,
+ OperationField.RETURN_MULTIPLE_ITEMS: False
+ }
+ assert not self._checker.is_get_list_operation(operation_name, operation_spec)
+
+ def test_is_get_operation_positive(self):
+ operation_name = OperationNamePrefix.GET + "Object"
+ operation_spec = {
+ OperationField.METHOD: HTTPMethod.GET,
+ OperationField.RETURN_MULTIPLE_ITEMS: False
+ }
+ self.assertTrue(
+ self._checker.is_get_operation(operation_name, operation_spec)
+ )
+
+ def test_is_get_operation_wrong_method_in_spec(self):
+ operation_name = OperationNamePrefix.ADD + "Object"
+ operation_spec = {
+ OperationField.METHOD: HTTPMethod.POST,
+ OperationField.RETURN_MULTIPLE_ITEMS: False
+ }
+ assert not self._checker.is_get_operation(operation_name, operation_spec)
+
+ def test_is_get_operation_negative_when_returns_multiple(self):
+ operation_name = OperationNamePrefix.GET + "Object"
+ operation_spec = {
+ OperationField.METHOD: HTTPMethod.GET,
+ OperationField.RETURN_MULTIPLE_ITEMS: True
+ }
+ assert not self._checker.is_get_operation(operation_name, operation_spec)
+
+ def test_is_upsert_operation_positive(self):
+ operation_name = OperationNamePrefix.UPSERT + "Object"
+ assert self._checker.is_upsert_operation(operation_name)
+
+ def test_is_upsert_operation_with_wrong_operation_name(self):
+ for op_type in [OperationNamePrefix.ADD, OperationNamePrefix.GET, OperationNamePrefix.EDIT,
+ OperationNamePrefix.DELETE]:
+ operation_name = op_type + "Object"
+ assert not self._checker.is_upsert_operation(operation_name)
+
+ def test_is_find_by_filter_operation(self):
+ operation_name = OperationNamePrefix.GET + "Object"
+ operation_spec = {
+ OperationField.METHOD: HTTPMethod.GET,
+ OperationField.RETURN_MULTIPLE_ITEMS: True
+ }
+ params = {ParamName.FILTERS: 1}
+ self.assertTrue(
+ self._checker.is_find_by_filter_operation(
+ operation_name, params, operation_spec
+ )
+ )
+
+ def test_is_find_by_filter_operation_negative_when_filters_empty(self):
+ operation_name = OperationNamePrefix.GET + "Object"
+ operation_spec = {
+ OperationField.METHOD: HTTPMethod.GET,
+ OperationField.RETURN_MULTIPLE_ITEMS: True
+ }
+ params = {ParamName.FILTERS: None}
+ assert not self._checker.is_find_by_filter_operation(
+ operation_name, params, operation_spec
+ )
+
+ params = {}
+ assert not self._checker.is_find_by_filter_operation(
+ operation_name, params, operation_spec
+ )
+
+ @patch.object(OperationChecker, "is_add_operation")
+ @patch.object(OperationChecker, "is_edit_operation")
+ @patch.object(OperationChecker, "is_get_list_operation")
+ def test_is_upsert_operation_supported_operation(self, is_add_mock, is_edit_mock, is_get_list_mock):
+ operations_spec = {
+ 'add': 1,
+ 'edit': 1,
+ 'getList': 1
+ }
+ is_add_mock.side_effect = [1, 0, 0]
+ is_edit_mock.side_effect = [1, 0, 0]
+ is_get_list_mock.side_effect = [1, 0, 0]
+
+ assert self._checker.is_upsert_operation_supported(operations_spec)
+
+ is_add_mock.side_effect = [1, 0, 0]
+ is_edit_mock.side_effect = [0, 1, 0]
+ is_get_list_mock.side_effect = [0, 0, 0]
+
+ assert not self._checker.is_upsert_operation_supported(operations_spec)
+
+ is_add_mock.side_effect = [1, 0, 0]
+ is_edit_mock.side_effect = [0, 0, 0]
+ is_get_list_mock.side_effect = [1, 0, 0]
+
+ assert not self._checker.is_upsert_operation_supported(operations_spec)
diff --git a/test/units/module_utils/network/ftd/test_fdm_swagger_parser.py b/test/units/module_utils/network/ftd/test_fdm_swagger_parser.py
index 70b2eac63dd..cdbd6f94bb0 100644
--- a/test/units/module_utils/network/ftd/test_fdm_swagger_parser.py
+++ b/test/units/module_utils/network/ftd/test_fdm_swagger_parser.py
@@ -39,7 +39,7 @@ base = {
"$ref": "#/definitions/FQDNDNSResolution"},
"id": {"type": "string"},
"type": {"type": "string", "default": "networkobject"}},
- "required": ["subType", "type", "value"]},
+ "required": ["subType", "type", "value", "name"]},
"NetworkObjectWrapper": {
"allOf": [{"$ref": "#/definitions/NetworkObject"}, {"$ref": "#/definitions/LinksWrapper"}]}
},
@@ -140,14 +140,16 @@ class TestFdmSwaggerParser(unittest.TestCase):
'type': 'string'
}
}
- }
+ },
+ 'returnMultipleItems': True
},
'addNetworkObject': {
'method': HTTPMethod.POST,
'url': '/api/fdm/v2/object/networks',
'modelName': 'NetworkObject',
'parameters': {'path': {},
- 'query': {}}
+ 'query': {}},
+ 'returnMultipleItems': False
},
'getNetworkObject': {
'method': HTTPMethod.GET,
@@ -161,7 +163,8 @@ class TestFdmSwaggerParser(unittest.TestCase):
}
},
'query': {}
- }
+ },
+ 'returnMultipleItems': False
},
'editNetworkObject': {
'method': HTTPMethod.PUT,
@@ -175,12 +178,13 @@ class TestFdmSwaggerParser(unittest.TestCase):
}
},
'query': {}
- }
+ },
+ 'returnMultipleItems': False
},
'deleteNetworkObject': {
'method': HTTPMethod.DELETE,
'url': '/api/fdm/v2/object/networks/{objId}',
- 'modelName': None,
+ 'modelName': 'NetworkObject',
'parameters': {
'path': {
'objId': {
@@ -189,8 +193,173 @@ class TestFdmSwaggerParser(unittest.TestCase):
}
},
'query': {}
- }
+ },
+ 'returnMultipleItems': False
}
}
assert sorted(['NetworkObject', 'NetworkObjectWrapper']) == sorted(self.fdm_data['models'].keys())
assert expected_operations == self.fdm_data['operations']
+ assert {'NetworkObject': expected_operations} == self.fdm_data['model_operations']
+
+ def test_simple_object_with_documentation(self):
+ api_spec = copy.deepcopy(base)
+ docs = {
+ 'definitions': {
+ 'NetworkObject': {
+ 'description': 'Description for Network Object',
+ 'properties': {'name': 'Description for name field'}
+ }
+ },
+ 'paths': {
+ '/object/networks': {
+ 'get': {
+ 'description': 'Description for getNetworkObjectList operation',
+ 'parameters': [{'name': 'offset', 'description': 'Description for offset field'}]
+ },
+ 'post': {'description': 'Description for addNetworkObject operation'}
+ }
+ }
+ }
+
+ self.fdm_data = FdmSwaggerParser().parse_spec(api_spec, docs)
+
+ assert 'Description for Network Object' == self.fdm_data['models']['NetworkObject']['description']
+ assert '' == self.fdm_data['models']['NetworkObjectWrapper']['description']
+ network_properties = self.fdm_data['models']['NetworkObject']['properties']
+ assert '' == network_properties['id']['description']
+ assert not network_properties['id']['required']
+ assert 'Description for name field' == network_properties['name']['description']
+ assert network_properties['name']['required']
+
+ ops = self.fdm_data['operations']
+ assert 'Description for getNetworkObjectList operation' == ops['getNetworkObjectList']['description']
+ assert 'Description for addNetworkObject operation' == ops['addNetworkObject']['description']
+ assert '' == ops['deleteNetworkObject']['description']
+
+ get_op_params = ops['getNetworkObjectList']['parameters']
+ assert 'Description for offset field' == get_op_params['query']['offset']['description']
+ assert '' == get_op_params['query']['limit']['description']
+
+ def test_model_operations_should_contain_all_operations(self):
+ data = {
+ 'basePath': '/v2/',
+ 'definitions': {
+ 'Model1': {"type": "object"},
+ 'Model2': {"type": "object"},
+ 'Model3': {"type": "object"}
+ },
+ 'paths': {
+ 'path1': {
+ 'get': {
+ 'operationId': 'getSomeModelList',
+ "responses": {
+ "200": {"description": "",
+ "schema": {"type": "object",
+ "title": "NetworkObjectList",
+ "properties": {
+ "items": {
+ "type": "array",
+ "items": {
+ "$ref": "#/definitions/Model1"
+ }
+ }
+ }}
+ }
+ }
+ },
+ "post": {
+ "operationId": "addSomeModel",
+ "parameters": [{"in": "body",
+ "name": "body",
+ "schema": {"$ref": "#/definitions/Model2"}
+ }]}
+ },
+ 'path2/{id}': {
+ "get": {"operationId": "getSomeModel",
+ "responses": {"200": {"description": "",
+ "schema": {"type": "object",
+ "$ref": "#/definitions/Model3"}},
+ }
+ },
+ "put": {"operationId": "editSomeModel",
+ "parameters": [{"in": "body",
+ "name": "body",
+ "schema": {"$ref": "#/definitions/Model1"}}
+ ]},
+ "delete": {
+ "operationId": "deleteModel3",
+ }},
+ 'path3': {
+ "delete": {
+ "operationId": "deleteNoneModel",
+ }
+ }
+ }
+ }
+
+ expected_operations = {
+ 'getSomeModelList': {
+ 'method': HTTPMethod.GET,
+ 'url': '/v2/path1',
+ 'modelName': 'Model1',
+ 'returnMultipleItems': True
+ },
+ 'addSomeModel': {
+ 'method': HTTPMethod.POST,
+ 'url': '/v2/path1',
+ 'modelName': 'Model2',
+ 'parameters': {
+ 'path': {},
+ 'query': {}
+ },
+ 'returnMultipleItems': False
+ },
+ 'getSomeModel': {
+ 'method': HTTPMethod.GET,
+ 'url': '/v2/path2/{id}',
+ 'modelName': 'Model3',
+ 'returnMultipleItems': False
+ },
+ 'editSomeModel': {
+ 'method': HTTPMethod.PUT,
+ 'url': '/v2/path2/{id}',
+ 'modelName': 'Model1',
+ 'parameters': {
+ 'path': {},
+ 'query': {}
+ },
+ 'returnMultipleItems': False
+ },
+ 'deleteModel3': {
+ 'method': HTTPMethod.DELETE,
+ 'url': '/v2/path2/{id}',
+ 'modelName': 'Model3',
+ 'returnMultipleItems': False
+ },
+ 'deleteNoneModel': {
+ 'method': HTTPMethod.DELETE,
+ 'url': '/v2/path3',
+ 'modelName': None,
+ 'returnMultipleItems': False
+ }
+ }
+
+ fdm_data = FdmSwaggerParser().parse_spec(data)
+ assert sorted(['Model1', 'Model2', 'Model3']) == sorted(fdm_data['models'].keys())
+ assert expected_operations == fdm_data['operations']
+ assert {
+ 'Model1': {
+ 'getSomeModelList': expected_operations['getSomeModelList'],
+ 'editSomeModel': expected_operations['editSomeModel']
+ },
+ 'Model2': {
+ 'addSomeModel': expected_operations['addSomeModel']
+ },
+ 'Model3': {
+ 'getSomeModel': expected_operations['getSomeModel'],
+ 'deleteModel3': expected_operations['deleteModel3']
+ },
+ None: {
+ 'deleteNoneModel': expected_operations['deleteNoneModel']
+ }
+ } == fdm_data['model_operations']
diff --git a/test/units/module_utils/network/ftd/test_fdm_swagger_validator.py b/test/units/module_utils/network/ftd/test_fdm_swagger_validator.py
index e597bd94385..43738f53c52 100644
--- a/test/units/module_utils/network/ftd/test_fdm_swagger_validator.py
+++ b/test/units/module_utils/network/ftd/test_fdm_swagger_validator.py
@@ -15,7 +15,7 @@
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see .
#
-
+import copy
import os
import unittest
@@ -248,7 +248,7 @@ class TestFdmSwaggerValidator(unittest.TestCase):
def test_path_params_invalid_params(self):
self.url_params_invalid_params(method='validate_path_params', parameters_type='path')
- def test_path_params_invalid_params(self):
+ def test_query_params_invalid_params(self):
self.url_params_invalid_params(method='validate_query_params', parameters_type='query')
@staticmethod
@@ -384,7 +384,7 @@ class TestFdmSwaggerValidator(unittest.TestCase):
'someParam': 1.2,
'p_integer': "1",
'p_boolean': "",
- 'p_number': "2"
+ 'p_number': "2.1"
}
valid, rez = getattr(validator, method)('getNetwork', data)
assert not valid
@@ -405,20 +405,10 @@ class TestFdmSwaggerValidator(unittest.TestCase):
'expected_type': 'string',
'actually_value': 1.2
},
- {
- 'path': 'p_integer',
- 'expected_type': 'integer',
- 'actually_value': "1"
- },
{
'path': 'p_boolean',
'expected_type': 'boolean',
'actually_value': ""
- },
- {
- 'path': 'p_number',
- 'expected_type': 'number',
- 'actually_value': "2"
}
]
}) == sort_validator_rez(rez)
@@ -603,6 +593,15 @@ class TestFdmSwaggerValidator(unittest.TestCase):
assert valid
assert rez is None
+ def test_pass_no_data_with_no_required_fields(self):
+ spec = copy.deepcopy(mock_data)
+ del spec['models']['NetworkObject']['required']
+
+ valid, rez = FdmSwaggerValidator(spec).validate_data('getNetworkObjectList', {})
+
+ assert valid
+ assert rez is None
+
def test_pass_all_fields_with_correct_data(self):
data = {
'id': 'id-di',
@@ -818,7 +817,7 @@ class TestFdmSwaggerValidator(unittest.TestCase):
"f_string": False,
"f_number": "1",
"f_boolean": "",
- "f_integer": 1.2
+ "f_integer": "1.2"
}
valid, rez = FdmSwaggerValidator(local_mock_data).validate_data('getdata', invalid_data)
@@ -830,11 +829,6 @@ class TestFdmSwaggerValidator(unittest.TestCase):
'expected_type': 'string',
'actually_value': False
},
- {
- 'path': 'f_number',
- 'expected_type': 'number',
- 'actually_value': "1"
- },
{
'path': 'f_boolean',
'expected_type': 'boolean',
@@ -843,7 +837,7 @@ class TestFdmSwaggerValidator(unittest.TestCase):
{
'path': 'f_integer',
'expected_type': 'integer',
- 'actually_value': 1.2
+ 'actually_value': '1.2'
}
]
}) == sort_validator_rez(rez)
diff --git a/test/units/module_utils/network/ftd/test_upsert_functionality.py b/test/units/module_utils/network/ftd/test_upsert_functionality.py
new file mode 100644
index 00000000000..469470793c1
--- /dev/null
+++ b/test/units/module_utils/network/ftd/test_upsert_functionality.py
@@ -0,0 +1,762 @@
+# Copyright (c) 2018 Cisco and/or its affiliates.
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+#
+
+from __future__ import absolute_import
+
+import copy
+import json
+import unittest
+
+import pytest
+from units.compat import mock
+
+from ansible.module_utils.network.ftd.common import FtdServerError, HTTPMethod, ResponseParams, FtdConfigurationError
+from ansible.module_utils.network.ftd.configuration import DUPLICATE_NAME_ERROR_MESSAGE, UNPROCESSABLE_ENTITY_STATUS, \
+ MULTIPLE_DUPLICATES_FOUND_ERROR, BaseConfigurationResource, FtdInvalidOperationNameError, QueryParams
+from ansible.module_utils.network.ftd.fdm_swagger_client import ValidationError
+
+ADD_RESPONSE = {'status': 'Object added'}
+EDIT_RESPONSE = {'status': 'Object edited'}
+DELETE_RESPONSE = {'status': 'Object deleted'}
+GET_BY_FILTER_RESPONSE = [{'name': 'foo', 'description': 'bar'}]
+ARBITRARY_RESPONSE = {'status': 'Arbitrary request sent'}
+
+
+class TestUpsertOperationUnitTests(unittest.TestCase):
+ def setUp(self):
+ conn = mock.MagicMock()
+ self._resource = BaseConfigurationResource(conn)
+
+ def test_get_operation_name(self):
+ operation_a = mock.MagicMock()
+ operation_b = mock.MagicMock()
+
+ def checker_wrapper(expected_object):
+ def checker(obj, *args, **kwargs):
+ return obj == expected_object
+
+ return checker
+
+ operations = {
+ operation_a: "spec",
+ operation_b: "spec"
+ }
+
+ assert operation_a == self._resource._get_operation_name(checker_wrapper(operation_a), operations)
+ assert operation_b == self._resource._get_operation_name(checker_wrapper(operation_b), operations)
+
+ self.assertRaises(
+ FtdConfigurationError,
+ self._resource._get_operation_name, checker_wrapper(None), operations
+ )
+
+ @mock.patch.object(BaseConfigurationResource, "_get_operation_name")
+ @mock.patch.object(BaseConfigurationResource, "add_object")
+ def test_add_upserted_object(self, add_object_mock, get_operation_mock):
+ model_operations = mock.MagicMock()
+ params = mock.MagicMock()
+ add_op_name = get_operation_mock.return_value
+
+ assert add_object_mock.return_value == self._resource._add_upserted_object(model_operations, params)
+
+ get_operation_mock.assert_called_once_with(
+ self._resource._operation_checker.is_add_operation,
+ model_operations)
+ add_object_mock.assert_called_once_with(add_op_name, params)
+
+ @mock.patch.object(BaseConfigurationResource, "_get_operation_name")
+ @mock.patch.object(BaseConfigurationResource, "edit_object")
+ @mock.patch("ansible.module_utils.network.ftd.configuration.copy_identity_properties")
+ @mock.patch("ansible.module_utils.network.ftd.configuration._set_default")
+ def test_edit_upserted_object(self, _set_default_mock, copy_properties_mock, edit_object_mock, get_operation_mock):
+ model_operations = mock.MagicMock()
+ existing_object = mock.MagicMock()
+ params = {
+ 'path_params': {},
+ 'data': {}
+ }
+
+ result = self._resource._edit_upserted_object(model_operations, existing_object, params)
+
+ assert result == edit_object_mock.return_value
+
+ _set_default_mock.assert_has_calls([
+ mock.call(params, 'path_params', {}),
+ mock.call(params, 'data', {})
+ ])
+ get_operation_mock.assert_called_once_with(
+ self._resource._operation_checker.is_edit_operation,
+ model_operations
+ )
+ copy_properties_mock.assert_called_once_with(
+ existing_object,
+ params['data']
+ )
+ edit_object_mock.assert_called_once_with(
+ get_operation_mock.return_value,
+ params
+ )
+
+ @mock.patch.object(BaseConfigurationResource, "get_operation_specs_by_model_name")
+ @mock.patch("ansible.module_utils.network.ftd.configuration.OperationChecker.is_upsert_operation_supported")
+ @mock.patch("ansible.module_utils.network.ftd.configuration._extract_model_from_upsert_operation")
+ def test_is_upsert_operation_supported(self, extract_model_mock, is_upsert_supported_mock, get_operation_spec_mock):
+ op_name = mock.MagicMock()
+
+ result = self._resource.is_upsert_operation_supported(op_name)
+
+ assert result == is_upsert_supported_mock.return_value
+ extract_model_mock.assert_called_once_with(op_name)
+ get_operation_spec_mock.assert_called_once_with(extract_model_mock.return_value)
+ is_upsert_supported_mock.assert_called_once_with(get_operation_spec_mock.return_value)
+
+ @mock.patch.object(BaseConfigurationResource, "is_upsert_operation_supported")
+ @mock.patch.object(BaseConfigurationResource, "get_operation_specs_by_model_name")
+ @mock.patch.object(BaseConfigurationResource, "_add_upserted_object")
+ @mock.patch.object(BaseConfigurationResource, "_edit_upserted_object")
+ @mock.patch("ansible.module_utils.network.ftd.configuration._extract_model_from_upsert_operation")
+ def test_upsert_object_succesfully_added(self, extract_model_mock, edit_mock, add_mock, get_operation_mock,
+ is_upsert_supported_mock):
+ op_name = mock.MagicMock()
+ params = mock.MagicMock()
+
+ is_upsert_supported_mock.return_value = True
+
+ result = self._resource.upsert_object(op_name, params)
+
+ assert result == add_mock.return_value
+ is_upsert_supported_mock.assert_called_once_with(op_name)
+ extract_model_mock.assert_called_once_with(op_name)
+ get_operation_mock.assert_called_once_with(extract_model_mock.return_value)
+ add_mock.assert_called_once_with(get_operation_mock.return_value, params)
+ edit_mock.assert_not_called()
+
+ @mock.patch.object(BaseConfigurationResource, "is_upsert_operation_supported")
+ @mock.patch.object(BaseConfigurationResource, "get_operation_specs_by_model_name")
+ @mock.patch.object(BaseConfigurationResource, "_add_upserted_object")
+ @mock.patch.object(BaseConfigurationResource, "_edit_upserted_object")
+ @mock.patch("ansible.module_utils.network.ftd.configuration._extract_model_from_upsert_operation")
+ def test_upsert_object_succesfully_edited(self, extract_model_mock, edit_mock, add_mock, get_operation_mock,
+ is_upsert_supported_mock):
+ op_name = mock.MagicMock()
+ params = mock.MagicMock()
+
+ is_upsert_supported_mock.return_value = True
+ error = FtdConfigurationError("Obj duplication error")
+ error.obj = mock.MagicMock()
+
+ add_mock.side_effect = error
+
+ result = self._resource.upsert_object(op_name, params)
+
+ assert result == edit_mock.return_value
+ is_upsert_supported_mock.assert_called_once_with(op_name)
+ extract_model_mock.assert_called_once_with(op_name)
+ get_operation_mock.assert_called_once_with(extract_model_mock.return_value)
+ add_mock.assert_called_once_with(get_operation_mock.return_value, params)
+ edit_mock.assert_called_once_with(get_operation_mock.return_value, error.obj, params)
+
+ @mock.patch.object(BaseConfigurationResource, "is_upsert_operation_supported")
+ @mock.patch.object(BaseConfigurationResource, "get_operation_specs_by_model_name")
+ @mock.patch.object(BaseConfigurationResource, "_add_upserted_object")
+ @mock.patch.object(BaseConfigurationResource, "_edit_upserted_object")
+ @mock.patch("ansible.module_utils.network.ftd.configuration._extract_model_from_upsert_operation")
+ def test_upsert_object_not_supported(self, extract_model_mock, edit_mock, add_mock, get_operation_mock,
+ is_upsert_supported_mock):
+ op_name = mock.MagicMock()
+ params = mock.MagicMock()
+
+ is_upsert_supported_mock.return_value = False
+
+ self.assertRaises(
+ FtdInvalidOperationNameError,
+ self._resource.upsert_object, op_name, params
+ )
+
+ is_upsert_supported_mock.assert_called_once_with(op_name)
+ extract_model_mock.assert_not_called()
+ get_operation_mock.assert_not_called()
+ add_mock.assert_not_called()
+ edit_mock.assert_not_called()
+
+ @mock.patch.object(BaseConfigurationResource, "is_upsert_operation_supported")
+ @mock.patch.object(BaseConfigurationResource, "get_operation_specs_by_model_name")
+ @mock.patch.object(BaseConfigurationResource, "_add_upserted_object")
+ @mock.patch.object(BaseConfigurationResource, "_edit_upserted_object")
+ @mock.patch("ansible.module_utils.network.ftd.configuration._extract_model_from_upsert_operation")
+ def test_upsert_object_neither_added_nor_edited(self, extract_model_mock, edit_mock, add_mock, get_operation_mock,
+ is_upsert_supported_mock):
+ op_name = mock.MagicMock()
+ params = mock.MagicMock()
+
+ is_upsert_supported_mock.return_value = True
+ error = FtdConfigurationError("Obj duplication error")
+ error.obj = mock.MagicMock()
+
+ add_mock.side_effect = error
+ edit_mock.side_effect = FtdConfigurationError("Some object edit error")
+
+ self.assertRaises(
+ FtdConfigurationError,
+ self._resource.upsert_object, op_name, params
+ )
+
+ is_upsert_supported_mock.assert_called_once_with(op_name)
+ extract_model_mock.assert_called_once_with(op_name)
+ get_operation_mock.assert_called_once_with(extract_model_mock.return_value)
+ add_mock.assert_called_once_with(get_operation_mock.return_value, params)
+ edit_mock.assert_called_once_with(get_operation_mock.return_value, error.obj, params)
+
+ @mock.patch.object(BaseConfigurationResource, "is_upsert_operation_supported")
+ @mock.patch.object(BaseConfigurationResource, "get_operation_specs_by_model_name")
+ @mock.patch.object(BaseConfigurationResource, "_add_upserted_object")
+ @mock.patch.object(BaseConfigurationResource, "_edit_upserted_object")
+ @mock.patch("ansible.module_utils.network.ftd.configuration._extract_model_from_upsert_operation")
+ def test_upsert_object_with_fatal_error_during_add(self, extract_model_mock, edit_mock, add_mock,
+ get_operation_mock, is_upsert_supported_mock):
+ op_name = mock.MagicMock()
+ params = mock.MagicMock()
+
+ is_upsert_supported_mock.return_value = True
+
+ error = FtdConfigurationError("Obj duplication error")
+ add_mock.side_effect = error
+
+ self.assertRaises(
+ FtdConfigurationError,
+ self._resource.upsert_object, op_name, params
+ )
+
+ is_upsert_supported_mock.assert_called_once_with(op_name)
+ extract_model_mock.assert_called_once_with(op_name)
+ get_operation_mock.assert_called_once_with(extract_model_mock.return_value)
+ add_mock.assert_called_once_with(get_operation_mock.return_value, params)
+ edit_mock.assert_not_called()
+
+
+# functional tests below
+class TestUpsertOperationFunctionalTests(object):
+
+ @pytest.fixture(autouse=True)
+ def connection_mock(self, mocker):
+ connection_class_mock = mocker.patch('ansible.modules.network.ftd.ftd_configuration.Connection')
+ connection_instance = connection_class_mock.return_value
+ connection_instance.validate_data.return_value = True, None
+ connection_instance.validate_query_params.return_value = True, None
+ connection_instance.validate_path_params.return_value = True, None
+ return connection_instance
+
+ def test_module_should_create_object_when_upsert_operation_and_object_does_not_exist(self, connection_mock):
+ url = '/test'
+
+ operations = {
+ 'getObjectList': {
+ 'method': HTTPMethod.GET,
+ 'url': url,
+ 'modelName': 'Object',
+ 'returnMultipleItems': True},
+ 'addObject': {
+ 'method': HTTPMethod.POST,
+ 'modelName': 'Object',
+ 'url': url},
+ 'editObject': {
+ 'method': HTTPMethod.PUT,
+ 'modelName': 'Object',
+ 'url': '/test/{objId}'},
+ 'otherObjectOperation': {
+ 'method': HTTPMethod.GET,
+ 'modelName': 'Object',
+ 'url': '/test/{objId}',
+ 'returnMultipleItems': False
+ }
+ }
+
+ def get_operation_spec(name):
+ return operations[name]
+
+ connection_mock.get_operation_spec = get_operation_spec
+
+ connection_mock.get_operation_specs_by_model_name.return_value = operations
+ connection_mock.send_request.return_value = {
+ ResponseParams.SUCCESS: True,
+ ResponseParams.RESPONSE: ADD_RESPONSE
+ }
+ params = {
+ 'operation': 'upsertObject',
+ 'data': {'id': '123', 'name': 'testObject', 'type': 'object'},
+ 'path_params': {'objId': '123'},
+ 'register_as': 'test_var'
+ }
+
+ result = self._resource_execute_operation(params, connection=connection_mock)
+
+ connection_mock.send_request.assert_called_once_with(url_path=url,
+ http_method=HTTPMethod.POST,
+ path_params=params['path_params'],
+ query_params={},
+ body_params=params['data'])
+ assert ADD_RESPONSE == result
+
+ # test when object exists but with different fields(except id)
+ def test_module_should_update_object_when_upsert_operation_and_object_exists(self, connection_mock):
+ url = '/test'
+ obj_id = '456'
+ version = 'test_version'
+ url_with_id_templ = '{0}/{1}'.format(url, '{objId}')
+
+ new_value = '0000'
+ old_value = '1111'
+ params = {
+ 'operation': 'upsertObject',
+ 'data': {'name': 'testObject', 'value': new_value, 'type': 'object'},
+ 'register_as': 'test_var'
+ }
+
+ def request_handler(url_path=None, http_method=None, body_params=None, path_params=None, query_params=None):
+ if http_method == HTTPMethod.POST:
+ assert url_path == url
+ assert body_params == params['data']
+ assert query_params == {}
+ assert path_params == {}
+ return {
+ ResponseParams.SUCCESS: False,
+ ResponseParams.RESPONSE: DUPLICATE_NAME_ERROR_MESSAGE,
+ ResponseParams.STATUS_CODE: UNPROCESSABLE_ENTITY_STATUS
+ }
+ elif http_method == HTTPMethod.GET:
+ is_get_list_req = url_path == url
+ is_get_req = url_path == url_with_id_templ
+ assert is_get_req or is_get_list_req
+
+ if is_get_list_req:
+ assert body_params == {}
+ assert query_params == {QueryParams.FILTER: 'name:testObject', 'limit': 10, 'offset': 0}
+ assert path_params == {}
+ elif is_get_req:
+ assert body_params == {}
+ assert query_params == {}
+ assert path_params == {'objId': obj_id}
+
+ return {
+ ResponseParams.SUCCESS: True,
+ ResponseParams.RESPONSE: {
+ 'items': [
+ {'name': 'testObject', 'value': old_value, 'type': 'object', 'id': obj_id,
+ 'version': version}
+ ]
+ }
+ }
+ elif http_method == HTTPMethod.PUT:
+ assert url_path == url_with_id_templ
+ return {
+ ResponseParams.SUCCESS: True,
+ ResponseParams.RESPONSE: body_params
+ }
+ else:
+ assert False
+
+ operations = {
+ 'getObjectList': {'method': HTTPMethod.GET, 'url': url, 'modelName': 'Object', 'returnMultipleItems': True},
+ 'addObject': {'method': HTTPMethod.POST, 'modelName': 'Object', 'url': url},
+ 'editObject': {'method': HTTPMethod.PUT, 'modelName': 'Object', 'url': url_with_id_templ},
+ 'otherObjectOperation': {
+ 'method': HTTPMethod.GET,
+ 'modelName': 'Object',
+ 'url': url_with_id_templ,
+ 'returnMultipleItems': False}
+ }
+
+ def get_operation_spec(name):
+ return operations[name]
+
+ connection_mock.get_operation_spec = get_operation_spec
+ connection_mock.get_operation_specs_by_model_name.return_value = operations
+
+ connection_mock.send_request = request_handler
+ expected_val = {'name': 'testObject', 'value': new_value, 'type': 'object', 'id': obj_id, 'version': version}
+
+ result = self._resource_execute_operation(params, connection=connection_mock)
+
+ assert expected_val == result
+
+ # test when object exists and all fields have the same value
+ def test_module_should_not_update_object_when_upsert_operation_and_object_exists_with_the_same_fields(
+ self, connection_mock):
+ url = '/test'
+ url_with_id_templ = '{0}/{1}'.format(url, '{objId}')
+
+ params = {
+ 'operation': 'upsertObject',
+ 'data': {'name': 'testObject', 'value': '3333', 'type': 'object'},
+ 'register_as': 'test_var'
+ }
+
+ expected_val = copy.deepcopy(params['data'])
+ expected_val['version'] = 'test_version'
+ expected_val['id'] = 'test_id'
+
+ def request_handler(url_path=None, http_method=None, body_params=None, path_params=None, query_params=None):
+ if http_method == HTTPMethod.POST:
+ assert url_path == url
+ assert body_params == params['data']
+ assert query_params == {}
+ assert path_params == {}
+ return {
+ ResponseParams.SUCCESS: False,
+ ResponseParams.RESPONSE: DUPLICATE_NAME_ERROR_MESSAGE,
+ ResponseParams.STATUS_CODE: UNPROCESSABLE_ENTITY_STATUS
+ }
+ elif http_method == HTTPMethod.GET:
+ assert url_path == url
+ assert body_params == {}
+ assert query_params == {QueryParams.FILTER: 'name:testObject', 'limit': 10, 'offset': 0}
+ assert path_params == {}
+
+ return {
+ ResponseParams.SUCCESS: True,
+ ResponseParams.RESPONSE: {
+ 'items': [expected_val]
+ }
+ }
+ else:
+ assert False
+
+ operations = {
+ 'getObjectList': {'method': HTTPMethod.GET, 'modelName': 'Object', 'url': url, 'returnMultipleItems': True},
+ 'addObject': {'method': HTTPMethod.POST, 'modelName': 'Object', 'url': url},
+ 'editObject': {'method': HTTPMethod.PUT, 'modelName': 'Object', 'url': url_with_id_templ},
+ 'otherObjectOperation': {
+ 'method': HTTPMethod.GET,
+ 'modelName': 'Object',
+ 'url': url_with_id_templ,
+ 'returnMultipleItems': False}
+ }
+
+ def get_operation_spec(name):
+ return operations[name]
+
+ connection_mock.get_operation_spec = get_operation_spec
+ connection_mock.get_operation_specs_by_model_name.return_value = operations
+ connection_mock.send_request = request_handler
+
+ result = self._resource_execute_operation(params, connection=connection_mock)
+
+ assert expected_val == result
+
+ def test_module_should_fail_when_upsert_operation_is_not_supported(self, connection_mock):
+ connection_mock.get_operation_specs_by_model_name.return_value = {
+ 'addObject': {'method': HTTPMethod.POST, 'modelName': 'Object', 'url': '/test'},
+ 'editObject': {'method': HTTPMethod.PUT, 'modelName': 'Object', 'url': '/test/{objId}'},
+ 'otherObjectOperation': {
+ 'method': HTTPMethod.GET,
+ 'modelName': 'Object',
+ 'url': '/test/{objId}',
+ 'returnMultipleItems': False}
+ }
+ operation_name = 'upsertObject'
+ params = {
+ 'operation': operation_name,
+ 'data': {'id': '123', 'name': 'testObject', 'type': 'object'},
+ 'path_params': {'objId': '123'},
+ 'register_as': 'test_var'
+ }
+
+ result = self._resource_execute_operation_with_expected_failure(
+ expected_exception_class=FtdInvalidOperationNameError,
+ params=params, connection=connection_mock)
+
+ connection_mock.send_request.assert_not_called()
+ assert operation_name == result.operation_name
+
+ # when create operation raised FtdConfigurationError exception without id and version
+ def test_module_should_fail_when_upsert_operation_and_failed_create_without_id_and_version(self, connection_mock):
+ url = '/test'
+ url_with_id_templ = '{0}/{1}'.format(url, '{objId}')
+
+ params = {
+ 'operation': 'upsertObject',
+ 'data': {'name': 'testObject', 'value': '3333', 'type': 'object'},
+ 'register_as': 'test_var'
+ }
+
+ def request_handler(url_path=None, http_method=None, body_params=None, path_params=None, query_params=None):
+ if http_method == HTTPMethod.POST:
+ assert url_path == url
+ assert body_params == params['data']
+ assert query_params == {}
+ assert path_params == {}
+ return {
+ ResponseParams.SUCCESS: False,
+ ResponseParams.RESPONSE: DUPLICATE_NAME_ERROR_MESSAGE,
+ ResponseParams.STATUS_CODE: UNPROCESSABLE_ENTITY_STATUS
+ }
+ elif http_method == HTTPMethod.GET:
+ assert url_path == url
+ assert body_params == {}
+ assert query_params == {QueryParams.FILTER: 'name:testObject', 'limit': 10, 'offset': 0}
+ assert path_params == {}
+
+ return {
+ ResponseParams.SUCCESS: True,
+ ResponseParams.RESPONSE: {
+ 'items': []
+ }
+ }
+ else:
+ assert False
+
+ operations = {
+ 'getObjectList': {'method': HTTPMethod.GET, 'modelName': 'Object', 'url': url, 'returnMultipleItems': True},
+ 'addObject': {'method': HTTPMethod.POST, 'modelName': 'Object', 'url': url},
+ 'editObject': {'method': HTTPMethod.PUT, 'modelName': 'Object', 'url': url_with_id_templ},
+ 'otherObjectOperation': {
+ 'method': HTTPMethod.GET,
+ 'modelName': 'Object',
+ 'url': url_with_id_templ,
+ 'returnMultipleItems': False}
+ }
+
+ def get_operation_spec(name):
+ return operations[name]
+
+ connection_mock.get_operation_spec = get_operation_spec
+ connection_mock.get_operation_specs_by_model_name.return_value = operations
+ connection_mock.send_request = request_handler
+
+ result = self._resource_execute_operation_with_expected_failure(
+ expected_exception_class=FtdServerError,
+ params=params, connection=connection_mock)
+
+ assert result.code == 422
+ assert result.response == 'Validation failed due to a duplicate name'
+
+ def test_module_should_fail_when_upsert_operation_and_failed_update_operation(self, connection_mock):
+ url = '/test'
+ obj_id = '456'
+ version = 'test_version'
+ url_with_id_templ = '{0}/{1}'.format(url, '{objId}')
+
+ error_code = 404
+
+ new_value = '0000'
+ old_value = '1111'
+ params = {
+ 'operation': 'upsertObject',
+ 'data': {'name': 'testObject', 'value': new_value, 'type': 'object'},
+ 'register_as': 'test_var'
+ }
+
+ error_msg = 'test error'
+
+ def request_handler(url_path=None, http_method=None, body_params=None, path_params=None, query_params=None):
+ if http_method == HTTPMethod.POST:
+ assert url_path == url
+ assert body_params == params['data']
+ assert query_params == {}
+ assert path_params == {}
+ return {
+ ResponseParams.SUCCESS: False,
+ ResponseParams.RESPONSE: DUPLICATE_NAME_ERROR_MESSAGE,
+ ResponseParams.STATUS_CODE: UNPROCESSABLE_ENTITY_STATUS
+ }
+ elif http_method == HTTPMethod.GET:
+ is_get_list_req = url_path == url
+ is_get_req = url_path == url_with_id_templ
+ assert is_get_req or is_get_list_req
+
+ if is_get_list_req:
+ assert body_params == {}
+ assert query_params == {QueryParams.FILTER: 'name:testObject', 'limit': 10, 'offset': 0}
+ elif is_get_req:
+ assert body_params == {}
+ assert query_params == {}
+ assert path_params == {'objId': obj_id}
+
+ return {
+ ResponseParams.SUCCESS: True,
+ ResponseParams.RESPONSE: {
+ 'items': [
+ {'name': 'testObject', 'value': old_value, 'type': 'object', 'id': obj_id,
+ 'version': version}
+ ]
+ }
+ }
+ elif http_method == HTTPMethod.PUT:
+ assert url_path == url_with_id_templ
+ raise FtdServerError(error_msg, error_code)
+ else:
+ assert False
+
+ operations = {
+ 'getObjectList': {'method': HTTPMethod.GET, 'modelName': 'Object', 'url': url, 'returnMultipleItems': True},
+ 'addObject': {'method': HTTPMethod.POST, 'modelName': 'Object', 'url': url},
+ 'editObject': {'method': HTTPMethod.PUT, 'modelName': 'Object', 'url': url_with_id_templ},
+ 'otherObjectOperation': {
+ 'method': HTTPMethod.GET,
+ 'modelName': 'Object',
+ 'url': url_with_id_templ,
+ 'returnMultipleItems': False}
+ }
+
+ def get_operation_spec(name):
+ return operations[name]
+
+ connection_mock.get_operation_spec = get_operation_spec
+ connection_mock.get_operation_specs_by_model_name.return_value = operations
+ connection_mock.send_request = request_handler
+
+ result = self._resource_execute_operation_with_expected_failure(
+ expected_exception_class=FtdServerError,
+ params=params, connection=connection_mock)
+
+ assert result.code == error_code
+ assert result.response == error_msg
+
+ def test_module_should_fail_when_upsert_operation_and_invalid_data_for_create_operation(self, connection_mock):
+ new_value = '0000'
+ params = {
+ 'operation': 'upsertObject',
+ 'data': {'name': 'testObject', 'value': new_value, 'type': 'object'},
+ 'register_as': 'test_var'
+ }
+
+ connection_mock.send_request.assert_not_called()
+
+ operations = {
+ 'getObjectList': {
+ 'method': HTTPMethod.GET,
+ 'modelName': 'Object',
+ 'url': 'sd',
+ 'returnMultipleItems': True},
+ 'addObject': {'method': HTTPMethod.POST, 'modelName': 'Object', 'url': 'sdf'},
+ 'editObject': {'method': HTTPMethod.PUT, 'modelName': 'Object', 'url': 'sadf'},
+ 'otherObjectOperation': {
+ 'method': HTTPMethod.GET,
+ 'modelName': 'Object',
+ 'url': 'sdfs',
+ 'returnMultipleItems': False}
+ }
+
+ def get_operation_spec(name):
+ return operations[name]
+
+ connection_mock.get_operation_spec = get_operation_spec
+ connection_mock.get_operation_specs_by_model_name.return_value = operations
+
+ report = {
+ 'required': ['objects[0].type'],
+ 'invalid_type': [
+ {
+ 'path': 'objects[3].id',
+ 'expected_type': 'string',
+ 'actually_value': 1
+ }
+ ]
+ }
+ connection_mock.validate_data.return_value = (False, json.dumps(report, sort_keys=True, indent=4))
+ key = 'Invalid data provided'
+
+ result = self._resource_execute_operation_with_expected_failure(
+ expected_exception_class=ValidationError,
+ params=params, connection=connection_mock)
+
+ assert len(result.args) == 1
+ assert key in result.args[0]
+ assert json.loads(result.args[0][key]) == {
+ 'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
+ 'required': ['objects[0].type']
+ }
+
+ def test_module_should_fail_when_upsert_operation_and_few_objects_found_by_filter(self, connection_mock):
+ url = '/test'
+ url_with_id_templ = '{0}/{1}'.format(url, '{objId}')
+
+ sample_obj = {'name': 'testObject', 'value': '3333', 'type': 'object'}
+ params = {
+ 'operation': 'upsertObject',
+ 'data': sample_obj,
+ 'register_as': 'test_var'
+ }
+
+ def request_handler(url_path=None, http_method=None, body_params=None, path_params=None, query_params=None):
+ if http_method == HTTPMethod.POST:
+ assert url_path == url
+ assert body_params == params['data']
+ assert query_params == {}
+ assert path_params == {}
+ return {
+ ResponseParams.SUCCESS: False,
+ ResponseParams.RESPONSE: DUPLICATE_NAME_ERROR_MESSAGE,
+ ResponseParams.STATUS_CODE: UNPROCESSABLE_ENTITY_STATUS
+ }
+ elif http_method == HTTPMethod.GET:
+ assert url_path == url
+ assert body_params == {}
+ assert query_params == {QueryParams.FILTER: 'name:testObject', 'limit': 10, 'offset': 0}
+ assert path_params == {}
+
+ return {
+ ResponseParams.SUCCESS: True,
+ ResponseParams.RESPONSE: {
+ 'items': [sample_obj, sample_obj]
+ }
+ }
+ else:
+ assert False
+
+ operations = {
+ 'getObjectList': {'method': HTTPMethod.GET, 'modelName': 'Object', 'url': url, 'returnMultipleItems': True},
+ 'addObject': {'method': HTTPMethod.POST, 'modelName': 'Object', 'url': url},
+ 'editObject': {'method': HTTPMethod.PUT, 'modelName': 'Object', 'url': url_with_id_templ},
+ 'otherObjectOperation': {
+ 'method': HTTPMethod.GET,
+ 'modelName': 'Object',
+ 'url': url_with_id_templ,
+ 'returnMultipleItems': False}
+ }
+
+ def get_operation_spec(name):
+ return operations[name]
+
+ connection_mock.get_operation_spec = get_operation_spec
+ connection_mock.get_operation_specs_by_model_name.return_value = operations
+ connection_mock.send_request = request_handler
+
+ result = self._resource_execute_operation_with_expected_failure(
+ expected_exception_class=FtdConfigurationError,
+ params=params, connection=connection_mock)
+
+ assert result.msg is MULTIPLE_DUPLICATES_FOUND_ERROR
+ assert result.obj is None
+
+ @staticmethod
+ def _resource_execute_operation(params, connection):
+ resource = BaseConfigurationResource(connection)
+ op_name = params['operation']
+
+ resp = resource.execute_operation(op_name, params)
+
+ return resp
+
+ def _resource_execute_operation_with_expected_failure(self, expected_exception_class, params, connection):
+ with pytest.raises(expected_exception_class) as ex:
+ self._resource_execute_operation(params, connection)
+ # 'ex' here is the instance of '_pytest._code.code.ExceptionInfo' but not
+ # actual instance of is in the value attribute of 'ex'. That's why we should return
+ # 'ex.value' here, so it can be checked in a test later.
+ return ex.value
diff --git a/test/units/modules/network/ftd/test_ftd_configuration.py b/test/units/modules/network/ftd/test_ftd_configuration.py
index 95358921190..0383b71c89a 100644
--- a/test/units/modules/network/ftd/test_ftd_configuration.py
+++ b/test/units/modules/network/ftd/test_ftd_configuration.py
@@ -18,20 +18,14 @@
from __future__ import absolute_import
-import json
-
import pytest
-
-from ansible.module_utils import basic
-from ansible.module_utils.network.ftd.common import HTTPMethod, FtdConfigurationError, FtdServerError
-from ansible.modules.network.ftd import ftd_configuration
from units.modules.utils import set_module_args, exit_json, fail_json, AnsibleFailJson, AnsibleExitJson
-ADD_RESPONSE = {'status': 'Object added'}
-EDIT_RESPONSE = {'status': 'Object edited'}
-DELETE_RESPONSE = {'status': 'Object deleted'}
-GET_BY_FILTER_RESPONSE = [{'name': 'foo', 'description': 'bar'}]
-ARBITRARY_RESPONSE = {'status': 'Arbitrary request sent'}
+from ansible.module_utils import basic
+from ansible.module_utils.network.ftd.common import FtdConfigurationError, FtdServerError, FtdUnexpectedResponse
+from ansible.module_utils.network.ftd.configuration import FtdInvalidOperationNameError, CheckModeException
+from ansible.module_utils.network.ftd.fdm_swagger_client import ValidationError
+from ansible.modules.network.ftd import ftd_configuration
class TestFtdConfiguration(object):
@@ -41,295 +35,80 @@ class TestFtdConfiguration(object):
def module_mock(self, mocker):
return mocker.patch.multiple(basic.AnsibleModule, exit_json=exit_json, fail_json=fail_json)
- @pytest.fixture
+ @pytest.fixture(autouse=True)
def connection_mock(self, mocker):
connection_class_mock = mocker.patch('ansible.modules.network.ftd.ftd_configuration.Connection')
- connection_instance = connection_class_mock.return_value
- connection_instance.validate_data.return_value = True, None
- connection_instance.validate_query_params.return_value = True, None
- connection_instance.validate_path_params.return_value = True, None
-
- return connection_instance
+ return connection_class_mock.return_value
@pytest.fixture
def resource_mock(self, mocker):
resource_class_mock = mocker.patch('ansible.modules.network.ftd.ftd_configuration.BaseConfigurationResource')
resource_instance = resource_class_mock.return_value
- resource_instance.add_object.return_value = ADD_RESPONSE
- resource_instance.edit_object.return_value = EDIT_RESPONSE
- resource_instance.delete_object.return_value = DELETE_RESPONSE
- resource_instance.send_request.return_value = ARBITRARY_RESPONSE
- resource_instance.get_objects_by_filter.return_value = GET_BY_FILTER_RESPONSE
- return resource_instance
+ return resource_instance.execute_operation
- def test_module_should_fail_without_operation_arg(self):
- set_module_args({})
+ def test_module_should_fail_when_ftd_invalid_operation_name_error(self, resource_mock):
+ operation_name = 'test name'
+ resource_mock.side_effect = FtdInvalidOperationNameError(operation_name)
- with pytest.raises(AnsibleFailJson) as ex:
- self.module.main()
-
- assert 'missing required arguments: operation' in str(ex)
-
- def test_module_should_fail_when_no_operation_spec_found(self, connection_mock):
- connection_mock.get_operation_spec.return_value = None
- set_module_args({'operation': 'nonExistingOperation'})
-
- with pytest.raises(AnsibleFailJson) as ex:
- self.module.main()
-
- assert 'Invalid operation name provided: nonExistingOperation' in str(ex)
-
- def test_module_should_add_object_when_add_operation(self, connection_mock, resource_mock):
- connection_mock.get_operation_spec.return_value = {
- 'method': HTTPMethod.POST,
- 'url': '/object'
- }
-
- params = {
- 'operation': 'addObject',
- 'data': {'name': 'testObject', 'type': 'object'}
- }
- result = self._run_module(params)
-
- assert ADD_RESPONSE == result['response']
- resource_mock.add_object.assert_called_with(connection_mock.get_operation_spec.return_value['url'],
- params['data'], None, None)
-
- def test_module_should_edit_object_when_edit_operation(self, connection_mock, resource_mock):
- connection_mock.get_operation_spec.return_value = {
- 'method': HTTPMethod.PUT,
- 'url': '/object/{objId}'
- }
-
- params = {
- 'operation': 'editObject',
- 'data': {'id': '123', 'name': 'testObject', 'type': 'object'},
- 'path_params': {'objId': '123'}
- }
- result = self._run_module(params)
-
- assert EDIT_RESPONSE == result['response']
- resource_mock.edit_object.assert_called_with(connection_mock.get_operation_spec.return_value['url'],
- params['data'],
- params['path_params'], None)
-
- def test_module_should_delete_object_when_delete_operation(self, connection_mock, resource_mock):
- connection_mock.get_operation_spec.return_value = {
- 'method': HTTPMethod.DELETE,
- 'url': '/object/{objId}'
- }
-
- params = {
- 'operation': 'deleteObject',
- 'path_params': {'objId': '123'}
- }
- result = self._run_module(params)
-
- assert DELETE_RESPONSE == result['response']
- resource_mock.delete_object.assert_called_with(connection_mock.get_operation_spec.return_value['url'],
- params['path_params'])
-
- def test_module_should_get_objects_by_filter_when_find_by_filter_operation(self, connection_mock, resource_mock):
- connection_mock.get_operation_spec.return_value = {
- 'method': HTTPMethod.GET,
- 'url': '/objects'
- }
-
- params = {
- 'operation': 'getObjectList',
- 'filters': {'name': 'foo'}
- }
- result = self._run_module(params)
-
- assert GET_BY_FILTER_RESPONSE == result['response']
- resource_mock.get_objects_by_filter.assert_called_with(connection_mock.get_operation_spec.return_value['url'],
- params['filters'],
- None, None)
-
- def test_module_should_send_request_when_arbitrary_operation(self, connection_mock, resource_mock):
- connection_mock.get_operation_spec.return_value = {
- 'method': HTTPMethod.GET,
- 'url': '/object/status/{objId}'
- }
-
- params = {
- 'operation': 'checkStatus',
- 'path_params': {'objId': '123'}
- }
- result = self._run_module(params)
-
- assert ARBITRARY_RESPONSE == result['response']
- resource_mock.send_request.assert_called_with(connection_mock.get_operation_spec.return_value['url'],
- HTTPMethod.GET, None,
- params['path_params'], None)
-
- def test_module_should_fail_when_operation_raises_configuration_error(self, connection_mock, resource_mock):
- connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.GET, 'url': '/test'}
- resource_mock.send_request.side_effect = FtdConfigurationError('Foo error.')
-
- result = self._run_module_with_fail_json({'operation': 'failure'})
+ result = self._run_module_with_fail_json({'operation': operation_name})
assert result['failed']
- assert 'Failed to execute failure operation because of the configuration error: Foo error.' == result['msg']
+ assert 'Invalid operation name provided: %s' % operation_name == result['msg']
- def test_module_should_fail_when_operation_raises_server_error(self, connection_mock, resource_mock):
- connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.GET, 'url': '/test'}
- resource_mock.send_request.side_effect = FtdServerError({'error': 'foo'}, 500)
+ def test_module_should_fail_when_ftd_configuration_error(self, resource_mock):
+ operation_name = 'test name'
+ msg = 'Foo error.'
+ resource_mock.side_effect = FtdConfigurationError(msg)
- result = self._run_module_with_fail_json({'operation': 'failure'})
+ result = self._run_module_with_fail_json({'operation': operation_name})
assert result['failed']
- assert 'Server returned an error trying to execute failure operation. Status code: 500. ' \
- 'Server response: {\'error\': \'foo\'}' == result['msg']
+ assert 'Failed to execute %s operation because of the configuration error: %s' % (operation_name, msg) == \
+ result['msg']
- def test_module_should_fail_if_validation_error_in_data(self, connection_mock):
- connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.POST, 'url': '/test'}
- report = {
- 'required': ['objects[0].type'],
- 'invalid_type': [
- {
- 'path': 'objects[3].id',
- 'expected_type': 'string',
- 'actually_value': 1
- }
- ]
- }
- connection_mock.validate_data.return_value = (False, json.dumps(report, sort_keys=True, indent=4))
+ def test_module_should_fail_when_ftd_server_error(self, resource_mock):
+ operation_name = 'test name'
+ code = 500
+ response = {'error': 'foo'}
+ resource_mock.side_effect = FtdServerError(response, code)
- result = self._run_module_with_fail_json({
- 'operation': 'test',
- 'data': {}
- })
- key = 'Invalid data provided'
- assert result['msg'][key]
- result['msg'][key] = json.loads(result['msg'][key])
- assert result == {
- 'msg':
- {key: {
- 'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
- 'required': ['objects[0].type']
- }},
- 'failed': True}
+ result = self._run_module_with_fail_json({'operation': operation_name})
+ assert result['failed']
+ assert 'Server returned an error trying to execute %s operation. Status code: %s. ' \
+ 'Server response: %s' % (operation_name, code, response) == \
+ result['msg']
- def test_module_should_fail_if_validation_error_in_query_params(self, connection_mock):
- connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.GET, 'url': '/test'}
- report = {
- 'required': ['objects[0].type'],
- 'invalid_type': [
- {
- 'path': 'objects[3].id',
- 'expected_type': 'string',
- 'actually_value': 1
- }
- ]
- }
- connection_mock.validate_query_params.return_value = (False, json.dumps(report, sort_keys=True, indent=4))
+ def test_module_should_fail_when_validation_error(self, resource_mock):
+ operation_name = 'test name'
+ msg = 'Foo error.'
+ resource_mock.side_effect = ValidationError(msg)
- result = self._run_module_with_fail_json({
- 'operation': 'test',
- 'data': {}
- })
- key = 'Invalid query_params provided'
- assert result['msg'][key]
- result['msg'][key] = json.loads(result['msg'][key])
+ result = self._run_module_with_fail_json({'operation': operation_name})
+ assert result['failed']
+ assert msg == result['msg']
- assert result == {'msg': {key: {
- 'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
- 'required': ['objects[0].type']}}, 'failed': True}
+ def test_module_should_fail_when_unexpected_server_response(self, resource_mock):
+ operation_name = 'test name'
+ msg = 'Foo error.'
+ resource_mock.side_effect = FtdUnexpectedResponse(msg)
- def test_module_should_fail_if_validation_error_in_path_params(self, connection_mock):
- connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.GET, 'url': '/test'}
- report = {
- 'path_params': {
- 'required': ['objects[0].type'],
- 'invalid_type': [
- {
- 'path': 'objects[3].id',
- 'expected_type': 'string',
- 'actually_value': 1
- }
- ]
- }
- }
- connection_mock.validate_path_params.return_value = (False, json.dumps(report, sort_keys=True, indent=4))
+ result = self._run_module_with_fail_json({'operation': operation_name})
- result = self._run_module_with_fail_json({
- 'operation': 'test',
- 'data': {}
- })
- key = 'Invalid path_params provided'
- assert result['msg'][key]
- result['msg'][key] = json.loads(result['msg'][key])
+ assert result['failed']
+ assert msg == result['msg']
- assert result == {'msg': {key: {
- 'path_params': {
- 'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
- 'required': ['objects[0].type']}}}, 'failed': True}
+ def test_module_should_fail_when_check_mode_exception(self, resource_mock):
+ operation_name = 'test name'
+ msg = 'Foo error.'
+ resource_mock.side_effect = CheckModeException(msg)
- def test_module_should_fail_if_validation_error_in_all_params(self, connection_mock):
- connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.POST, 'url': '/test'}
- report = {
- 'data': {
- 'required': ['objects[0].type'],
- 'invalid_type': [
- {
- 'path': 'objects[3].id',
- 'expected_type': 'string',
- 'actually_value': 1
- }
- ]
- },
- 'path_params': {
- 'required': ['some_param'],
- 'invalid_type': [
- {
- 'path': 'name',
- 'expected_type': 'string',
- 'actually_value': True
- }
- ]
- },
- 'query_params': {
- 'required': ['other_param'],
- 'invalid_type': [
- {
- 'path': 'f_integer',
- 'expected_type': 'integer',
- 'actually_value': "test"
- }
- ]
- }
- }
- connection_mock.validate_data.return_value = (False, json.dumps(report['data'], sort_keys=True, indent=4))
- connection_mock.validate_query_params.return_value = (False,
- json.dumps(report['query_params'], sort_keys=True,
- indent=4))
- connection_mock.validate_path_params.return_value = (False,
- json.dumps(report['path_params'], sort_keys=True,
- indent=4))
+ result = self._run_module({'operation': operation_name})
+ assert not result['changed']
- result = self._run_module_with_fail_json({
- 'operation': 'test',
- 'data': {}
- })
- key_data = 'Invalid data provided'
- assert result['msg'][key_data]
- result['msg'][key_data] = json.loads(result['msg'][key_data])
+ def test_module_should_run_successful(self, resource_mock):
+ operation_name = 'test name'
+ resource_mock.return_value = 'ok'
- key_path_params = 'Invalid path_params provided'
- assert result['msg'][key_path_params]
- result['msg'][key_path_params] = json.loads(result['msg'][key_path_params])
-
- key_query_params = 'Invalid query_params provided'
- assert result['msg'][key_query_params]
- result['msg'][key_query_params] = json.loads(result['msg'][key_query_params])
-
- assert result == {'msg': {
- key_data: {'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
- 'required': ['objects[0].type']},
- key_path_params: {'invalid_type': [{'actually_value': True, 'expected_type': 'string', 'path': 'name'}],
- 'required': ['some_param']},
- key_query_params: {
- 'invalid_type': [{'actually_value': 'test', 'expected_type': 'integer', 'path': 'f_integer'}],
- 'required': ['other_param']}}, 'failed': True}
+ result = self._run_module({'operation': operation_name})
+ assert result['response'] == 'ok'
def _run_module(self, module_args):
set_module_args(module_args)
diff --git a/test/units/modules/network/ftd/test_ftd_file_upload.py b/test/units/modules/network/ftd/test_ftd_file_upload.py
index f05b6f787cf..dd37c3fc97d 100644
--- a/test/units/modules/network/ftd/test_ftd_file_upload.py
+++ b/test/units/modules/network/ftd/test_ftd_file_upload.py
@@ -39,9 +39,9 @@ class TestFtdFileUpload(object):
connection_class_mock = mocker.patch('ansible.modules.network.ftd.ftd_file_upload.Connection')
return connection_class_mock.return_value
- @pytest.mark.parametrize("missing_arg", ['operation', 'fileToUpload'])
+ @pytest.mark.parametrize("missing_arg", ['operation', 'file_to_upload'])
def test_module_should_fail_without_required_args(self, missing_arg):
- module_args = {'operation': 'uploadFile', 'fileToUpload': '/tmp/test.txt'}
+ module_args = {'operation': 'uploadFile', 'file_to_upload': '/tmp/test.txt'}
del module_args[missing_arg]
set_module_args(module_args)
@@ -52,7 +52,7 @@ class TestFtdFileUpload(object):
def test_module_should_fail_when_no_operation_spec_found(self, connection_mock):
connection_mock.get_operation_spec.return_value = None
- set_module_args({'operation': 'nonExistingUploadOperation', 'fileToUpload': '/tmp/test.txt'})
+ set_module_args({'operation': 'nonExistingUploadOperation', 'file_to_upload': '/tmp/test.txt'})
with pytest.raises(AnsibleFailJson) as ex:
self.module.main()
@@ -67,7 +67,7 @@ class TestFtdFileUpload(object):
OperationField.URL: '/object/network',
OperationField.MODEL_NAME: 'NetworkObject'
}
- set_module_args({'operation': 'nonUploadOperation', 'fileToUpload': '/tmp/test.txt'})
+ set_module_args({'operation': 'nonUploadOperation', 'file_to_upload': '/tmp/test.txt'})
with pytest.raises(AnsibleFailJson) as ex:
self.module.main()
@@ -87,7 +87,7 @@ class TestFtdFileUpload(object):
set_module_args({
'operation': 'uploadFile',
- 'fileToUpload': '/tmp/test.txt'
+ 'file_to_upload': '/tmp/test.txt'
})
with pytest.raises(AnsibleExitJson) as ex:
self.module.main()
diff --git a/test/units/plugins/httpapi/test_ftd.py b/test/units/plugins/httpapi/test_ftd.py
index 4573674821e..7bfddba742d 100644
--- a/test/units/plugins/httpapi/test_ftd.py
+++ b/test/units/plugins/httpapi/test_ftd.py
@@ -17,20 +17,18 @@
#
import json
-import os
-from io import BytesIO
from ansible.module_utils.six.moves.urllib.error import HTTPError
-
from units.compat import mock
from units.compat import unittest
from units.compat.builtins import BUILTINS
from units.compat.mock import mock_open, patch
+
from ansible.errors import AnsibleConnectionFailure
from ansible.module_utils.connection import ConnectionError
from ansible.module_utils.network.ftd.common import HTTPMethod, ResponseParams
from ansible.module_utils.network.ftd.fdm_swagger_client import SpecProp, FdmSwaggerParser
-from ansible.module_utils.six import PY3, StringIO
+from ansible.module_utils.six import BytesIO, StringIO
from ansible.plugins.httpapi.ftd import HttpApi
EXPECTED_BASE_HEADERS = {
@@ -114,6 +112,15 @@ class TestFtdHttpApi(unittest.TestCase):
assert 'Server returned response without token info during connection authentication' in str(res.exception)
+ def test_login_raises_exception_when_http_error(self):
+ self.connection_mock.send.side_effect = HTTPError('http://testhost.com', 400, '', {},
+ StringIO('{"message": "Failed to authenticate user"}'))
+
+ with self.assertRaises(ConnectionError) as res:
+ self.ftd_plugin.login('foo', 'bar')
+
+ assert 'Failed to authenticate user' in str(res.exception)
+
def test_logout_should_revoke_tokens(self):
self.ftd_plugin.access_token = 'ACCESS_TOKEN_TO_REVOKE'
self.ftd_plugin.refresh_token = 'REFRESH_TOKEN_TO_REVOKE'
@@ -182,6 +189,10 @@ class TestFtdHttpApi(unittest.TestCase):
def test_handle_httperror_should_not_retry_on_non_auth_errors(self):
assert not self.ftd_plugin.handle_httperror(HTTPError('http://testhost.com', 500, '', {}, None))
+ def test_handle_httperror_should_not_retry_when_ignoring_http_errors(self):
+ self.ftd_plugin._ignore_http_errors = True
+ assert not self.ftd_plugin.handle_httperror(HTTPError('http://testhost.com', 401, '', {}, None))
+
@patch('os.path.isdir', mock.Mock(return_value=False))
def test_download_file(self):
self.connection_mock.send.return_value = self._connection_response('File content')
@@ -259,6 +270,44 @@ class TestFtdHttpApi(unittest.TestCase):
assert 'Specification for TestModel' == self.ftd_plugin.get_model_spec('TestModel')
assert self.ftd_plugin.get_model_spec('NonExistingTestModel') is None
+ @patch.object(FdmSwaggerParser, 'parse_spec')
+ def test_get_model_spec(self, parse_spec_mock):
+ self.connection_mock.send.return_value = self._connection_response(None)
+ operation1 = {'modelName': 'TestModel'}
+ op_model_name_is_none = {'modelName': None}
+ op_without_model_name = {'url': 'testUrl'}
+
+ parse_spec_mock.return_value = {
+ SpecProp.MODEL_OPERATIONS: {
+ 'TestModel': {
+ 'testOp1': operation1,
+ 'testOp2': 'spec2'
+ },
+ 'TestModel2': {
+ 'testOp10': 'spec10',
+ 'testOp20': 'spec20'
+ }
+ },
+ SpecProp.OPERATIONS: {
+ 'testOp1': operation1,
+ 'testOp10': {
+ 'modelName': 'TestModel2'
+ },
+ 'testOpWithoutModelName': op_without_model_name,
+ 'testOpModelNameIsNone': op_model_name_is_none
+ }
+ }
+
+ assert {'testOp1': operation1, 'testOp2': 'spec2'} == self.ftd_plugin.get_operation_specs_by_model_name(
+ 'TestModel')
+ assert None is self.ftd_plugin.get_operation_specs_by_model_name(
+ 'testOpModelNameIsNone')
+
+ assert None is self.ftd_plugin.get_operation_specs_by_model_name(
+ 'testOpWithoutModelName')
+
+ assert self.ftd_plugin.get_operation_specs_by_model_name('nonExistingOperation') is None
+
@staticmethod
def _connection_response(response, status=200):
response_mock = mock.Mock()