[stable-2.7] FTD HTTP Api plugin bug fixes (#47747) (#48982)

* FTD modules: upsert functionality and bug fixes (#47747)

* FTD modules: bug fixes and upsert functionality

* Fix sanity checks

* Fix unit tests for Python 2.6

* Log status code for login/logout

* Use string formatting in logging

(cherry picked from commit 9770ac70f9)

* Add changelog entry
This commit is contained in:
Anton Nikulin 2018-11-22 17:24:55 +02:00 committed by Toshio Kuratomi
parent 845776ed6a
commit 02a8121dea
16 changed files with 2235 additions and 544 deletions

View file

@ -0,0 +1,4 @@
bugfixes:
- Fix the issue that FTD HTTP API retries authentication-related HTTP requests.
- Fix the issue that module fails when the Swagger model does not have required fields.
- Fix the issue with comparing string-like objects.

View file

@ -18,6 +18,9 @@
import re
from ansible.module_utils._text import to_text
from ansible.module_utils.common.collections import is_string
INVALID_IDENTIFIER_SYMBOLS = r'[^a-zA-Z0-9_]'
IDENTITY_PROPERTIES = ['id', 'version', 'ruleId']
@ -38,7 +41,10 @@ class ResponseParams:
class FtdConfigurationError(Exception):
pass
def __init__(self, msg, obj=None):
super(FtdConfigurationError, self).__init__(msg)
self.msg = msg
self.obj = obj
class FtdServerError(Exception):
@ -48,6 +54,11 @@ class FtdServerError(Exception):
self.code = code
class FtdUnexpectedResponse(Exception):
"""The exception to be raised in case of unexpected responses from 3d parties."""
pass
def construct_ansible_facts(response, params):
facts = dict()
if response:
@ -149,6 +160,11 @@ def equal_values(v1, v2):
:return: True if types and content of passed values are equal. Otherwise, returns False.
:rtype: bool
"""
# string-like values might have same text but different types, so checking them separately
if is_string(v1) and is_string(v2):
return to_text(v1) == to_text(v2)
if type(v1) != type(v2):
return False
value_type = type(v1)

View file

@ -15,11 +15,13 @@
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
import copy
from functools import partial
from ansible.module_utils.network.ftd.common import HTTPMethod, equal_objects, copy_identity_properties, \
FtdConfigurationError, FtdServerError, ResponseParams
from ansible.module_utils.network.ftd.common import HTTPMethod, equal_objects, FtdConfigurationError, \
FtdServerError, ResponseParams, copy_identity_properties, FtdUnexpectedResponse
from ansible.module_utils.network.ftd.fdm_swagger_client import OperationField, ValidationError
from ansible.module_utils.six import iteritems
DEFAULT_PAGE_SIZE = 10
DEFAULT_OFFSET = 0
@ -28,86 +30,358 @@ UNPROCESSABLE_ENTITY_STATUS = 422
INVALID_UUID_ERROR_MESSAGE = "Validation failed due to an invalid UUID"
DUPLICATE_NAME_ERROR_MESSAGE = "Validation failed due to a duplicate name"
MULTIPLE_DUPLICATES_FOUND_ERROR = (
"Cannot add a new object. An object(s) with the same attributes exists."
"Multiple objects returned according to filters being specified. "
"Please specify more specific filters which can find exact object that caused duplication error")
class OperationNamePrefix:
ADD = 'add'
EDIT = 'edit'
GET = 'get'
DELETE = 'delete'
UPSERT = 'upsert'
class QueryParams:
FILTER = 'filter'
class ParamName:
QUERY_PARAMS = 'query_params'
PATH_PARAMS = 'path_params'
DATA = 'data'
FILTERS = 'filters'
class CheckModeException(Exception):
pass
class FtdInvalidOperationNameError(Exception):
def __init__(self, operation_name):
super(FtdInvalidOperationNameError, self).__init__(operation_name)
self.operation_name = operation_name
class OperationChecker(object):
@classmethod
def is_add_operation(cls, operation_name, operation_spec):
"""
Check if operation defined with 'operation_name' is add object operation according to 'operation_spec'.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:param operation_spec: specification of the operation being called by the user
:type operation_spec: dict
:return: True if the called operation is add object operation, otherwise False
:rtype: bool
"""
# Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
return operation_name.startswith(OperationNamePrefix.ADD) and is_post_request(operation_spec)
@classmethod
def is_edit_operation(cls, operation_name, operation_spec):
"""
Check if operation defined with 'operation_name' is edit object operation according to 'operation_spec'.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:param operation_spec: specification of the operation being called by the user
:type operation_spec: dict
:return: True if the called operation is edit object operation, otherwise False
:rtype: bool
"""
# Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
return operation_name.startswith(OperationNamePrefix.EDIT) and is_put_request(operation_spec)
@classmethod
def is_delete_operation(cls, operation_name, operation_spec):
"""
Check if operation defined with 'operation_name' is delete object operation according to 'operation_spec'.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:param operation_spec: specification of the operation being called by the user
:type operation_spec: dict
:return: True if the called operation is delete object operation, otherwise False
:rtype: bool
"""
# Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
return operation_name.startswith(OperationNamePrefix.DELETE) \
and operation_spec[OperationField.METHOD] == HTTPMethod.DELETE
@classmethod
def is_get_list_operation(cls, operation_name, operation_spec):
"""
Check if operation defined with 'operation_name' is get list of objects operation according to 'operation_spec'.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:param operation_spec: specification of the operation being called by the user
:type operation_spec: dict
:return: True if the called operation is get a list of objects operation, otherwise False
:rtype: bool
"""
return operation_spec[OperationField.METHOD] == HTTPMethod.GET \
and operation_spec[OperationField.RETURN_MULTIPLE_ITEMS]
@classmethod
def is_get_operation(cls, operation_name, operation_spec):
"""
Check if operation defined with 'operation_name' is get objects operation according to 'operation_spec'.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:param operation_spec: specification of the operation being called by the user
:type operation_spec: dict
:return: True if the called operation is get object operation, otherwise False
:rtype: bool
"""
return operation_spec[OperationField.METHOD] == HTTPMethod.GET \
and not operation_spec[OperationField.RETURN_MULTIPLE_ITEMS]
@classmethod
def is_upsert_operation(cls, operation_name):
"""
Check if operation defined with 'operation_name' is upsert objects operation according to 'operation_name'.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:return: True if the called operation is upsert object operation, otherwise False
:rtype: bool
"""
return operation_name.startswith(OperationNamePrefix.UPSERT)
@classmethod
def is_find_by_filter_operation(cls, operation_name, params, operation_spec):
"""
Checks whether the called operation is 'find by filter'. This operation fetches all objects and finds
the matching ones by the given filter. As filtering is done on the client side, this operation should be used
only when selected filters are not implemented on the server side.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:param operation_spec: specification of the operation being called by the user
:type operation_spec: dict
:param params: params - params should contain 'filters'
:return: True if the called operation is find by filter, otherwise False
:rtype: bool
"""
is_get_list = cls.is_get_list_operation(operation_name, operation_spec)
return is_get_list and ParamName.FILTERS in params and params[ParamName.FILTERS]
@classmethod
def is_upsert_operation_supported(cls, operations):
"""
Checks if all operations required for upsert object operation are defined in 'operations'.
:param operations: specification of the operations supported by model
:type operations: dict
:return: True if all criteria required to provide requested called operation are satisfied, otherwise False
:rtype: bool
"""
amount_operations_need_for_upsert_operation = 3
amount_supported_operations = 0
for operation_name, operation_spec in operations.items():
if cls.is_add_operation(operation_name, operation_spec) \
or cls.is_edit_operation(operation_name, operation_spec) \
or cls.is_get_list_operation(operation_name, operation_spec):
amount_supported_operations += 1
return amount_supported_operations == amount_operations_need_for_upsert_operation
class BaseConfigurationResource(object):
def __init__(self, conn):
def __init__(self, conn, check_mode=False):
self._conn = conn
self.config_changed = False
self._operation_spec_cache = {}
self._models_operations_specs_cache = {}
self._check_mode = check_mode
self._operation_checker = OperationChecker
def get_object_by_name(self, url_path, name, path_params=None):
item_generator = iterate_over_pageable_resource(
partial(self.send_request, url_path=url_path, http_method=HTTPMethod.GET, path_params=path_params),
{'filter': 'name:%s' % name}
)
# not all endpoints support filtering so checking name explicitly
return next((item for item in item_generator if item['name'] == name), None)
def execute_operation(self, op_name, params):
"""
Allow user request execution of simple operations(natively supported by API provider) as well as complex
operations(operations that are implemented as a set of simple operations).
def get_objects_by_filter(self, url_path, filters, path_params=None, query_params=None):
def match_filters(obj):
for k, v in filters.items():
:param op_name: name of the operation being called by the user
:type op_name: str
:param params: definition of the params that operation should be executed with
:type params: dict
:return: Result of the operation being executed
:rtype: dict
"""
if self._operation_checker.is_upsert_operation(op_name):
return self.upsert_object(op_name, params)
else:
return self.crud_operation(op_name, params)
def crud_operation(self, op_name, params):
"""
Allow user request execution of simple operations(natively supported by API provider) only.
:param op_name: name of the operation being called by the user
:type op_name: str
:param params: definition of the params that operation should be executed with
:type params: dict
:return: Result of the operation being executed
:rtype: dict
"""
op_spec = self.get_operation_spec(op_name)
if op_spec is None:
raise FtdInvalidOperationNameError(op_name)
if self._operation_checker.is_add_operation(op_name, op_spec):
resp = self.add_object(op_name, params)
elif self._operation_checker.is_edit_operation(op_name, op_spec):
resp = self.edit_object(op_name, params)
elif self._operation_checker.is_delete_operation(op_name, op_spec):
resp = self.delete_object(op_name, params)
elif self._operation_checker.is_find_by_filter_operation(op_name, params, op_spec):
resp = list(self.get_objects_by_filter(op_name, params))
else:
resp = self.send_general_request(op_name, params)
return resp
def get_operation_spec(self, operation_name):
if operation_name not in self._operation_spec_cache:
self._operation_spec_cache[operation_name] = self._conn.get_operation_spec(operation_name)
return self._operation_spec_cache[operation_name]
def get_operation_specs_by_model_name(self, model_name):
if model_name not in self._models_operations_specs_cache:
model_op_specs = self._conn.get_operation_specs_by_model_name(model_name)
self._models_operations_specs_cache[model_name] = model_op_specs
for op_name, op_spec in iteritems(model_op_specs):
self._operation_spec_cache.setdefault(op_name, op_spec)
return self._models_operations_specs_cache[model_name]
def get_objects_by_filter(self, operation_name, params):
def transform_filters_to_query_param(filter_params):
return ';'.join(['%s:%s' % (key, val) for key, val in sorted(iteritems(filter_params))])
def match_filters(filter_params, obj):
for k, v in iteritems(filter_params):
if k not in obj or obj[k] != v:
return False
return True
item_generator = iterate_over_pageable_resource(
partial(self.send_request, url_path=url_path, http_method=HTTPMethod.GET, path_params=path_params),
query_params
)
return [i for i in item_generator if match_filters(i)]
dummy, query_params, path_params = _get_user_params(params)
# copy required params to avoid mutation of passed `params` dict
get_list_params = {ParamName.QUERY_PARAMS: dict(query_params), ParamName.PATH_PARAMS: dict(path_params)}
def add_object(self, url_path, body_params, path_params=None, query_params=None, update_if_exists=False):
filters = params.get(ParamName.FILTERS) or {}
if filters:
get_list_params[ParamName.QUERY_PARAMS][QueryParams.FILTER] = transform_filters_to_query_param(filters)
item_generator = iterate_over_pageable_resource(
partial(self.send_general_request, operation_name=operation_name), get_list_params
)
return (i for i in item_generator if match_filters(filters, i))
def add_object(self, operation_name, params):
def is_duplicate_name_error(err):
return err.code == UNPROCESSABLE_ENTITY_STATUS and DUPLICATE_NAME_ERROR_MESSAGE in str(err)
def update_existing_object(obj):
new_path_params = {} if path_params is None else path_params
new_path_params['objId'] = obj['id']
return self.send_request(url_path=url_path + '/{objId}',
http_method=HTTPMethod.PUT,
body_params=copy_identity_properties(obj, body_params),
path_params=new_path_params,
query_params=query_params)
try:
return self.send_request(url_path=url_path, http_method=HTTPMethod.POST, body_params=body_params,
path_params=path_params, query_params=query_params)
return self.send_general_request(operation_name, params)
except FtdServerError as e:
if is_duplicate_name_error(e):
existing_obj = self.get_object_by_name(url_path, body_params['name'], path_params)
if equal_objects(existing_obj, body_params):
return existing_obj
elif update_if_exists:
return update_existing_object(existing_obj)
else:
raise FtdConfigurationError(
'Cannot add new object. An object with the same name but different parameters already exists.')
return self._check_if_the_same_object(operation_name, params, e)
else:
raise e
def delete_object(self, url_path, path_params):
def _check_if_the_same_object(self, operation_name, params, e):
"""
Special check used in the scope of 'add_object' operation, which can be requested as a standalone operation or
in the scope of 'upsert_object' operation. This method executed in case 'add_object' failed and should try to
find the object that caused "object duplicate" error. In case single object found and it's equal to one we are
trying to create - the existing object will be returned (attempt to have kind of idempotency for add action).
In the case when we got more than one object returned as a result of the request to API - it will be hard to
find exact duplicate so the exception will be raised.
"""
model_name = self.get_operation_spec(operation_name)[OperationField.MODEL_NAME]
get_list_operation = self._find_get_list_operation(model_name)
if get_list_operation:
data = params[ParamName.DATA]
if not params.get(ParamName.FILTERS):
params[ParamName.FILTERS] = {'name': data['name']}
existing_obj = None
existing_objs = self.get_objects_by_filter(get_list_operation, params)
for i, obj in enumerate(existing_objs):
if i > 0:
raise FtdConfigurationError(MULTIPLE_DUPLICATES_FOUND_ERROR)
existing_obj = obj
if existing_obj is not None:
if equal_objects(existing_obj, data):
return existing_obj
else:
raise FtdConfigurationError(
'Cannot add new object. '
'An object with the same name but different parameters already exists.',
existing_obj)
raise e
def _find_get_list_operation(self, model_name):
operations = self.get_operation_specs_by_model_name(model_name) or {}
return next((
op for op, op_spec in operations.items()
if self._operation_checker.is_get_list_operation(op, op_spec)), None)
def _find_get_operation(self, model_name):
operations = self.get_operation_specs_by_model_name(model_name) or {}
return next((
op for op, op_spec in operations.items()
if self._operation_checker.is_get_operation(op, op_spec)), None)
def delete_object(self, operation_name, params):
def is_invalid_uuid_error(err):
return err.code == UNPROCESSABLE_ENTITY_STATUS and INVALID_UUID_ERROR_MESSAGE in str(err)
try:
return self.send_request(url_path=url_path, http_method=HTTPMethod.DELETE, path_params=path_params)
return self.send_general_request(operation_name, params)
except FtdServerError as e:
if is_invalid_uuid_error(e):
return {'status': 'Referenced object does not exist'}
else:
raise e
def edit_object(self, url_path, body_params, path_params=None, query_params=None):
existing_object = self.send_request(url_path=url_path, http_method=HTTPMethod.GET, path_params=path_params)
def edit_object(self, operation_name, params):
data, dummy, path_params = _get_user_params(params)
if not existing_object:
raise FtdConfigurationError('Referenced object does not exist')
elif equal_objects(existing_object, body_params):
return existing_object
else:
return self.send_request(url_path=url_path, http_method=HTTPMethod.PUT, body_params=body_params,
path_params=path_params, query_params=query_params)
model_name = self.get_operation_spec(operation_name)[OperationField.MODEL_NAME]
get_operation = self._find_get_operation(model_name)
def send_request(self, url_path, http_method, body_params=None, path_params=None, query_params=None):
if get_operation:
existing_object = self.send_general_request(get_operation, {ParamName.PATH_PARAMS: path_params})
if not existing_object:
raise FtdConfigurationError('Referenced object does not exist')
elif equal_objects(existing_object, data):
return existing_object
return self.send_general_request(operation_name, params)
def send_general_request(self, operation_name, params):
self.validate_params(operation_name, params)
if self._check_mode:
raise CheckModeException()
data, query_params, path_params = _get_user_params(params)
op_spec = self.get_operation_spec(operation_name)
url, method = op_spec[OperationField.URL], op_spec[OperationField.METHOD]
return self._send_request(url, method, data, path_params, query_params)
def _send_request(self, url_path, http_method, body_params=None, path_params=None, query_params=None):
def raise_for_failure(resp):
if not resp[ResponseParams.SUCCESS]:
raise FtdServerError(resp[ResponseParams.RESPONSE], resp[ResponseParams.STATUS_CODE])
@ -119,28 +393,152 @@ class BaseConfigurationResource(object):
self.config_changed = True
return response[ResponseParams.RESPONSE]
def validate_params(self, operation_name, params):
report = {}
op_spec = self.get_operation_spec(operation_name)
data, query_params, path_params = _get_user_params(params)
def iterate_over_pageable_resource(resource_func, query_params=None):
def validate(validation_method, field_name, user_params):
key = 'Invalid %s provided' % field_name
try:
is_valid, validation_report = validation_method(operation_name, user_params)
if not is_valid:
report[key] = validation_report
except Exception as e:
report[key] = str(e)
return report
validate(self._conn.validate_query_params, ParamName.QUERY_PARAMS, query_params)
validate(self._conn.validate_path_params, ParamName.PATH_PARAMS, path_params)
if is_post_request(op_spec) or is_put_request(op_spec):
validate(self._conn.validate_data, ParamName.DATA, data)
if report:
raise ValidationError(report)
def is_upsert_operation_supported(self, op_name):
"""
Checks if all operations required for upsert object operation are defined in 'operations'.
:param op_name: upsert operation name
:type op_name: str
:return: True if all criteria required to provide requested called operation are satisfied, otherwise False
:rtype: bool
"""
model_name = _extract_model_from_upsert_operation(op_name)
operations = self.get_operation_specs_by_model_name(model_name)
return self._operation_checker.is_upsert_operation_supported(operations)
@staticmethod
def _get_operation_name(checker, operations):
for operation_name, op_spec in operations.items():
if checker(operation_name, op_spec):
return operation_name
raise FtdConfigurationError("Operation is not supported")
def _add_upserted_object(self, model_operations, params):
add_op_name = self._get_operation_name(self._operation_checker.is_add_operation, model_operations)
return self.add_object(add_op_name, params)
def _edit_upserted_object(self, model_operations, existing_object, params):
edit_op_name = self._get_operation_name(self._operation_checker.is_edit_operation, model_operations)
_set_default(params, 'path_params', {})
_set_default(params, 'data', {})
params['path_params']['objId'] = existing_object['id']
copy_identity_properties(existing_object, params['data'])
return self.edit_object(edit_op_name, params)
def upsert_object(self, op_name, params):
"""
The wrapper on top of add object operation, get a list of objects and edit object operations that implement
upsert object operation. As a result, the object will be created if the object does not exist, if a single
object exists with requested 'params' this object will be updated otherwise, Exception will be raised.
:param op_name: upsert operation name
:type op_name: str
:param params: params that upsert operation should be executed with
:type params: dict
:return: upserted object representation
:rtype: dict
"""
if not self.is_upsert_operation_supported(op_name):
raise FtdInvalidOperationNameError(op_name)
model_name = _extract_model_from_upsert_operation(op_name)
model_operations = self.get_operation_specs_by_model_name(model_name)
try:
return self._add_upserted_object(model_operations, params)
except FtdConfigurationError as e:
if e.obj:
return self._edit_upserted_object(model_operations, e.obj, params)
raise e
def _set_default(params, field_name, value):
if field_name not in params or params[field_name] is None:
params[field_name] = value
def is_post_request(operation_spec):
return operation_spec[OperationField.METHOD] == HTTPMethod.POST
def is_put_request(operation_spec):
return operation_spec[OperationField.METHOD] == HTTPMethod.PUT
def _extract_model_from_upsert_operation(op_name):
return op_name[len(OperationNamePrefix.UPSERT):]
def _get_user_params(params):
return params.get(ParamName.DATA) or {}, params.get(ParamName.QUERY_PARAMS) or {}, params.get(
ParamName.PATH_PARAMS) or {}
def iterate_over_pageable_resource(resource_func, params):
"""
A generator function that iterates over a resource that supports pagination and lazily returns present items
one by one.
:param resource_func: function that receives `query_params` named argument and returns a page of objects
:param resource_func: function that receives `params` argument and returns a page of objects
:type resource_func: callable
:param query_params: initial dictionary of query parameters that will be passed to the resource_func
:type query_params: dict
:param params: initial dictionary of parameters that will be passed to the resource_func.
Should contain `query_params` inside.
:type params: dict
:return: an iterator containing returned items
:rtype: iterator of dict
"""
query_params = {} if query_params is None else dict(query_params)
query_params.setdefault('limit', DEFAULT_PAGE_SIZE)
query_params.setdefault('offset', DEFAULT_OFFSET)
# creating a copy not to mutate passed dict
params = copy.deepcopy(params)
params[ParamName.QUERY_PARAMS].setdefault('limit', DEFAULT_PAGE_SIZE)
params[ParamName.QUERY_PARAMS].setdefault('offset', DEFAULT_OFFSET)
limit = int(params[ParamName.QUERY_PARAMS]['limit'])
def received_less_items_than_requested(items_in_response, items_expected):
if items_in_response == items_expected:
return False
elif items_in_response < items_expected:
return True
raise FtdUnexpectedResponse(
"Get List of Objects Response from the server contains more objects than requested. "
"There are {0} item(s) in the response while {1} was(ere) requested".format(items_in_response,
items_expected)
)
while True:
result = resource_func(params=params)
result = resource_func(query_params=query_params)
while result['items']:
for item in result['items']:
yield item
if received_less_items_than_requested(len(result['items']), limit):
break
# creating a copy not to mutate existing dict
query_params = dict(query_params)
query_params['offset'] = int(query_params['offset']) + int(query_params['limit'])
result = resource_func(query_params=query_params)
params = copy.deepcopy(params)
query_params = params[ParamName.QUERY_PARAMS]
query_params['offset'] = int(query_params['offset']) + limit

View file

@ -17,10 +17,11 @@
#
from ansible.module_utils.network.ftd.common import HTTPMethod
from ansible.module_utils.six import integer_types, string_types
from ansible.module_utils.six import integer_types, string_types, iteritems
FILE_MODEL_NAME = '_File'
SUCCESS_RESPONSE_CODE = '200'
DELETE_PREFIX = 'delete'
class OperationField:
@ -28,12 +29,15 @@ class OperationField:
METHOD = 'method'
PARAMETERS = 'parameters'
MODEL_NAME = 'modelName'
DESCRIPTION = 'description'
RETURN_MULTIPLE_ITEMS = 'returnMultipleItems'
class SpecProp:
DEFINITIONS = 'definitions'
OPERATIONS = 'operations'
MODELS = 'models'
MODEL_OPERATIONS = 'model_operations'
class PropName:
@ -51,6 +55,7 @@ class PropName:
PROPERTIES = 'properties'
RESPONSES = 'responses'
NAME = 'name'
DESCRIPTION = 'description'
class PropType:
@ -68,6 +73,10 @@ class OperationParams:
QUERY = 'query'
class QueryParams:
FILTER = 'filter'
def _get_model_name_from_url(schema_ref):
path = schema_ref.split('/')
return path[len(path) - 1]
@ -89,13 +98,18 @@ class ValidationError(ValueError):
class FdmSwaggerParser:
_definitions = None
_base_path = None
def parse_spec(self, spec):
def parse_spec(self, spec, docs=None):
"""
This method simplifies a swagger format and also resolves a model name for each operation
:param spec: dict
expect data in the swagger format see <https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md>
:rtype: (bool, string|dict)
This method simplifies a swagger format, resolves a model name for each operation, and adds documentation for
each operation and model if it is provided.
:param spec: An API specification in the swagger format, see <https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md>
:type spec: dict
:param spec: A documentation map containing descriptions for models, operations and operation parameters.
:type docs: dict
:rtype: dict
:return:
Ex.
The models field contains model definition from swagger see <#https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md#definitions>
@ -111,6 +125,7 @@ class FdmSwaggerParser:
'modelName': 'NetworkObject', # it is a link to the model from 'models'
# None - for a delete operation or we don't have information
# '_File' - if an endpoint works with files
'returnMultipleItems': False, # shows if the operation returns a single item or an item list
'parameters': {
'path':{
'param_name':{
@ -129,26 +144,49 @@ class FdmSwaggerParser:
}
},
...
},
'model_operations':{
'model_name':{ # a list of operations available for the current model
'operation_name':{
... # the same as in the operations section
},
...
},
...
}
}
"""
self._definitions = spec[SpecProp.DEFINITIONS]
config = {
self._base_path = spec[PropName.BASE_PATH]
operations = self._get_operations(spec)
if docs:
operations = self._enrich_operations_with_docs(operations, docs)
self._definitions = self._enrich_definitions_with_docs(self._definitions, docs)
return {
SpecProp.MODELS: self._definitions,
SpecProp.OPERATIONS: self._get_operations(spec)
SpecProp.OPERATIONS: operations,
SpecProp.MODEL_OPERATIONS: self._get_model_operations(operations)
}
return config
def _get_model_operations(self, operations):
model_operations = {}
for operations_name, params in iteritems(operations):
model_name = params[OperationField.MODEL_NAME]
model_operations.setdefault(model_name, {})[operations_name] = params
return model_operations
def _get_operations(self, spec):
base_path = spec[PropName.BASE_PATH]
paths_dict = spec[PropName.PATHS]
operations_dict = {}
for url, operation_params in paths_dict.items():
for method, params in operation_params.items():
for url, operation_params in iteritems(paths_dict):
for method, params in iteritems(operation_params):
operation = {
OperationField.METHOD: method,
OperationField.URL: base_path + url,
OperationField.MODEL_NAME: self._get_model_name(method, params)
OperationField.URL: self._base_path + url,
OperationField.MODEL_NAME: self._get_model_name(method, params),
OperationField.RETURN_MULTIPLE_ITEMS: self._return_multiple_items(params)
}
if OperationField.PARAMETERS in params:
operation[OperationField.PARAMETERS] = self._get_rest_params(params[OperationField.PARAMETERS])
@ -157,14 +195,68 @@ class FdmSwaggerParser:
operations_dict[operation_id] = operation
return operations_dict
def _enrich_operations_with_docs(self, operations, docs):
def get_operation_docs(op):
op_url = op[OperationField.URL][len(self._base_path):]
return docs[PropName.PATHS].get(op_url, {}).get(op[OperationField.METHOD], {})
for operation in operations.values():
operation_docs = get_operation_docs(operation)
operation[OperationField.DESCRIPTION] = operation_docs.get(PropName.DESCRIPTION, '')
if OperationField.PARAMETERS in operation:
param_descriptions = dict((p[PropName.NAME], p[PropName.DESCRIPTION])
for p in operation_docs.get(OperationField.PARAMETERS, {}))
for param_name, params_spec in operation[OperationField.PARAMETERS][OperationParams.PATH].items():
params_spec[OperationField.DESCRIPTION] = param_descriptions.get(param_name, '')
for param_name, params_spec in operation[OperationField.PARAMETERS][OperationParams.QUERY].items():
params_spec[OperationField.DESCRIPTION] = param_descriptions.get(param_name, '')
return operations
def _enrich_definitions_with_docs(self, definitions, docs):
for model_name, model_def in definitions.items():
model_docs = docs[SpecProp.DEFINITIONS].get(model_name, {})
model_def[PropName.DESCRIPTION] = model_docs.get(PropName.DESCRIPTION, '')
for prop_name, prop_spec in model_def.get(PropName.PROPERTIES, {}).items():
prop_spec[PropName.DESCRIPTION] = model_docs.get(PropName.PROPERTIES, {}).get(prop_name, '')
prop_spec[PropName.REQUIRED] = prop_name in model_def.get(PropName.REQUIRED, [])
return definitions
def _get_model_name(self, method, params):
if method == HTTPMethod.GET:
return self._get_model_name_from_responses(params)
elif method == HTTPMethod.POST or method == HTTPMethod.PUT:
return self._get_model_name_for_post_put_requests(params)
elif method == HTTPMethod.DELETE:
return self._get_model_name_from_delete_operation(params)
else:
return None
@staticmethod
def _return_multiple_items(op_params):
"""
Defines if the operation returns one item or a list of items.
:param op_params: operation specification
:return: True if the operation returns a list of items, otherwise False
"""
try:
schema = op_params[PropName.RESPONSES][SUCCESS_RESPONSE_CODE][PropName.SCHEMA]
return PropName.ITEMS in schema[PropName.PROPERTIES]
except KeyError:
return False
def _get_model_name_from_delete_operation(self, params):
operation_id = params[PropName.OPERATION_ID]
if operation_id.startswith(DELETE_PREFIX):
model_name = operation_id[len(DELETE_PREFIX):]
if model_name in self._definitions:
return model_name
return None
def _get_model_name_for_post_put_requests(self, params):
model_name = None
if OperationField.PARAMETERS in params:
@ -429,7 +521,8 @@ class FdmSwaggerValidator:
self._add_invalid_type_report(status, path, '', PropType.OBJECT, data)
return None
self._check_required_fields(status, model[PropName.REQUIRED], data, path)
if PropName.REQUIRED in model:
self._check_required_fields(status, model[PropName.REQUIRED], data, path)
model_properties = model[PropName.PROPERTIES]
for prop in model_properties.keys():
@ -472,14 +565,25 @@ class FdmSwaggerValidator:
@staticmethod
def _is_correct_simple_types(expected_type, value):
def is_numeric_string(s):
try:
float(s)
return True
except ValueError:
return False
if expected_type == PropType.STRING:
return isinstance(value, string_types)
elif expected_type == PropType.BOOLEAN:
return isinstance(value, bool)
elif expected_type == PropType.INTEGER:
return isinstance(value, integer_types) and not isinstance(value, bool)
is_integer = isinstance(value, integer_types) and not isinstance(value, bool)
is_digit_string = isinstance(value, string_types) and value.isdigit()
return is_integer or is_digit_string
elif expected_type == PropType.NUMBER:
return isinstance(value, (integer_types, float)) and not isinstance(value, bool)
is_number = isinstance(value, (integer_types, float)) and not isinstance(value, bool)
is_numeric_string = isinstance(value, string_types) and is_numeric_string(value)
return is_number or is_numeric_string
return False
@staticmethod

View file

@ -19,8 +19,8 @@
#
from __future__ import absolute_import, division, print_function
__metaclass__ = type
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
@ -38,25 +38,31 @@ author: "Cisco Systems, Inc."
options:
operation:
description:
- The name of the operation to execute. Commonly, the operation starts with 'add', 'edit', 'get'
- The name of the operation to execute. Commonly, the operation starts with 'add', 'edit', 'get', 'upsert'
or 'delete' verbs, but can have an arbitrary name too.
required: true
type: str
data:
description:
- Key-value pairs that should be sent as body parameters in a REST API call
type: dict
query_params:
description:
- Key-value pairs that should be sent as query parameters in a REST API call.
type: dict
path_params:
description:
- Key-value pairs that should be sent as path parameters in a REST API call.
type: dict
register_as:
description:
- Specifies Ansible fact name that is used to register received response from the FTD device.
type: str
filters:
description:
- Key-value dict that represents equality filters. Every key is a property name and value is its desired value.
If multiple filters are present, they are combined with logical operator AND.
type: dict
"""
EXAMPLES = """
@ -88,74 +94,11 @@ response:
"""
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.connection import Connection
from ansible.module_utils.network.ftd.common import HTTPMethod, construct_ansible_facts, FtdConfigurationError, \
FtdServerError
from ansible.module_utils.network.ftd.configuration import BaseConfigurationResource
from ansible.module_utils.network.ftd.fdm_swagger_client import OperationField, ValidationError
def is_post_request(operation_spec):
return operation_spec[OperationField.METHOD] == HTTPMethod.POST
def is_put_request(operation_spec):
return operation_spec[OperationField.METHOD] == HTTPMethod.PUT
def is_add_operation(operation_name, operation_spec):
# Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
return operation_name.startswith('add') and is_post_request(operation_spec)
def is_edit_operation(operation_name, operation_spec):
# Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
return operation_name.startswith('edit') and is_put_request(operation_spec)
def is_delete_operation(operation_name, operation_spec):
# Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
return operation_name.startswith('delete') and operation_spec[OperationField.METHOD] == HTTPMethod.DELETE
def validate_params(connection, op_name, query_params, path_params, data, op_spec):
report = {}
def validate(validation_method, field_name, params):
key = 'Invalid %s provided' % field_name
try:
is_valid, validation_report = validation_method(op_name, params)
if not is_valid:
report[key] = validation_report
except Exception as e:
report[key] = str(e)
return report
validate(connection.validate_query_params, 'query_params', query_params)
validate(connection.validate_path_params, 'path_params', path_params)
if is_post_request(op_spec) or is_post_request(op_spec):
validate(connection.validate_data, 'data', data)
if report:
raise ValidationError(report)
def is_find_by_filter_operation(operation_name, operation_spec, params):
"""
Checks whether the called operation is 'find by filter'. This operation fetches all objects and finds
the matching ones by the given filter. As filtering is done on the client side, this operation should be used
only when selected filters are not implemented on the server side.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:param operation_spec: specification of the operation being called by the user
:type operation_spec: dict
:param params: module parameters
:return: True if called operation is find by filter, otherwise False
:rtype: bool
"""
is_get_list_operation = operation_name.startswith('get') and operation_name.endswith('List')
is_get_method = operation_spec[OperationField.METHOD] == HTTPMethod.GET
return is_get_list_operation and is_get_method and params['filters']
from ansible.module_utils.network.ftd.configuration import BaseConfigurationResource, CheckModeException, \
FtdInvalidOperationNameError
from ansible.module_utils.network.ftd.fdm_swagger_client import ValidationError
from ansible.module_utils.network.ftd.common import construct_ansible_facts, FtdConfigurationError, \
FtdServerError, FtdUnexpectedResponse
def main():
@ -172,47 +115,25 @@ def main():
params = module.params
connection = Connection(module._socket_path)
resource = BaseConfigurationResource(connection, module.check_mode)
op_name = params['operation']
op_spec = connection.get_operation_spec(op_name)
if op_spec is None:
module.fail_json(msg='Invalid operation name provided: %s' % op_name)
data, query_params, path_params = params['data'], params['query_params'], params['path_params']
try:
validate_params(connection, op_name, query_params, path_params, data, op_spec)
except ValidationError as e:
module.fail_json(msg=e.args[0])
try:
if module.check_mode:
module.exit_json(changed=False)
resource = BaseConfigurationResource(connection)
url = op_spec[OperationField.URL]
if is_add_operation(op_name, op_spec):
resp = resource.add_object(url, data, path_params, query_params)
elif is_edit_operation(op_name, op_spec):
resp = resource.edit_object(url, data, path_params, query_params)
elif is_delete_operation(op_name, op_spec):
resp = resource.delete_object(url, path_params)
elif is_find_by_filter_operation(op_name, op_spec, params):
resp = resource.get_objects_by_filter(url, params['filters'], path_params,
query_params)
else:
resp = resource.send_request(url, op_spec[OperationField.METHOD], data,
path_params,
query_params)
resp = resource.execute_operation(op_name, params)
module.exit_json(changed=resource.config_changed, response=resp,
ansible_facts=construct_ansible_facts(resp, module.params))
except FtdInvalidOperationNameError as e:
module.fail_json(msg='Invalid operation name provided: %s' % e.operation_name)
except FtdConfigurationError as e:
module.fail_json(msg='Failed to execute %s operation because of the configuration error: %s' % (op_name, e))
module.fail_json(msg='Failed to execute %s operation because of the configuration error: %s' % (op_name, e.msg))
except FtdServerError as e:
module.fail_json(msg='Server returned an error trying to execute %s operation. Status code: %s. '
'Server response: %s' % (op_name, e.code, e.response))
except FtdUnexpectedResponse as e:
module.fail_json(msg=e.args[0])
except ValidationError as e:
module.fail_json(msg=e.args[0])
except CheckModeException:
module.exit_json(changed=False)
if __name__ == '__main__':

View file

@ -41,14 +41,17 @@ options:
- The name of the operation to execute.
- Only operations that return a file can be used in this module.
required: true
type: str
path_params:
description:
- Key-value pairs that should be sent as path parameters in a REST API call.
type: dict
destination:
description:
- Absolute path of where to download the file to.
- If destination is a directory, the module uses a filename from 'Content-Disposition' header specified by the server.
required: true
type: path
"""
EXAMPLES = """
@ -62,7 +65,7 @@ EXAMPLES = """
RETURN = """
msg:
description: the error message describing why the module failed
description: The error message describing why the module failed.
returned: error
type: string
"""

View file

@ -40,25 +40,29 @@ options:
- The name of the operation to execute.
- Only operations that upload file can be used in this module.
required: true
fileToUpload:
type: str
file_to_upload:
description:
- Absolute path to the file that should be uploaded.
required: true
type: path
version_added: "2.8"
register_as:
description:
- Specifies Ansible fact name that is used to register received response from the FTD device.
type: str
"""
EXAMPLES = """
- name: Upload disk file
ftd_file_upload:
operation: 'postuploaddiskfile'
fileToUpload: /tmp/test1.txt
file_to_upload: /tmp/test1.txt
"""
RETURN = """
msg:
description: the error message describing why the module failed
description: The error message describing why the module failed.
returned: error
type: string
"""
@ -75,7 +79,7 @@ def is_upload_operation(op_spec):
def main():
fields = dict(
operation=dict(type='str', required=True),
fileToUpload=dict(type='path', required=True),
file_to_upload=dict(type='path', required=True),
register_as=dict(type='str'),
)
module = AnsibleModule(argument_spec=fields,
@ -94,7 +98,7 @@ def main():
try:
if module.check_mode:
module.exit_json()
resp = connection.upload_file(params['fileToUpload'], op_spec[OperationField.URL])
resp = connection.upload_file(params['file_to_upload'], op_spec[OperationField.URL])
module.exit_json(changed=True, response=resp, ansible_facts=construct_ansible_facts(resp, module.params))
except FtdServerError as e:
module.fail_json(msg='Upload request for %s operation failed. Status code: %s. '

View file

@ -37,7 +37,6 @@ options:
default: '/api/fdm/v2/fdm/token'
vars:
- name: ansible_httpapi_ftd_token_path
spec_path:
type: str
description:
@ -70,6 +69,13 @@ BASE_HEADERS = {
TOKEN_EXPIRATION_STATUS_CODE = 408
UNAUTHORIZED_STATUS_CODE = 401
try:
from __main__ import display
except ImportError:
from ansible.utils.display import Display
display = Display()
class HttpApi(HttpApiBase):
def __init__(self, connection):
@ -79,6 +85,7 @@ class HttpApi(HttpApiBase):
self.refresh_token = None
self._api_spec = None
self._api_validator = None
self._ignore_http_errors = False
def login(self, username, password):
def request_token_payload(username, password):
@ -101,10 +108,15 @@ class HttpApi(HttpApiBase):
else:
raise AnsibleConnectionFailure('Username and password are required for login in absence of refresh token')
dummy, response_data = self.connection.send(
self._get_api_token_path(), json.dumps(payload), method=HTTPMethod.POST, headers=BASE_HEADERS
url = self._get_api_token_path()
self._display(HTTPMethod.POST, 'login', url)
response, response_data = self._send_auth_request(
url, json.dumps(payload), method=HTTPMethod.POST, headers=BASE_HEADERS
)
response = self._response_to_json(response_data.getvalue())
self._display(HTTPMethod.POST, 'login:status_code', response.getcode())
response = self._response_to_json(self._get_response_value(response_data))
try:
self.refresh_token = response['refresh_token']
@ -120,13 +132,29 @@ class HttpApi(HttpApiBase):
'access_token': self.access_token,
'token_to_revoke': self.refresh_token
}
self.connection.send(
self._get_api_token_path(), json.dumps(auth_payload), method=HTTPMethod.POST,
headers=BASE_HEADERS
)
url = self._get_api_token_path()
self._display(HTTPMethod.POST, 'logout', url)
response, dummy = self._send_auth_request(url, json.dumps(auth_payload), method=HTTPMethod.POST,
headers=BASE_HEADERS)
self._display(HTTPMethod.POST, 'logout:status_code', response.getcode())
self.refresh_token = None
self.access_token = None
def _send_auth_request(self, path, data, **kwargs):
try:
self._ignore_http_errors = True
return self.connection.send(path, data, **kwargs)
except HTTPError as e:
# HttpApi connection does not read the error response from HTTPError, so we do it here and wrap it up in
# ConnectionError, so the actual error message is displayed to the user.
error_msg = self._response_to_json(to_text(e.read()))
raise ConnectionError('Server returned an error during authentication request: %s' % error_msg)
finally:
self._ignore_http_errors = False
def update_auth(self, response, response_data):
# With tokens, authentication should not be checked and updated on each request
return None
@ -135,23 +163,34 @@ class HttpApi(HttpApiBase):
url = construct_url_path(url_path, path_params, query_params)
data = json.dumps(body_params) if body_params else None
try:
self._display(http_method, 'url', url)
if data:
self._display(http_method, 'data', data)
response, response_data = self.connection.send(url, data, method=http_method, headers=BASE_HEADERS)
value = self._get_response_value(response_data)
self._display(http_method, 'response', value)
return {
ResponseParams.SUCCESS: True,
ResponseParams.STATUS_CODE: response.getcode(),
ResponseParams.RESPONSE: self._response_to_json(response_data.getvalue())
ResponseParams.RESPONSE: self._response_to_json(value)
}
# Being invoked via JSON-RPC, this method does not serialize and pass HTTPError correctly to the method caller.
# Thus, in order to handle non-200 responses, we need to wrap them into a simple structure and pass explicitly.
except HTTPError as e:
error_msg = to_text(e.read())
self._display(http_method, 'error', error_msg)
return {
ResponseParams.SUCCESS: False,
ResponseParams.STATUS_CODE: e.code,
ResponseParams.RESPONSE: self._response_to_json(e.read())
ResponseParams.RESPONSE: self._response_to_json(error_msg)
}
def upload_file(self, from_path, to_url):
url = construct_url_path(to_url)
self._display(HTTPMethod.POST, 'upload', url)
with open(from_path, 'rb') as src_file:
rf = RequestField('fileToUpload', src_file.read(), os.path.basename(src_file.name))
rf.make_multipart()
@ -162,10 +201,13 @@ class HttpApi(HttpApiBase):
headers['Content-Length'] = len(body)
dummy, response_data = self.connection.send(url, data=body, method=HTTPMethod.POST, headers=headers)
return self._response_to_json(response_data.getvalue())
value = self._get_response_value(response_data)
self._display(HTTPMethod.POST, 'upload:response', value)
return self._response_to_json(value)
def download_file(self, from_url, to_path, path_params=None):
url = construct_url_path(from_url, path_params=path_params)
self._display(HTTPMethod.GET, 'download', url)
response, response_data = self.connection.send(url, data=None, method=HTTPMethod.GET, headers=BASE_HEADERS)
if os.path.isdir(to_path):
@ -174,15 +216,24 @@ class HttpApi(HttpApiBase):
with open(to_path, "wb") as output_file:
output_file.write(response_data.getvalue())
self._display(HTTPMethod.GET, 'downloaded', to_path)
def handle_httperror(self, exc):
if exc.code == TOKEN_EXPIRATION_STATUS_CODE or exc.code == UNAUTHORIZED_STATUS_CODE:
is_auth_related_code = exc.code == TOKEN_EXPIRATION_STATUS_CODE or exc.code == UNAUTHORIZED_STATUS_CODE
if not self._ignore_http_errors and is_auth_related_code:
self.connection._auth = None
self.login(self.connection.get_option('remote_user'), self.connection.get_option('password'))
return True
# None means that the exception will be passed further to the caller
return None
def _display(self, http_method, title, msg=''):
display.vvvv('REST:%s:%s:%s\n%s' % (http_method, self.connection._url, title, msg))
@staticmethod
def _get_response_value(response_data):
return to_text(response_data.getvalue())
def _get_api_spec_path(self):
return self.get_option('spec_path')
@ -190,8 +241,7 @@ class HttpApi(HttpApiBase):
return self.get_option('token_path')
@staticmethod
def _response_to_json(response_data):
response_text = to_text(response_data)
def _response_to_json(response_text):
try:
return json.loads(response_text) if response_text else {}
# JSONDecodeError only available on Python 3.5+
@ -201,6 +251,12 @@ class HttpApi(HttpApiBase):
def get_operation_spec(self, operation_name):
return self.api_spec[SpecProp.OPERATIONS].get(operation_name, None)
def get_operation_specs_by_model_name(self, model_name):
if model_name:
return self.api_spec[SpecProp.MODEL_OPERATIONS].get(model_name, None)
else:
return None
def get_model_spec(self, model_name):
return self.api_spec[SpecProp.MODELS].get(model_name, None)

View file

@ -70,6 +70,13 @@ def test_equal_objects_return_true_with_equal_objects():
)
def test_equal_objects_return_true_with_equal_str_like_values():
assert equal_objects(
{'foo': b'bar'},
{'foo': u'bar'}
)
def test_equal_objects_return_true_with_equal_nested_dicts():
assert equal_objects(
{'foo': {'bar': 1, 'buz': 2}},

View file

@ -16,34 +16,81 @@
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
import json
import unittest
import pytest
from ansible.compat.tests import mock
from ansible.compat.tests.mock import call, patch
from ansible.module_utils.network.ftd.configuration import iterate_over_pageable_resource, BaseConfigurationResource
from ansible.module_utils.network.ftd.common import HTTPMethod, FtdUnexpectedResponse
from ansible.module_utils.network.ftd.configuration import iterate_over_pageable_resource, BaseConfigurationResource, \
OperationChecker, OperationNamePrefix, ParamName, QueryParams
from ansible.module_utils.network.ftd.fdm_swagger_client import ValidationError, OperationField
class TestBaseConfigurationResource(object):
@pytest.fixture
def connection_mock(self, mocker):
connection_class_mock = mocker.patch('ansible.modules.network.ftd.ftd_configuration.Connection')
connection_instance = connection_class_mock.return_value
connection_instance.validate_data.return_value = True, None
connection_instance.validate_query_params.return_value = True, None
connection_instance.validate_path_params.return_value = True, None
@patch.object(BaseConfigurationResource, 'send_request')
def test_get_objects_by_filter_with_multiple_filters(self, send_request_mock):
return connection_instance
@patch.object(BaseConfigurationResource, '_send_request')
def test_get_objects_by_filter_with_multiple_filters(self, send_request_mock, connection_mock):
objects = [
{'name': 'obj1', 'type': 1, 'foo': {'bar': 'buzz'}},
{'name': 'obj2', 'type': 1, 'foo': {'bar': 'buz'}},
{'name': 'obj3', 'type': 2, 'foo': {'bar': 'buzz'}}
]
resource = BaseConfigurationResource(None)
connection_mock.get_operation_spec.return_value = {
'method': HTTPMethod.GET,
'url': '/object/'
}
resource = BaseConfigurationResource(connection_mock, False)
send_request_mock.side_effect = [{'items': objects}, {'items': []}]
assert objects == resource.get_objects_by_filter('/objects', {})
# resource.get_objects_by_filter returns generator so to be able compare generated list with expected list
# we need evaluate it.
assert objects == list(resource.get_objects_by_filter('test', {}))
send_request_mock.assert_has_calls(
[
mock.call('/object/', 'get', {}, {}, {'limit': 10, 'offset': 0})
]
)
send_request_mock.reset_mock()
send_request_mock.side_effect = [{'items': objects}, {'items': []}]
assert [objects[0]] == resource.get_objects_by_filter('/objects', {'name': 'obj1'})
# resource.get_objects_by_filter returns generator so to be able compare generated list with expected list
# we need evaluate it.
assert [objects[0]] == list(resource.get_objects_by_filter('test', {ParamName.FILTERS: {'name': 'obj1'}}))
send_request_mock.assert_has_calls(
[
mock.call('/object/', 'get', {}, {}, {QueryParams.FILTER: 'name:obj1', 'limit': 10, 'offset': 0})
]
)
send_request_mock.reset_mock()
send_request_mock.side_effect = [{'items': objects}, {'items': []}]
assert [objects[1]] == resource.get_objects_by_filter('/objects',
{'type': 1, 'foo': {'bar': 'buz'}})
# resource.get_objects_by_filter returns generator so to be able compare generated list with expected list
# we need evaluate it.
assert [objects[1]] == list(resource.get_objects_by_filter(
'test',
{ParamName.FILTERS: {'type': 1, 'foo': {'bar': 'buz'}}}))
@patch.object(BaseConfigurationResource, 'send_request')
def test_get_objects_by_filter_with_multiple_responses(self, send_request_mock):
send_request_mock.assert_has_calls(
[
mock.call('/object/', 'get', {}, {},
{QueryParams.FILTER: "foo:{'bar': 'buz'};type:1", 'limit': 10, 'offset': 0})
]
)
@patch.object(BaseConfigurationResource, '_send_request')
def test_get_objects_by_filter_with_multiple_responses(self, send_request_mock, connection_mock):
send_request_mock.side_effect = [
{'items': [
{'name': 'obj1', 'type': 'foo'},
@ -54,11 +101,204 @@ class TestBaseConfigurationResource(object):
]},
{'items': []}
]
connection_mock.get_operation_spec.return_value = {
'method': HTTPMethod.GET,
'url': '/object/'
}
resource = BaseConfigurationResource(connection_mock, False)
assert [{'name': 'obj1', 'type': 'foo'}] == list(resource.get_objects_by_filter(
'test',
{ParamName.FILTERS: {'type': 'foo'}}))
send_request_mock.assert_has_calls(
[
mock.call('/object/', 'get', {}, {},
{QueryParams.FILTER: "type:foo", 'limit': 10, 'offset': 0})
]
)
resource = BaseConfigurationResource(None)
send_request_mock.reset_mock()
send_request_mock.side_effect = [
{'items': [
{'name': 'obj1', 'type': 'foo'},
{'name': 'obj2', 'type': 'bar'}
]},
{'items': [
{'name': 'obj3', 'type': 'foo'}
]},
{'items': []}
]
resp = list(resource.get_objects_by_filter(
'test',
{
ParamName.FILTERS: {'type': 'foo'},
ParamName.QUERY_PARAMS: {'limit': 2}
}))
assert [{'name': 'obj1', 'type': 'foo'}, {'name': 'obj3', 'type': 'foo'}] == resp
send_request_mock.assert_has_calls(
[
mock.call('/object/', 'get', {}, {},
{QueryParams.FILTER: "type:foo", 'limit': 2, 'offset': 0}),
mock.call('/object/', 'get', {}, {},
{QueryParams.FILTER: "type:foo", 'limit': 2, 'offset': 2})
]
)
assert [{'name': 'obj1', 'type': 'foo'}, {'name': 'obj3', 'type': 'foo'}] == resource.get_objects_by_filter(
'/objects', {'type': 'foo'})
def test_module_should_fail_if_validation_error_in_data(self, connection_mock):
connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.POST, 'url': '/test'}
report = {
'required': ['objects[0].type'],
'invalid_type': [
{
'path': 'objects[3].id',
'expected_type': 'string',
'actually_value': 1
}
]
}
connection_mock.validate_data.return_value = (False, json.dumps(report, sort_keys=True, indent=4))
with pytest.raises(ValidationError) as e_info:
resource = BaseConfigurationResource(connection_mock, False)
resource.crud_operation('addTest', {'data': {}})
result = e_info.value.args[0]
key = 'Invalid data provided'
assert result[key]
result[key] = json.loads(result[key])
assert result == {key: {
'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
'required': ['objects[0].type']
}}
def test_module_should_fail_if_validation_error_in_query_params(self, connection_mock):
connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.GET, 'url': '/test',
'returnMultipleItems': False}
report = {
'required': ['objects[0].type'],
'invalid_type': [
{
'path': 'objects[3].id',
'expected_type': 'string',
'actually_value': 1
}
]
}
connection_mock.validate_query_params.return_value = (False, json.dumps(report, sort_keys=True, indent=4))
with pytest.raises(ValidationError) as e_info:
resource = BaseConfigurationResource(connection_mock, False)
resource.crud_operation('getTestList', {'data': {}})
result = e_info.value.args[0]
key = 'Invalid query_params provided'
assert result[key]
result[key] = json.loads(result[key])
assert result == {key: {
'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
'required': ['objects[0].type']}}
def test_module_should_fail_if_validation_error_in_path_params(self, connection_mock):
connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.GET, 'url': '/test',
'returnMultipleItems': False}
report = {
'path_params': {
'required': ['objects[0].type'],
'invalid_type': [
{
'path': 'objects[3].id',
'expected_type': 'string',
'actually_value': 1
}
]
}
}
connection_mock.validate_path_params.return_value = (False, json.dumps(report, sort_keys=True, indent=4))
with pytest.raises(ValidationError) as e_info:
resource = BaseConfigurationResource(connection_mock, False)
resource.crud_operation('putTest', {'data': {}})
result = e_info.value.args[0]
key = 'Invalid path_params provided'
assert result[key]
result[key] = json.loads(result[key])
assert result == {key: {
'path_params': {
'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
'required': ['objects[0].type']}}}
def test_module_should_fail_if_validation_error_in_all_params(self, connection_mock):
connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.POST, 'url': '/test'}
report = {
'data': {
'required': ['objects[0].type'],
'invalid_type': [
{
'path': 'objects[3].id',
'expected_type': 'string',
'actually_value': 1
}
]
},
'path_params': {
'required': ['some_param'],
'invalid_type': [
{
'path': 'name',
'expected_type': 'string',
'actually_value': True
}
]
},
'query_params': {
'required': ['other_param'],
'invalid_type': [
{
'path': 'f_integer',
'expected_type': 'integer',
'actually_value': "test"
}
]
}
}
connection_mock.validate_data.return_value = (False, json.dumps(report['data'], sort_keys=True, indent=4))
connection_mock.validate_query_params.return_value = (False,
json.dumps(report['query_params'], sort_keys=True,
indent=4))
connection_mock.validate_path_params.return_value = (False,
json.dumps(report['path_params'], sort_keys=True,
indent=4))
with pytest.raises(ValidationError) as e_info:
resource = BaseConfigurationResource(connection_mock, False)
resource.crud_operation('putTest', {'data': {}})
result = e_info.value.args[0]
key_data = 'Invalid data provided'
assert result[key_data]
result[key_data] = json.loads(result[key_data])
key_path_params = 'Invalid path_params provided'
assert result[key_path_params]
result[key_path_params] = json.loads(result[key_path_params])
key_query_params = 'Invalid query_params provided'
assert result[key_query_params]
result[key_query_params] = json.loads(result[key_query_params])
assert result == {
key_data: {'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
'required': ['objects[0].type']},
key_path_params: {'invalid_type': [{'actually_value': True, 'expected_type': 'string', 'path': 'name'}],
'required': ['some_param']},
key_query_params: {
'invalid_type': [{'actually_value': 'test', 'expected_type': 'integer', 'path': 'f_integer'}],
'required': ['other_param']}}
class TestIterateOverPageableResource(object):
@ -66,7 +306,7 @@ class TestIterateOverPageableResource(object):
def test_iterate_over_pageable_resource_with_no_items(self):
resource_func = mock.Mock(return_value={'items': []})
items = iterate_over_pageable_resource(resource_func)
items = iterate_over_pageable_resource(resource_func, {'query_params': {}})
assert [] == list(items)
@ -76,33 +316,37 @@ class TestIterateOverPageableResource(object):
{'items': []},
])
items = iterate_over_pageable_resource(resource_func)
items = iterate_over_pageable_resource(resource_func, {'query_params': {}})
assert ['foo', 'bar'] == list(items)
resource_func.assert_has_calls([
call(query_params={'offset': 0, 'limit': 10}),
call(query_params={'offset': 10, 'limit': 10})
call(params={'query_params': {'offset': 0, 'limit': 10}})
])
def test_iterate_over_pageable_resource_with_multiple_pages(self):
resource_func = mock.Mock(side_effect=[
objects = [
{'items': ['foo']},
{'items': ['bar']},
{'items': ['buzz']},
{'items': []},
])
]
resource_func = mock.Mock(side_effect=objects)
items = iterate_over_pageable_resource(resource_func)
items = iterate_over_pageable_resource(resource_func, {'query_params': {}})
assert ['foo'] == list(items)
resource_func.reset_mock()
resource_func = mock.Mock(side_effect=objects)
items = iterate_over_pageable_resource(resource_func, {'query_params': {'limit': 1}})
assert ['foo', 'bar', 'buzz'] == list(items)
def test_iterate_over_pageable_resource_should_preserve_query_params(self):
resource_func = mock.Mock(return_value={'items': []})
items = iterate_over_pageable_resource(resource_func, {'filter': 'name:123'})
items = iterate_over_pageable_resource(resource_func, {'query_params': {'filter': 'name:123'}})
assert [] == list(items)
resource_func.assert_called_once_with(query_params={'filter': 'name:123', 'offset': 0, 'limit': 10})
resource_func.assert_called_once_with(params={'query_params': {'filter': 'name:123', 'offset': 0, 'limit': 10}})
def test_iterate_over_pageable_resource_should_preserve_limit(self):
resource_func = mock.Mock(side_effect=[
@ -110,12 +354,11 @@ class TestIterateOverPageableResource(object):
{'items': []},
])
items = iterate_over_pageable_resource(resource_func, {'limit': 1})
items = iterate_over_pageable_resource(resource_func, {'query_params': {'limit': 1}})
assert ['foo'] == list(items)
resource_func.assert_has_calls([
call(query_params={'offset': 0, 'limit': 1}),
call(query_params={'offset': 1, 'limit': 1})
call(params={'query_params': {'offset': 0, 'limit': 1}})
])
def test_iterate_over_pageable_resource_should_preserve_offset(self):
@ -124,12 +367,11 @@ class TestIterateOverPageableResource(object):
{'items': []},
])
items = iterate_over_pageable_resource(resource_func, {'offset': 3})
items = iterate_over_pageable_resource(resource_func, {'query_params': {'offset': 3}})
assert ['foo'] == list(items)
resource_func.assert_has_calls([
call(query_params={'offset': 3, 'limit': 10}),
call(query_params={'offset': 13, 'limit': 10})
call(params={'query_params': {'offset': 3, 'limit': 10}}),
])
def test_iterate_over_pageable_resource_should_pass_with_string_offset_and_limit(self):
@ -138,10 +380,191 @@ class TestIterateOverPageableResource(object):
{'items': []},
])
items = iterate_over_pageable_resource(resource_func, {'offset': '1', 'limit': '1'})
items = iterate_over_pageable_resource(resource_func, {'query_params': {'offset': '1', 'limit': '1'}})
assert ['foo'] == list(items)
resource_func.assert_has_calls([
call(query_params={'offset': '1', 'limit': '1'}),
call(query_params={'offset': 2, 'limit': '1'})
call(params={'query_params': {'offset': '1', 'limit': '1'}}),
call(params={'query_params': {'offset': 2, 'limit': '1'}})
])
def test_iterate_over_pageable_resource_raises_exception_when_server_returned_more_items_than_requested(self):
resource_func = mock.Mock(side_effect=[
{'items': ['foo', 'redundant_bar']},
{'items': []},
])
with pytest.raises(FtdUnexpectedResponse):
list(iterate_over_pageable_resource(resource_func, {'query_params': {'offset': '1', 'limit': '1'}}))
resource_func.assert_has_calls([
call(params={'query_params': {'offset': '1', 'limit': '1'}})
])
class TestOperationCheckerClass(unittest.TestCase):
def setUp(self):
self._checker = OperationChecker
def test_is_add_operation_positive(self):
operation_name = OperationNamePrefix.ADD + "Object"
operation_spec = {OperationField.METHOD: HTTPMethod.POST}
assert self._checker.is_add_operation(operation_name, operation_spec)
def test_is_add_operation_wrong_method_in_spec(self):
operation_name = OperationNamePrefix.ADD + "Object"
operation_spec = {OperationField.METHOD: HTTPMethod.GET}
assert not self._checker.is_add_operation(operation_name, operation_spec)
def test_is_add_operation_negative_wrong_operation_name(self):
operation_name = OperationNamePrefix.GET + "Object"
operation_spec = {OperationField.METHOD: HTTPMethod.POST}
assert not self._checker.is_add_operation(operation_name, operation_spec)
def test_is_edit_operation_positive(self):
operation_name = OperationNamePrefix.EDIT + "Object"
operation_spec = {OperationField.METHOD: HTTPMethod.PUT}
assert self._checker.is_edit_operation(operation_name, operation_spec)
def test_is_edit_operation_wrong_method_in_spec(self):
operation_name = OperationNamePrefix.EDIT + "Object"
operation_spec = {OperationField.METHOD: HTTPMethod.GET}
assert not self._checker.is_edit_operation(operation_name, operation_spec)
def test_is_edit_operation_negative_wrong_operation_name(self):
operation_name = OperationNamePrefix.GET + "Object"
operation_spec = {OperationField.METHOD: HTTPMethod.PUT}
assert not self._checker.is_edit_operation(operation_name, operation_spec)
def test_is_delete_operation_positive(self):
operation_name = OperationNamePrefix.DELETE + "Object"
operation_spec = {OperationField.METHOD: HTTPMethod.DELETE}
self.assertTrue(
self._checker.is_delete_operation(operation_name, operation_spec)
)
def test_is_delete_operation_wrong_method_in_spec(self):
operation_name = OperationNamePrefix.DELETE + "Object"
operation_spec = {OperationField.METHOD: HTTPMethod.GET}
assert not self._checker.is_delete_operation(operation_name, operation_spec)
def test_is_delete_operation_negative_wrong_operation_name(self):
operation_name = OperationNamePrefix.GET + "Object"
operation_spec = {OperationField.METHOD: HTTPMethod.DELETE}
assert not self._checker.is_delete_operation(operation_name, operation_spec)
def test_is_get_list_operation_positive(self):
operation_name = OperationNamePrefix.GET + "Object"
operation_spec = {
OperationField.METHOD: HTTPMethod.GET,
OperationField.RETURN_MULTIPLE_ITEMS: True
}
assert self._checker.is_get_list_operation(operation_name, operation_spec)
def test_is_get_list_operation_wrong_method_in_spec(self):
operation_name = OperationNamePrefix.GET + "Object"
operation_spec = {
OperationField.METHOD: HTTPMethod.POST,
OperationField.RETURN_MULTIPLE_ITEMS: True
}
assert not self._checker.is_get_list_operation(operation_name, operation_spec)
def test_is_get_list_operation_does_not_return_list(self):
operation_name = OperationNamePrefix.GET + "Object"
operation_spec = {
OperationField.METHOD: HTTPMethod.GET,
OperationField.RETURN_MULTIPLE_ITEMS: False
}
assert not self._checker.is_get_list_operation(operation_name, operation_spec)
def test_is_get_operation_positive(self):
operation_name = OperationNamePrefix.GET + "Object"
operation_spec = {
OperationField.METHOD: HTTPMethod.GET,
OperationField.RETURN_MULTIPLE_ITEMS: False
}
self.assertTrue(
self._checker.is_get_operation(operation_name, operation_spec)
)
def test_is_get_operation_wrong_method_in_spec(self):
operation_name = OperationNamePrefix.ADD + "Object"
operation_spec = {
OperationField.METHOD: HTTPMethod.POST,
OperationField.RETURN_MULTIPLE_ITEMS: False
}
assert not self._checker.is_get_operation(operation_name, operation_spec)
def test_is_get_operation_negative_when_returns_multiple(self):
operation_name = OperationNamePrefix.GET + "Object"
operation_spec = {
OperationField.METHOD: HTTPMethod.GET,
OperationField.RETURN_MULTIPLE_ITEMS: True
}
assert not self._checker.is_get_operation(operation_name, operation_spec)
def test_is_upsert_operation_positive(self):
operation_name = OperationNamePrefix.UPSERT + "Object"
assert self._checker.is_upsert_operation(operation_name)
def test_is_upsert_operation_with_wrong_operation_name(self):
for op_type in [OperationNamePrefix.ADD, OperationNamePrefix.GET, OperationNamePrefix.EDIT,
OperationNamePrefix.DELETE]:
operation_name = op_type + "Object"
assert not self._checker.is_upsert_operation(operation_name)
def test_is_find_by_filter_operation(self):
operation_name = OperationNamePrefix.GET + "Object"
operation_spec = {
OperationField.METHOD: HTTPMethod.GET,
OperationField.RETURN_MULTIPLE_ITEMS: True
}
params = {ParamName.FILTERS: 1}
self.assertTrue(
self._checker.is_find_by_filter_operation(
operation_name, params, operation_spec
)
)
def test_is_find_by_filter_operation_negative_when_filters_empty(self):
operation_name = OperationNamePrefix.GET + "Object"
operation_spec = {
OperationField.METHOD: HTTPMethod.GET,
OperationField.RETURN_MULTIPLE_ITEMS: True
}
params = {ParamName.FILTERS: None}
assert not self._checker.is_find_by_filter_operation(
operation_name, params, operation_spec
)
params = {}
assert not self._checker.is_find_by_filter_operation(
operation_name, params, operation_spec
)
@patch.object(OperationChecker, "is_add_operation")
@patch.object(OperationChecker, "is_edit_operation")
@patch.object(OperationChecker, "is_get_list_operation")
def test_is_upsert_operation_supported_operation(self, is_add_mock, is_edit_mock, is_get_list_mock):
operations_spec = {
'add': 1,
'edit': 1,
'getList': 1
}
is_add_mock.side_effect = [1, 0, 0]
is_edit_mock.side_effect = [1, 0, 0]
is_get_list_mock.side_effect = [1, 0, 0]
assert self._checker.is_upsert_operation_supported(operations_spec)
is_add_mock.side_effect = [1, 0, 0]
is_edit_mock.side_effect = [0, 1, 0]
is_get_list_mock.side_effect = [0, 0, 0]
assert not self._checker.is_upsert_operation_supported(operations_spec)
is_add_mock.side_effect = [1, 0, 0]
is_edit_mock.side_effect = [0, 0, 0]
is_get_list_mock.side_effect = [1, 0, 0]
assert not self._checker.is_upsert_operation_supported(operations_spec)

View file

@ -39,7 +39,7 @@ base = {
"$ref": "#/definitions/FQDNDNSResolution"},
"id": {"type": "string"},
"type": {"type": "string", "default": "networkobject"}},
"required": ["subType", "type", "value"]},
"required": ["subType", "type", "value", "name"]},
"NetworkObjectWrapper": {
"allOf": [{"$ref": "#/definitions/NetworkObject"}, {"$ref": "#/definitions/LinksWrapper"}]}
},
@ -140,14 +140,16 @@ class TestFdmSwaggerParser(unittest.TestCase):
'type': 'string'
}
}
}
},
'returnMultipleItems': True
},
'addNetworkObject': {
'method': HTTPMethod.POST,
'url': '/api/fdm/v2/object/networks',
'modelName': 'NetworkObject',
'parameters': {'path': {},
'query': {}}
'query': {}},
'returnMultipleItems': False
},
'getNetworkObject': {
'method': HTTPMethod.GET,
@ -161,7 +163,8 @@ class TestFdmSwaggerParser(unittest.TestCase):
}
},
'query': {}
}
},
'returnMultipleItems': False
},
'editNetworkObject': {
'method': HTTPMethod.PUT,
@ -175,12 +178,13 @@ class TestFdmSwaggerParser(unittest.TestCase):
}
},
'query': {}
}
},
'returnMultipleItems': False
},
'deleteNetworkObject': {
'method': HTTPMethod.DELETE,
'url': '/api/fdm/v2/object/networks/{objId}',
'modelName': None,
'modelName': 'NetworkObject',
'parameters': {
'path': {
'objId': {
@ -189,8 +193,173 @@ class TestFdmSwaggerParser(unittest.TestCase):
}
},
'query': {}
}
},
'returnMultipleItems': False
}
}
assert sorted(['NetworkObject', 'NetworkObjectWrapper']) == sorted(self.fdm_data['models'].keys())
assert expected_operations == self.fdm_data['operations']
assert {'NetworkObject': expected_operations} == self.fdm_data['model_operations']
def test_simple_object_with_documentation(self):
api_spec = copy.deepcopy(base)
docs = {
'definitions': {
'NetworkObject': {
'description': 'Description for Network Object',
'properties': {'name': 'Description for name field'}
}
},
'paths': {
'/object/networks': {
'get': {
'description': 'Description for getNetworkObjectList operation',
'parameters': [{'name': 'offset', 'description': 'Description for offset field'}]
},
'post': {'description': 'Description for addNetworkObject operation'}
}
}
}
self.fdm_data = FdmSwaggerParser().parse_spec(api_spec, docs)
assert 'Description for Network Object' == self.fdm_data['models']['NetworkObject']['description']
assert '' == self.fdm_data['models']['NetworkObjectWrapper']['description']
network_properties = self.fdm_data['models']['NetworkObject']['properties']
assert '' == network_properties['id']['description']
assert not network_properties['id']['required']
assert 'Description for name field' == network_properties['name']['description']
assert network_properties['name']['required']
ops = self.fdm_data['operations']
assert 'Description for getNetworkObjectList operation' == ops['getNetworkObjectList']['description']
assert 'Description for addNetworkObject operation' == ops['addNetworkObject']['description']
assert '' == ops['deleteNetworkObject']['description']
get_op_params = ops['getNetworkObjectList']['parameters']
assert 'Description for offset field' == get_op_params['query']['offset']['description']
assert '' == get_op_params['query']['limit']['description']
def test_model_operations_should_contain_all_operations(self):
data = {
'basePath': '/v2/',
'definitions': {
'Model1': {"type": "object"},
'Model2': {"type": "object"},
'Model3': {"type": "object"}
},
'paths': {
'path1': {
'get': {
'operationId': 'getSomeModelList',
"responses": {
"200": {"description": "",
"schema": {"type": "object",
"title": "NetworkObjectList",
"properties": {
"items": {
"type": "array",
"items": {
"$ref": "#/definitions/Model1"
}
}
}}
}
}
},
"post": {
"operationId": "addSomeModel",
"parameters": [{"in": "body",
"name": "body",
"schema": {"$ref": "#/definitions/Model2"}
}]}
},
'path2/{id}': {
"get": {"operationId": "getSomeModel",
"responses": {"200": {"description": "",
"schema": {"type": "object",
"$ref": "#/definitions/Model3"}},
}
},
"put": {"operationId": "editSomeModel",
"parameters": [{"in": "body",
"name": "body",
"schema": {"$ref": "#/definitions/Model1"}}
]},
"delete": {
"operationId": "deleteModel3",
}},
'path3': {
"delete": {
"operationId": "deleteNoneModel",
}
}
}
}
expected_operations = {
'getSomeModelList': {
'method': HTTPMethod.GET,
'url': '/v2/path1',
'modelName': 'Model1',
'returnMultipleItems': True
},
'addSomeModel': {
'method': HTTPMethod.POST,
'url': '/v2/path1',
'modelName': 'Model2',
'parameters': {
'path': {},
'query': {}
},
'returnMultipleItems': False
},
'getSomeModel': {
'method': HTTPMethod.GET,
'url': '/v2/path2/{id}',
'modelName': 'Model3',
'returnMultipleItems': False
},
'editSomeModel': {
'method': HTTPMethod.PUT,
'url': '/v2/path2/{id}',
'modelName': 'Model1',
'parameters': {
'path': {},
'query': {}
},
'returnMultipleItems': False
},
'deleteModel3': {
'method': HTTPMethod.DELETE,
'url': '/v2/path2/{id}',
'modelName': 'Model3',
'returnMultipleItems': False
},
'deleteNoneModel': {
'method': HTTPMethod.DELETE,
'url': '/v2/path3',
'modelName': None,
'returnMultipleItems': False
}
}
fdm_data = FdmSwaggerParser().parse_spec(data)
assert sorted(['Model1', 'Model2', 'Model3']) == sorted(fdm_data['models'].keys())
assert expected_operations == fdm_data['operations']
assert {
'Model1': {
'getSomeModelList': expected_operations['getSomeModelList'],
'editSomeModel': expected_operations['editSomeModel']
},
'Model2': {
'addSomeModel': expected_operations['addSomeModel']
},
'Model3': {
'getSomeModel': expected_operations['getSomeModel'],
'deleteModel3': expected_operations['deleteModel3']
},
None: {
'deleteNoneModel': expected_operations['deleteNoneModel']
}
} == fdm_data['model_operations']

View file

@ -15,7 +15,7 @@
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
import copy
import os
import unittest
@ -248,7 +248,7 @@ class TestFdmSwaggerValidator(unittest.TestCase):
def test_path_params_invalid_params(self):
self.url_params_invalid_params(method='validate_path_params', parameters_type='path')
def test_path_params_invalid_params(self):
def test_query_params_invalid_params(self):
self.url_params_invalid_params(method='validate_query_params', parameters_type='query')
@staticmethod
@ -384,7 +384,7 @@ class TestFdmSwaggerValidator(unittest.TestCase):
'someParam': 1.2,
'p_integer': "1",
'p_boolean': "",
'p_number': "2"
'p_number': "2.1"
}
valid, rez = getattr(validator, method)('getNetwork', data)
assert not valid
@ -405,20 +405,10 @@ class TestFdmSwaggerValidator(unittest.TestCase):
'expected_type': 'string',
'actually_value': 1.2
},
{
'path': 'p_integer',
'expected_type': 'integer',
'actually_value': "1"
},
{
'path': 'p_boolean',
'expected_type': 'boolean',
'actually_value': ""
},
{
'path': 'p_number',
'expected_type': 'number',
'actually_value': "2"
}
]
}) == sort_validator_rez(rez)
@ -603,6 +593,15 @@ class TestFdmSwaggerValidator(unittest.TestCase):
assert valid
assert rez is None
def test_pass_no_data_with_no_required_fields(self):
spec = copy.deepcopy(mock_data)
del spec['models']['NetworkObject']['required']
valid, rez = FdmSwaggerValidator(spec).validate_data('getNetworkObjectList', {})
assert valid
assert rez is None
def test_pass_all_fields_with_correct_data(self):
data = {
'id': 'id-di',
@ -818,7 +817,7 @@ class TestFdmSwaggerValidator(unittest.TestCase):
"f_string": False,
"f_number": "1",
"f_boolean": "",
"f_integer": 1.2
"f_integer": "1.2"
}
valid, rez = FdmSwaggerValidator(local_mock_data).validate_data('getdata', invalid_data)
@ -830,11 +829,6 @@ class TestFdmSwaggerValidator(unittest.TestCase):
'expected_type': 'string',
'actually_value': False
},
{
'path': 'f_number',
'expected_type': 'number',
'actually_value': "1"
},
{
'path': 'f_boolean',
'expected_type': 'boolean',
@ -843,7 +837,7 @@ class TestFdmSwaggerValidator(unittest.TestCase):
{
'path': 'f_integer',
'expected_type': 'integer',
'actually_value': 1.2
'actually_value': '1.2'
}
]
}) == sort_validator_rez(rez)

View file

@ -0,0 +1,762 @@
# Copyright (c) 2018 Cisco and/or its affiliates.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import absolute_import
import copy
import json
import unittest
import pytest
from ansible.compat.tests import mock
from ansible.module_utils.network.ftd.common import FtdServerError, HTTPMethod, ResponseParams, FtdConfigurationError
from ansible.module_utils.network.ftd.configuration import DUPLICATE_NAME_ERROR_MESSAGE, UNPROCESSABLE_ENTITY_STATUS, \
MULTIPLE_DUPLICATES_FOUND_ERROR, BaseConfigurationResource, FtdInvalidOperationNameError, QueryParams
from ansible.module_utils.network.ftd.fdm_swagger_client import ValidationError
ADD_RESPONSE = {'status': 'Object added'}
EDIT_RESPONSE = {'status': 'Object edited'}
DELETE_RESPONSE = {'status': 'Object deleted'}
GET_BY_FILTER_RESPONSE = [{'name': 'foo', 'description': 'bar'}]
ARBITRARY_RESPONSE = {'status': 'Arbitrary request sent'}
class TestUpsertOperationUnitTests(unittest.TestCase):
def setUp(self):
conn = mock.MagicMock()
self._resource = BaseConfigurationResource(conn)
def test_get_operation_name(self):
operation_a = mock.MagicMock()
operation_b = mock.MagicMock()
def checker_wrapper(expected_object):
def checker(obj, *args, **kwargs):
return obj == expected_object
return checker
operations = {
operation_a: "spec",
operation_b: "spec"
}
assert operation_a == self._resource._get_operation_name(checker_wrapper(operation_a), operations)
assert operation_b == self._resource._get_operation_name(checker_wrapper(operation_b), operations)
self.assertRaises(
FtdConfigurationError,
self._resource._get_operation_name, checker_wrapper(None), operations
)
@mock.patch.object(BaseConfigurationResource, "_get_operation_name")
@mock.patch.object(BaseConfigurationResource, "add_object")
def test_add_upserted_object(self, add_object_mock, get_operation_mock):
model_operations = mock.MagicMock()
params = mock.MagicMock()
add_op_name = get_operation_mock.return_value
assert add_object_mock.return_value == self._resource._add_upserted_object(model_operations, params)
get_operation_mock.assert_called_once_with(
self._resource._operation_checker.is_add_operation,
model_operations)
add_object_mock.assert_called_once_with(add_op_name, params)
@mock.patch.object(BaseConfigurationResource, "_get_operation_name")
@mock.patch.object(BaseConfigurationResource, "edit_object")
@mock.patch("ansible.module_utils.network.ftd.configuration.copy_identity_properties")
@mock.patch("ansible.module_utils.network.ftd.configuration._set_default")
def test_edit_upserted_object(self, _set_default_mock, copy_properties_mock, edit_object_mock, get_operation_mock):
model_operations = mock.MagicMock()
existing_object = mock.MagicMock()
params = {
'path_params': {},
'data': {}
}
result = self._resource._edit_upserted_object(model_operations, existing_object, params)
assert result == edit_object_mock.return_value
_set_default_mock.assert_has_calls([
mock.call(params, 'path_params', {}),
mock.call(params, 'data', {})
])
get_operation_mock.assert_called_once_with(
self._resource._operation_checker.is_edit_operation,
model_operations
)
copy_properties_mock.assert_called_once_with(
existing_object,
params['data']
)
edit_object_mock.assert_called_once_with(
get_operation_mock.return_value,
params
)
@mock.patch.object(BaseConfigurationResource, "get_operation_specs_by_model_name")
@mock.patch("ansible.module_utils.network.ftd.configuration.OperationChecker.is_upsert_operation_supported")
@mock.patch("ansible.module_utils.network.ftd.configuration._extract_model_from_upsert_operation")
def test_is_upsert_operation_supported(self, extract_model_mock, is_upsert_supported_mock, get_operation_spec_mock):
op_name = mock.MagicMock()
result = self._resource.is_upsert_operation_supported(op_name)
assert result == is_upsert_supported_mock.return_value
extract_model_mock.assert_called_once_with(op_name)
get_operation_spec_mock.assert_called_once_with(extract_model_mock.return_value)
is_upsert_supported_mock.assert_called_once_with(get_operation_spec_mock.return_value)
@mock.patch.object(BaseConfigurationResource, "is_upsert_operation_supported")
@mock.patch.object(BaseConfigurationResource, "get_operation_specs_by_model_name")
@mock.patch.object(BaseConfigurationResource, "_add_upserted_object")
@mock.patch.object(BaseConfigurationResource, "_edit_upserted_object")
@mock.patch("ansible.module_utils.network.ftd.configuration._extract_model_from_upsert_operation")
def test_upsert_object_succesfully_added(self, extract_model_mock, edit_mock, add_mock, get_operation_mock,
is_upsert_supported_mock):
op_name = mock.MagicMock()
params = mock.MagicMock()
is_upsert_supported_mock.return_value = True
result = self._resource.upsert_object(op_name, params)
assert result == add_mock.return_value
is_upsert_supported_mock.assert_called_once_with(op_name)
extract_model_mock.assert_called_once_with(op_name)
get_operation_mock.assert_called_once_with(extract_model_mock.return_value)
add_mock.assert_called_once_with(get_operation_mock.return_value, params)
edit_mock.assert_not_called()
@mock.patch.object(BaseConfigurationResource, "is_upsert_operation_supported")
@mock.patch.object(BaseConfigurationResource, "get_operation_specs_by_model_name")
@mock.patch.object(BaseConfigurationResource, "_add_upserted_object")
@mock.patch.object(BaseConfigurationResource, "_edit_upserted_object")
@mock.patch("ansible.module_utils.network.ftd.configuration._extract_model_from_upsert_operation")
def test_upsert_object_succesfully_edited(self, extract_model_mock, edit_mock, add_mock, get_operation_mock,
is_upsert_supported_mock):
op_name = mock.MagicMock()
params = mock.MagicMock()
is_upsert_supported_mock.return_value = True
error = FtdConfigurationError("Obj duplication error")
error.obj = mock.MagicMock()
add_mock.side_effect = error
result = self._resource.upsert_object(op_name, params)
assert result == edit_mock.return_value
is_upsert_supported_mock.assert_called_once_with(op_name)
extract_model_mock.assert_called_once_with(op_name)
get_operation_mock.assert_called_once_with(extract_model_mock.return_value)
add_mock.assert_called_once_with(get_operation_mock.return_value, params)
edit_mock.assert_called_once_with(get_operation_mock.return_value, error.obj, params)
@mock.patch.object(BaseConfigurationResource, "is_upsert_operation_supported")
@mock.patch.object(BaseConfigurationResource, "get_operation_specs_by_model_name")
@mock.patch.object(BaseConfigurationResource, "_add_upserted_object")
@mock.patch.object(BaseConfigurationResource, "_edit_upserted_object")
@mock.patch("ansible.module_utils.network.ftd.configuration._extract_model_from_upsert_operation")
def test_upsert_object_not_supported(self, extract_model_mock, edit_mock, add_mock, get_operation_mock,
is_upsert_supported_mock):
op_name = mock.MagicMock()
params = mock.MagicMock()
is_upsert_supported_mock.return_value = False
self.assertRaises(
FtdInvalidOperationNameError,
self._resource.upsert_object, op_name, params
)
is_upsert_supported_mock.assert_called_once_with(op_name)
extract_model_mock.assert_not_called()
get_operation_mock.assert_not_called()
add_mock.assert_not_called()
edit_mock.assert_not_called()
@mock.patch.object(BaseConfigurationResource, "is_upsert_operation_supported")
@mock.patch.object(BaseConfigurationResource, "get_operation_specs_by_model_name")
@mock.patch.object(BaseConfigurationResource, "_add_upserted_object")
@mock.patch.object(BaseConfigurationResource, "_edit_upserted_object")
@mock.patch("ansible.module_utils.network.ftd.configuration._extract_model_from_upsert_operation")
def test_upsert_object_neither_added_nor_edited(self, extract_model_mock, edit_mock, add_mock, get_operation_mock,
is_upsert_supported_mock):
op_name = mock.MagicMock()
params = mock.MagicMock()
is_upsert_supported_mock.return_value = True
error = FtdConfigurationError("Obj duplication error")
error.obj = mock.MagicMock()
add_mock.side_effect = error
edit_mock.side_effect = FtdConfigurationError("Some object edit error")
self.assertRaises(
FtdConfigurationError,
self._resource.upsert_object, op_name, params
)
is_upsert_supported_mock.assert_called_once_with(op_name)
extract_model_mock.assert_called_once_with(op_name)
get_operation_mock.assert_called_once_with(extract_model_mock.return_value)
add_mock.assert_called_once_with(get_operation_mock.return_value, params)
edit_mock.assert_called_once_with(get_operation_mock.return_value, error.obj, params)
@mock.patch.object(BaseConfigurationResource, "is_upsert_operation_supported")
@mock.patch.object(BaseConfigurationResource, "get_operation_specs_by_model_name")
@mock.patch.object(BaseConfigurationResource, "_add_upserted_object")
@mock.patch.object(BaseConfigurationResource, "_edit_upserted_object")
@mock.patch("ansible.module_utils.network.ftd.configuration._extract_model_from_upsert_operation")
def test_upsert_object_with_fatal_error_during_add(self, extract_model_mock, edit_mock, add_mock,
get_operation_mock, is_upsert_supported_mock):
op_name = mock.MagicMock()
params = mock.MagicMock()
is_upsert_supported_mock.return_value = True
error = FtdConfigurationError("Obj duplication error")
add_mock.side_effect = error
self.assertRaises(
FtdConfigurationError,
self._resource.upsert_object, op_name, params
)
is_upsert_supported_mock.assert_called_once_with(op_name)
extract_model_mock.assert_called_once_with(op_name)
get_operation_mock.assert_called_once_with(extract_model_mock.return_value)
add_mock.assert_called_once_with(get_operation_mock.return_value, params)
edit_mock.assert_not_called()
# functional tests below
class TestUpsertOperationFunctionalTests(object):
@pytest.fixture(autouse=True)
def connection_mock(self, mocker):
connection_class_mock = mocker.patch('ansible.modules.network.ftd.ftd_configuration.Connection')
connection_instance = connection_class_mock.return_value
connection_instance.validate_data.return_value = True, None
connection_instance.validate_query_params.return_value = True, None
connection_instance.validate_path_params.return_value = True, None
return connection_instance
def test_module_should_create_object_when_upsert_operation_and_object_does_not_exist(self, connection_mock):
url = '/test'
operations = {
'getObjectList': {
'method': HTTPMethod.GET,
'url': url,
'modelName': 'Object',
'returnMultipleItems': True},
'addObject': {
'method': HTTPMethod.POST,
'modelName': 'Object',
'url': url},
'editObject': {
'method': HTTPMethod.PUT,
'modelName': 'Object',
'url': '/test/{objId}'},
'otherObjectOperation': {
'method': HTTPMethod.GET,
'modelName': 'Object',
'url': '/test/{objId}',
'returnMultipleItems': False
}
}
def get_operation_spec(name):
return operations[name]
connection_mock.get_operation_spec = get_operation_spec
connection_mock.get_operation_specs_by_model_name.return_value = operations
connection_mock.send_request.return_value = {
ResponseParams.SUCCESS: True,
ResponseParams.RESPONSE: ADD_RESPONSE
}
params = {
'operation': 'upsertObject',
'data': {'id': '123', 'name': 'testObject', 'type': 'object'},
'path_params': {'objId': '123'},
'register_as': 'test_var'
}
result = self._resource_execute_operation(params, connection=connection_mock)
connection_mock.send_request.assert_called_once_with(url_path=url,
http_method=HTTPMethod.POST,
path_params=params['path_params'],
query_params={},
body_params=params['data'])
assert ADD_RESPONSE == result
# test when object exists but with different fields(except id)
def test_module_should_update_object_when_upsert_operation_and_object_exists(self, connection_mock):
url = '/test'
obj_id = '456'
version = 'test_version'
url_with_id_templ = '{0}/{1}'.format(url, '{objId}')
new_value = '0000'
old_value = '1111'
params = {
'operation': 'upsertObject',
'data': {'name': 'testObject', 'value': new_value, 'type': 'object'},
'register_as': 'test_var'
}
def request_handler(url_path=None, http_method=None, body_params=None, path_params=None, query_params=None):
if http_method == HTTPMethod.POST:
assert url_path == url
assert body_params == params['data']
assert query_params == {}
assert path_params == {}
return {
ResponseParams.SUCCESS: False,
ResponseParams.RESPONSE: DUPLICATE_NAME_ERROR_MESSAGE,
ResponseParams.STATUS_CODE: UNPROCESSABLE_ENTITY_STATUS
}
elif http_method == HTTPMethod.GET:
is_get_list_req = url_path == url
is_get_req = url_path == url_with_id_templ
assert is_get_req or is_get_list_req
if is_get_list_req:
assert body_params == {}
assert query_params == {QueryParams.FILTER: 'name:testObject', 'limit': 10, 'offset': 0}
assert path_params == {}
elif is_get_req:
assert body_params == {}
assert query_params == {}
assert path_params == {'objId': obj_id}
return {
ResponseParams.SUCCESS: True,
ResponseParams.RESPONSE: {
'items': [
{'name': 'testObject', 'value': old_value, 'type': 'object', 'id': obj_id,
'version': version}
]
}
}
elif http_method == HTTPMethod.PUT:
assert url_path == url_with_id_templ
return {
ResponseParams.SUCCESS: True,
ResponseParams.RESPONSE: body_params
}
else:
assert False
operations = {
'getObjectList': {'method': HTTPMethod.GET, 'url': url, 'modelName': 'Object', 'returnMultipleItems': True},
'addObject': {'method': HTTPMethod.POST, 'modelName': 'Object', 'url': url},
'editObject': {'method': HTTPMethod.PUT, 'modelName': 'Object', 'url': url_with_id_templ},
'otherObjectOperation': {
'method': HTTPMethod.GET,
'modelName': 'Object',
'url': url_with_id_templ,
'returnMultipleItems': False}
}
def get_operation_spec(name):
return operations[name]
connection_mock.get_operation_spec = get_operation_spec
connection_mock.get_operation_specs_by_model_name.return_value = operations
connection_mock.send_request = request_handler
expected_val = {'name': 'testObject', 'value': new_value, 'type': 'object', 'id': obj_id, 'version': version}
result = self._resource_execute_operation(params, connection=connection_mock)
assert expected_val == result
# test when object exists and all fields have the same value
def test_module_should_not_update_object_when_upsert_operation_and_object_exists_with_the_same_fields(
self, connection_mock):
url = '/test'
url_with_id_templ = '{0}/{1}'.format(url, '{objId}')
params = {
'operation': 'upsertObject',
'data': {'name': 'testObject', 'value': '3333', 'type': 'object'},
'register_as': 'test_var'
}
expected_val = copy.deepcopy(params['data'])
expected_val['version'] = 'test_version'
expected_val['id'] = 'test_id'
def request_handler(url_path=None, http_method=None, body_params=None, path_params=None, query_params=None):
if http_method == HTTPMethod.POST:
assert url_path == url
assert body_params == params['data']
assert query_params == {}
assert path_params == {}
return {
ResponseParams.SUCCESS: False,
ResponseParams.RESPONSE: DUPLICATE_NAME_ERROR_MESSAGE,
ResponseParams.STATUS_CODE: UNPROCESSABLE_ENTITY_STATUS
}
elif http_method == HTTPMethod.GET:
assert url_path == url
assert body_params == {}
assert query_params == {QueryParams.FILTER: 'name:testObject', 'limit': 10, 'offset': 0}
assert path_params == {}
return {
ResponseParams.SUCCESS: True,
ResponseParams.RESPONSE: {
'items': [expected_val]
}
}
else:
assert False
operations = {
'getObjectList': {'method': HTTPMethod.GET, 'modelName': 'Object', 'url': url, 'returnMultipleItems': True},
'addObject': {'method': HTTPMethod.POST, 'modelName': 'Object', 'url': url},
'editObject': {'method': HTTPMethod.PUT, 'modelName': 'Object', 'url': url_with_id_templ},
'otherObjectOperation': {
'method': HTTPMethod.GET,
'modelName': 'Object',
'url': url_with_id_templ,
'returnMultipleItems': False}
}
def get_operation_spec(name):
return operations[name]
connection_mock.get_operation_spec = get_operation_spec
connection_mock.get_operation_specs_by_model_name.return_value = operations
connection_mock.send_request = request_handler
result = self._resource_execute_operation(params, connection=connection_mock)
assert expected_val == result
def test_module_should_fail_when_upsert_operation_is_not_supported(self, connection_mock):
connection_mock.get_operation_specs_by_model_name.return_value = {
'addObject': {'method': HTTPMethod.POST, 'modelName': 'Object', 'url': '/test'},
'editObject': {'method': HTTPMethod.PUT, 'modelName': 'Object', 'url': '/test/{objId}'},
'otherObjectOperation': {
'method': HTTPMethod.GET,
'modelName': 'Object',
'url': '/test/{objId}',
'returnMultipleItems': False}
}
operation_name = 'upsertObject'
params = {
'operation': operation_name,
'data': {'id': '123', 'name': 'testObject', 'type': 'object'},
'path_params': {'objId': '123'},
'register_as': 'test_var'
}
result = self._resource_execute_operation_with_expected_failure(
expected_exception_class=FtdInvalidOperationNameError,
params=params, connection=connection_mock)
connection_mock.send_request.assert_not_called()
assert operation_name == result.operation_name
# when create operation raised FtdConfigurationError exception without id and version
def test_module_should_fail_when_upsert_operation_and_failed_create_without_id_and_version(self, connection_mock):
url = '/test'
url_with_id_templ = '{0}/{1}'.format(url, '{objId}')
params = {
'operation': 'upsertObject',
'data': {'name': 'testObject', 'value': '3333', 'type': 'object'},
'register_as': 'test_var'
}
def request_handler(url_path=None, http_method=None, body_params=None, path_params=None, query_params=None):
if http_method == HTTPMethod.POST:
assert url_path == url
assert body_params == params['data']
assert query_params == {}
assert path_params == {}
return {
ResponseParams.SUCCESS: False,
ResponseParams.RESPONSE: DUPLICATE_NAME_ERROR_MESSAGE,
ResponseParams.STATUS_CODE: UNPROCESSABLE_ENTITY_STATUS
}
elif http_method == HTTPMethod.GET:
assert url_path == url
assert body_params == {}
assert query_params == {QueryParams.FILTER: 'name:testObject', 'limit': 10, 'offset': 0}
assert path_params == {}
return {
ResponseParams.SUCCESS: True,
ResponseParams.RESPONSE: {
'items': []
}
}
else:
assert False
operations = {
'getObjectList': {'method': HTTPMethod.GET, 'modelName': 'Object', 'url': url, 'returnMultipleItems': True},
'addObject': {'method': HTTPMethod.POST, 'modelName': 'Object', 'url': url},
'editObject': {'method': HTTPMethod.PUT, 'modelName': 'Object', 'url': url_with_id_templ},
'otherObjectOperation': {
'method': HTTPMethod.GET,
'modelName': 'Object',
'url': url_with_id_templ,
'returnMultipleItems': False}
}
def get_operation_spec(name):
return operations[name]
connection_mock.get_operation_spec = get_operation_spec
connection_mock.get_operation_specs_by_model_name.return_value = operations
connection_mock.send_request = request_handler
result = self._resource_execute_operation_with_expected_failure(
expected_exception_class=FtdServerError,
params=params, connection=connection_mock)
assert result.code == 422
assert result.response == 'Validation failed due to a duplicate name'
def test_module_should_fail_when_upsert_operation_and_failed_update_operation(self, connection_mock):
url = '/test'
obj_id = '456'
version = 'test_version'
url_with_id_templ = '{0}/{1}'.format(url, '{objId}')
error_code = 404
new_value = '0000'
old_value = '1111'
params = {
'operation': 'upsertObject',
'data': {'name': 'testObject', 'value': new_value, 'type': 'object'},
'register_as': 'test_var'
}
error_msg = 'test error'
def request_handler(url_path=None, http_method=None, body_params=None, path_params=None, query_params=None):
if http_method == HTTPMethod.POST:
assert url_path == url
assert body_params == params['data']
assert query_params == {}
assert path_params == {}
return {
ResponseParams.SUCCESS: False,
ResponseParams.RESPONSE: DUPLICATE_NAME_ERROR_MESSAGE,
ResponseParams.STATUS_CODE: UNPROCESSABLE_ENTITY_STATUS
}
elif http_method == HTTPMethod.GET:
is_get_list_req = url_path == url
is_get_req = url_path == url_with_id_templ
assert is_get_req or is_get_list_req
if is_get_list_req:
assert body_params == {}
assert query_params == {QueryParams.FILTER: 'name:testObject', 'limit': 10, 'offset': 0}
elif is_get_req:
assert body_params == {}
assert query_params == {}
assert path_params == {'objId': obj_id}
return {
ResponseParams.SUCCESS: True,
ResponseParams.RESPONSE: {
'items': [
{'name': 'testObject', 'value': old_value, 'type': 'object', 'id': obj_id,
'version': version}
]
}
}
elif http_method == HTTPMethod.PUT:
assert url_path == url_with_id_templ
raise FtdServerError(error_msg, error_code)
else:
assert False
operations = {
'getObjectList': {'method': HTTPMethod.GET, 'modelName': 'Object', 'url': url, 'returnMultipleItems': True},
'addObject': {'method': HTTPMethod.POST, 'modelName': 'Object', 'url': url},
'editObject': {'method': HTTPMethod.PUT, 'modelName': 'Object', 'url': url_with_id_templ},
'otherObjectOperation': {
'method': HTTPMethod.GET,
'modelName': 'Object',
'url': url_with_id_templ,
'returnMultipleItems': False}
}
def get_operation_spec(name):
return operations[name]
connection_mock.get_operation_spec = get_operation_spec
connection_mock.get_operation_specs_by_model_name.return_value = operations
connection_mock.send_request = request_handler
result = self._resource_execute_operation_with_expected_failure(
expected_exception_class=FtdServerError,
params=params, connection=connection_mock)
assert result.code == error_code
assert result.response == error_msg
def test_module_should_fail_when_upsert_operation_and_invalid_data_for_create_operation(self, connection_mock):
new_value = '0000'
params = {
'operation': 'upsertObject',
'data': {'name': 'testObject', 'value': new_value, 'type': 'object'},
'register_as': 'test_var'
}
connection_mock.send_request.assert_not_called()
operations = {
'getObjectList': {
'method': HTTPMethod.GET,
'modelName': 'Object',
'url': 'sd',
'returnMultipleItems': True},
'addObject': {'method': HTTPMethod.POST, 'modelName': 'Object', 'url': 'sdf'},
'editObject': {'method': HTTPMethod.PUT, 'modelName': 'Object', 'url': 'sadf'},
'otherObjectOperation': {
'method': HTTPMethod.GET,
'modelName': 'Object',
'url': 'sdfs',
'returnMultipleItems': False}
}
def get_operation_spec(name):
return operations[name]
connection_mock.get_operation_spec = get_operation_spec
connection_mock.get_operation_specs_by_model_name.return_value = operations
report = {
'required': ['objects[0].type'],
'invalid_type': [
{
'path': 'objects[3].id',
'expected_type': 'string',
'actually_value': 1
}
]
}
connection_mock.validate_data.return_value = (False, json.dumps(report, sort_keys=True, indent=4))
key = 'Invalid data provided'
result = self._resource_execute_operation_with_expected_failure(
expected_exception_class=ValidationError,
params=params, connection=connection_mock)
assert len(result.args) == 1
assert key in result.args[0]
assert json.loads(result.args[0][key]) == {
'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
'required': ['objects[0].type']
}
def test_module_should_fail_when_upsert_operation_and_few_objects_found_by_filter(self, connection_mock):
url = '/test'
url_with_id_templ = '{0}/{1}'.format(url, '{objId}')
sample_obj = {'name': 'testObject', 'value': '3333', 'type': 'object'}
params = {
'operation': 'upsertObject',
'data': sample_obj,
'register_as': 'test_var'
}
def request_handler(url_path=None, http_method=None, body_params=None, path_params=None, query_params=None):
if http_method == HTTPMethod.POST:
assert url_path == url
assert body_params == params['data']
assert query_params == {}
assert path_params == {}
return {
ResponseParams.SUCCESS: False,
ResponseParams.RESPONSE: DUPLICATE_NAME_ERROR_MESSAGE,
ResponseParams.STATUS_CODE: UNPROCESSABLE_ENTITY_STATUS
}
elif http_method == HTTPMethod.GET:
assert url_path == url
assert body_params == {}
assert query_params == {QueryParams.FILTER: 'name:testObject', 'limit': 10, 'offset': 0}
assert path_params == {}
return {
ResponseParams.SUCCESS: True,
ResponseParams.RESPONSE: {
'items': [sample_obj, sample_obj]
}
}
else:
assert False
operations = {
'getObjectList': {'method': HTTPMethod.GET, 'modelName': 'Object', 'url': url, 'returnMultipleItems': True},
'addObject': {'method': HTTPMethod.POST, 'modelName': 'Object', 'url': url},
'editObject': {'method': HTTPMethod.PUT, 'modelName': 'Object', 'url': url_with_id_templ},
'otherObjectOperation': {
'method': HTTPMethod.GET,
'modelName': 'Object',
'url': url_with_id_templ,
'returnMultipleItems': False}
}
def get_operation_spec(name):
return operations[name]
connection_mock.get_operation_spec = get_operation_spec
connection_mock.get_operation_specs_by_model_name.return_value = operations
connection_mock.send_request = request_handler
result = self._resource_execute_operation_with_expected_failure(
expected_exception_class=FtdConfigurationError,
params=params, connection=connection_mock)
assert result.msg is MULTIPLE_DUPLICATES_FOUND_ERROR
assert result.obj is None
@staticmethod
def _resource_execute_operation(params, connection):
resource = BaseConfigurationResource(connection)
op_name = params['operation']
resp = resource.execute_operation(op_name, params)
return resp
def _resource_execute_operation_with_expected_failure(self, expected_exception_class, params, connection):
with pytest.raises(expected_exception_class) as ex:
self._resource_execute_operation(params, connection)
# 'ex' here is the instance of '_pytest._code.code.ExceptionInfo' but not <expected_exception_class>
# actual instance of <expected_exception_class> is in the value attribute of 'ex'. That's why we should return
# 'ex.value' here, so it can be checked in a test later.
return ex.value

View file

@ -18,20 +18,14 @@
from __future__ import absolute_import
import json
import pytest
from ansible.module_utils import basic
from ansible.module_utils.network.ftd.common import HTTPMethod, FtdConfigurationError, FtdServerError
from ansible.modules.network.ftd import ftd_configuration
from units.modules.utils import set_module_args, exit_json, fail_json, AnsibleFailJson, AnsibleExitJson
ADD_RESPONSE = {'status': 'Object added'}
EDIT_RESPONSE = {'status': 'Object edited'}
DELETE_RESPONSE = {'status': 'Object deleted'}
GET_BY_FILTER_RESPONSE = [{'name': 'foo', 'description': 'bar'}]
ARBITRARY_RESPONSE = {'status': 'Arbitrary request sent'}
from ansible.module_utils import basic
from ansible.module_utils.network.ftd.common import FtdConfigurationError, FtdServerError, FtdUnexpectedResponse
from ansible.module_utils.network.ftd.configuration import FtdInvalidOperationNameError, CheckModeException
from ansible.module_utils.network.ftd.fdm_swagger_client import ValidationError
from ansible.modules.network.ftd import ftd_configuration
class TestFtdConfiguration(object):
@ -41,295 +35,80 @@ class TestFtdConfiguration(object):
def module_mock(self, mocker):
return mocker.patch.multiple(basic.AnsibleModule, exit_json=exit_json, fail_json=fail_json)
@pytest.fixture
@pytest.fixture(autouse=True)
def connection_mock(self, mocker):
connection_class_mock = mocker.patch('ansible.modules.network.ftd.ftd_configuration.Connection')
connection_instance = connection_class_mock.return_value
connection_instance.validate_data.return_value = True, None
connection_instance.validate_query_params.return_value = True, None
connection_instance.validate_path_params.return_value = True, None
return connection_instance
return connection_class_mock.return_value
@pytest.fixture
def resource_mock(self, mocker):
resource_class_mock = mocker.patch('ansible.modules.network.ftd.ftd_configuration.BaseConfigurationResource')
resource_instance = resource_class_mock.return_value
resource_instance.add_object.return_value = ADD_RESPONSE
resource_instance.edit_object.return_value = EDIT_RESPONSE
resource_instance.delete_object.return_value = DELETE_RESPONSE
resource_instance.send_request.return_value = ARBITRARY_RESPONSE
resource_instance.get_objects_by_filter.return_value = GET_BY_FILTER_RESPONSE
return resource_instance
return resource_instance.execute_operation
def test_module_should_fail_without_operation_arg(self):
set_module_args({})
def test_module_should_fail_when_ftd_invalid_operation_name_error(self, resource_mock):
operation_name = 'test name'
resource_mock.side_effect = FtdInvalidOperationNameError(operation_name)
with pytest.raises(AnsibleFailJson) as ex:
self.module.main()
assert 'missing required arguments: operation' in str(ex)
def test_module_should_fail_when_no_operation_spec_found(self, connection_mock):
connection_mock.get_operation_spec.return_value = None
set_module_args({'operation': 'nonExistingOperation'})
with pytest.raises(AnsibleFailJson) as ex:
self.module.main()
assert 'Invalid operation name provided: nonExistingOperation' in str(ex)
def test_module_should_add_object_when_add_operation(self, connection_mock, resource_mock):
connection_mock.get_operation_spec.return_value = {
'method': HTTPMethod.POST,
'url': '/object'
}
params = {
'operation': 'addObject',
'data': {'name': 'testObject', 'type': 'object'}
}
result = self._run_module(params)
assert ADD_RESPONSE == result['response']
resource_mock.add_object.assert_called_with(connection_mock.get_operation_spec.return_value['url'],
params['data'], None, None)
def test_module_should_edit_object_when_edit_operation(self, connection_mock, resource_mock):
connection_mock.get_operation_spec.return_value = {
'method': HTTPMethod.PUT,
'url': '/object/{objId}'
}
params = {
'operation': 'editObject',
'data': {'id': '123', 'name': 'testObject', 'type': 'object'},
'path_params': {'objId': '123'}
}
result = self._run_module(params)
assert EDIT_RESPONSE == result['response']
resource_mock.edit_object.assert_called_with(connection_mock.get_operation_spec.return_value['url'],
params['data'],
params['path_params'], None)
def test_module_should_delete_object_when_delete_operation(self, connection_mock, resource_mock):
connection_mock.get_operation_spec.return_value = {
'method': HTTPMethod.DELETE,
'url': '/object/{objId}'
}
params = {
'operation': 'deleteObject',
'path_params': {'objId': '123'}
}
result = self._run_module(params)
assert DELETE_RESPONSE == result['response']
resource_mock.delete_object.assert_called_with(connection_mock.get_operation_spec.return_value['url'],
params['path_params'])
def test_module_should_get_objects_by_filter_when_find_by_filter_operation(self, connection_mock, resource_mock):
connection_mock.get_operation_spec.return_value = {
'method': HTTPMethod.GET,
'url': '/objects'
}
params = {
'operation': 'getObjectList',
'filters': {'name': 'foo'}
}
result = self._run_module(params)
assert GET_BY_FILTER_RESPONSE == result['response']
resource_mock.get_objects_by_filter.assert_called_with(connection_mock.get_operation_spec.return_value['url'],
params['filters'],
None, None)
def test_module_should_send_request_when_arbitrary_operation(self, connection_mock, resource_mock):
connection_mock.get_operation_spec.return_value = {
'method': HTTPMethod.GET,
'url': '/object/status/{objId}'
}
params = {
'operation': 'checkStatus',
'path_params': {'objId': '123'}
}
result = self._run_module(params)
assert ARBITRARY_RESPONSE == result['response']
resource_mock.send_request.assert_called_with(connection_mock.get_operation_spec.return_value['url'],
HTTPMethod.GET, None,
params['path_params'], None)
def test_module_should_fail_when_operation_raises_configuration_error(self, connection_mock, resource_mock):
connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.GET, 'url': '/test'}
resource_mock.send_request.side_effect = FtdConfigurationError('Foo error.')
result = self._run_module_with_fail_json({'operation': 'failure'})
result = self._run_module_with_fail_json({'operation': operation_name})
assert result['failed']
assert 'Failed to execute failure operation because of the configuration error: Foo error.' == result['msg']
assert 'Invalid operation name provided: %s' % operation_name == result['msg']
def test_module_should_fail_when_operation_raises_server_error(self, connection_mock, resource_mock):
connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.GET, 'url': '/test'}
resource_mock.send_request.side_effect = FtdServerError({'error': 'foo'}, 500)
def test_module_should_fail_when_ftd_configuration_error(self, resource_mock):
operation_name = 'test name'
msg = 'Foo error.'
resource_mock.side_effect = FtdConfigurationError(msg)
result = self._run_module_with_fail_json({'operation': 'failure'})
result = self._run_module_with_fail_json({'operation': operation_name})
assert result['failed']
assert 'Server returned an error trying to execute failure operation. Status code: 500. ' \
'Server response: {\'error\': \'foo\'}' == result['msg']
assert 'Failed to execute %s operation because of the configuration error: %s' % (operation_name, msg) == \
result['msg']
def test_module_should_fail_if_validation_error_in_data(self, connection_mock):
connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.POST, 'url': '/test'}
report = {
'required': ['objects[0].type'],
'invalid_type': [
{
'path': 'objects[3].id',
'expected_type': 'string',
'actually_value': 1
}
]
}
connection_mock.validate_data.return_value = (False, json.dumps(report, sort_keys=True, indent=4))
def test_module_should_fail_when_ftd_server_error(self, resource_mock):
operation_name = 'test name'
code = 500
response = {'error': 'foo'}
resource_mock.side_effect = FtdServerError(response, code)
result = self._run_module_with_fail_json({
'operation': 'test',
'data': {}
})
key = 'Invalid data provided'
assert result['msg'][key]
result['msg'][key] = json.loads(result['msg'][key])
assert result == {
'msg':
{key: {
'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
'required': ['objects[0].type']
}},
'failed': True}
result = self._run_module_with_fail_json({'operation': operation_name})
assert result['failed']
assert 'Server returned an error trying to execute %s operation. Status code: %s. ' \
'Server response: %s' % (operation_name, code, response) == \
result['msg']
def test_module_should_fail_if_validation_error_in_query_params(self, connection_mock):
connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.GET, 'url': '/test'}
report = {
'required': ['objects[0].type'],
'invalid_type': [
{
'path': 'objects[3].id',
'expected_type': 'string',
'actually_value': 1
}
]
}
connection_mock.validate_query_params.return_value = (False, json.dumps(report, sort_keys=True, indent=4))
def test_module_should_fail_when_validation_error(self, resource_mock):
operation_name = 'test name'
msg = 'Foo error.'
resource_mock.side_effect = ValidationError(msg)
result = self._run_module_with_fail_json({
'operation': 'test',
'data': {}
})
key = 'Invalid query_params provided'
assert result['msg'][key]
result['msg'][key] = json.loads(result['msg'][key])
result = self._run_module_with_fail_json({'operation': operation_name})
assert result['failed']
assert msg == result['msg']
assert result == {'msg': {key: {
'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
'required': ['objects[0].type']}}, 'failed': True}
def test_module_should_fail_when_unexpected_server_response(self, resource_mock):
operation_name = 'test name'
msg = 'Foo error.'
resource_mock.side_effect = FtdUnexpectedResponse(msg)
def test_module_should_fail_if_validation_error_in_path_params(self, connection_mock):
connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.GET, 'url': '/test'}
report = {
'path_params': {
'required': ['objects[0].type'],
'invalid_type': [
{
'path': 'objects[3].id',
'expected_type': 'string',
'actually_value': 1
}
]
}
}
connection_mock.validate_path_params.return_value = (False, json.dumps(report, sort_keys=True, indent=4))
result = self._run_module_with_fail_json({'operation': operation_name})
result = self._run_module_with_fail_json({
'operation': 'test',
'data': {}
})
key = 'Invalid path_params provided'
assert result['msg'][key]
result['msg'][key] = json.loads(result['msg'][key])
assert result['failed']
assert msg == result['msg']
assert result == {'msg': {key: {
'path_params': {
'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
'required': ['objects[0].type']}}}, 'failed': True}
def test_module_should_fail_when_check_mode_exception(self, resource_mock):
operation_name = 'test name'
msg = 'Foo error.'
resource_mock.side_effect = CheckModeException(msg)
def test_module_should_fail_if_validation_error_in_all_params(self, connection_mock):
connection_mock.get_operation_spec.return_value = {'method': HTTPMethod.POST, 'url': '/test'}
report = {
'data': {
'required': ['objects[0].type'],
'invalid_type': [
{
'path': 'objects[3].id',
'expected_type': 'string',
'actually_value': 1
}
]
},
'path_params': {
'required': ['some_param'],
'invalid_type': [
{
'path': 'name',
'expected_type': 'string',
'actually_value': True
}
]
},
'query_params': {
'required': ['other_param'],
'invalid_type': [
{
'path': 'f_integer',
'expected_type': 'integer',
'actually_value': "test"
}
]
}
}
connection_mock.validate_data.return_value = (False, json.dumps(report['data'], sort_keys=True, indent=4))
connection_mock.validate_query_params.return_value = (False,
json.dumps(report['query_params'], sort_keys=True,
indent=4))
connection_mock.validate_path_params.return_value = (False,
json.dumps(report['path_params'], sort_keys=True,
indent=4))
result = self._run_module({'operation': operation_name})
assert not result['changed']
result = self._run_module_with_fail_json({
'operation': 'test',
'data': {}
})
key_data = 'Invalid data provided'
assert result['msg'][key_data]
result['msg'][key_data] = json.loads(result['msg'][key_data])
def test_module_should_run_successful(self, resource_mock):
operation_name = 'test name'
resource_mock.return_value = 'ok'
key_path_params = 'Invalid path_params provided'
assert result['msg'][key_path_params]
result['msg'][key_path_params] = json.loads(result['msg'][key_path_params])
key_query_params = 'Invalid query_params provided'
assert result['msg'][key_query_params]
result['msg'][key_query_params] = json.loads(result['msg'][key_query_params])
assert result == {'msg': {
key_data: {'invalid_type': [{'actually_value': 1, 'expected_type': 'string', 'path': 'objects[3].id'}],
'required': ['objects[0].type']},
key_path_params: {'invalid_type': [{'actually_value': True, 'expected_type': 'string', 'path': 'name'}],
'required': ['some_param']},
key_query_params: {
'invalid_type': [{'actually_value': 'test', 'expected_type': 'integer', 'path': 'f_integer'}],
'required': ['other_param']}}, 'failed': True}
result = self._run_module({'operation': operation_name})
assert result['response'] == 'ok'
def _run_module(self, module_args):
set_module_args(module_args)

View file

@ -39,9 +39,9 @@ class TestFtdFileUpload(object):
connection_class_mock = mocker.patch('ansible.modules.network.ftd.ftd_file_upload.Connection')
return connection_class_mock.return_value
@pytest.mark.parametrize("missing_arg", ['operation', 'fileToUpload'])
@pytest.mark.parametrize("missing_arg", ['operation', 'file_to_upload'])
def test_module_should_fail_without_required_args(self, missing_arg):
module_args = {'operation': 'uploadFile', 'fileToUpload': '/tmp/test.txt'}
module_args = {'operation': 'uploadFile', 'file_to_upload': '/tmp/test.txt'}
del module_args[missing_arg]
set_module_args(module_args)
@ -52,7 +52,7 @@ class TestFtdFileUpload(object):
def test_module_should_fail_when_no_operation_spec_found(self, connection_mock):
connection_mock.get_operation_spec.return_value = None
set_module_args({'operation': 'nonExistingUploadOperation', 'fileToUpload': '/tmp/test.txt'})
set_module_args({'operation': 'nonExistingUploadOperation', 'file_to_upload': '/tmp/test.txt'})
with pytest.raises(AnsibleFailJson) as ex:
self.module.main()
@ -67,7 +67,7 @@ class TestFtdFileUpload(object):
OperationField.URL: '/object/network',
OperationField.MODEL_NAME: 'NetworkObject'
}
set_module_args({'operation': 'nonUploadOperation', 'fileToUpload': '/tmp/test.txt'})
set_module_args({'operation': 'nonUploadOperation', 'file_to_upload': '/tmp/test.txt'})
with pytest.raises(AnsibleFailJson) as ex:
self.module.main()
@ -87,7 +87,7 @@ class TestFtdFileUpload(object):
set_module_args({
'operation': 'uploadFile',
'fileToUpload': '/tmp/test.txt'
'file_to_upload': '/tmp/test.txt'
})
with pytest.raises(AnsibleExitJson) as ex:
self.module.main()

View file

@ -17,12 +17,12 @@
#
import json
import os
from ansible.module_utils.six.moves.urllib.error import HTTPError
from ansible.compat.tests import mock
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import mock_open, patch
from ansible.errors import AnsibleConnectionFailure
from ansible.module_utils.connection import ConnectionError
@ -117,6 +117,15 @@ class TestFtdHttpApi(unittest.TestCase):
assert 'Server returned response without token info during connection authentication' in str(res.exception)
def test_login_raises_exception_when_http_error(self):
self.connection_mock.send.side_effect = HTTPError('http://testhost.com', 400, '', {},
StringIO('{"message": "Failed to authenticate user"}'))
with self.assertRaises(ConnectionError) as res:
self.ftd_plugin.login('foo', 'bar')
assert 'Failed to authenticate user' in str(res.exception)
def test_logout_should_revoke_tokens(self):
self.ftd_plugin.access_token = 'ACCESS_TOKEN_TO_REVOKE'
self.ftd_plugin.refresh_token = 'REFRESH_TOKEN_TO_REVOKE'
@ -185,6 +194,10 @@ class TestFtdHttpApi(unittest.TestCase):
def test_handle_httperror_should_not_retry_on_non_auth_errors(self):
assert not self.ftd_plugin.handle_httperror(HTTPError('http://testhost.com', 500, '', {}, None))
def test_handle_httperror_should_not_retry_when_ignoring_http_errors(self):
self.ftd_plugin._ignore_http_errors = True
assert not self.ftd_plugin.handle_httperror(HTTPError('http://testhost.com', 401, '', {}, None))
@patch('os.path.isdir', mock.Mock(return_value=False))
def test_download_file(self):
self.connection_mock.send.return_value = self._connection_response('File content')
@ -262,6 +275,44 @@ class TestFtdHttpApi(unittest.TestCase):
assert 'Specification for TestModel' == self.ftd_plugin.get_model_spec('TestModel')
assert self.ftd_plugin.get_model_spec('NonExistingTestModel') is None
@patch.object(FdmSwaggerParser, 'parse_spec')
def test_get_model_spec(self, parse_spec_mock):
self.connection_mock.send.return_value = self._connection_response(None)
operation1 = {'modelName': 'TestModel'}
op_model_name_is_none = {'modelName': None}
op_without_model_name = {'url': 'testUrl'}
parse_spec_mock.return_value = {
SpecProp.MODEL_OPERATIONS: {
'TestModel': {
'testOp1': operation1,
'testOp2': 'spec2'
},
'TestModel2': {
'testOp10': 'spec10',
'testOp20': 'spec20'
}
},
SpecProp.OPERATIONS: {
'testOp1': operation1,
'testOp10': {
'modelName': 'TestModel2'
},
'testOpWithoutModelName': op_without_model_name,
'testOpModelNameIsNone': op_model_name_is_none
}
}
assert {'testOp1': operation1, 'testOp2': 'spec2'} == self.ftd_plugin.get_operation_specs_by_model_name(
'TestModel')
assert None is self.ftd_plugin.get_operation_specs_by_model_name(
'testOpModelNameIsNone')
assert None is self.ftd_plugin.get_operation_specs_by_model_name(
'testOpWithoutModelName')
assert self.ftd_plugin.get_operation_specs_by_model_name('nonExistingOperation') is None
@staticmethod
def _connection_response(response, status=200):
response_mock = mock.Mock()