Add latest updates from FTD Ansible downstream repository. (#53638)

* Add latest updates from FTD Ansible downstream repository.
 - add a better implementation of the upsert operation;
 - add API version lookup functionality;
 - add filter which remove duplicated references from the list of references;
 - fix minor bugs.

* fix issues outlined by ansibot

* fix argument name for _check_enum_method
This commit is contained in:
Vitalii Kostenko 2019-04-01 15:38:01 +03:00 committed by Sumit Jaiswal
parent 71216cace5
commit 2176b53a55
15 changed files with 882 additions and 298 deletions

View file

@ -15,11 +15,11 @@
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
import re
from ansible.module_utils._text import to_text
from ansible.module_utils.common.collections import is_string
from ansible.module_utils.six import iteritems
INVALID_IDENTIFIER_SYMBOLS = r'[^a-zA-Z0-9_]'
@ -65,7 +65,7 @@ def construct_ansible_facts(response, params):
response_body = response['items'] if 'items' in response else response
if params.get('register_as'):
facts[params['register_as']] = response_body
elif 'name' in response_body and 'type' in response_body:
elif response_body.get('name') and response_body.get('type'):
object_name = re.sub(INVALID_IDENTIFIER_SYMBOLS, '_', response_body['name'].lower())
fact_name = '%s_%s' % (response_body['type'], object_name)
facts[fact_name] = response_body
@ -181,13 +181,58 @@ def equal_objects(d1, d2):
"""
Checks whether two objects are equal. Ignores special object properties (e.g. 'id', 'version') and
properties with None and empty values. In case properties contains a reference to the other object,
only object identities (ids and types) are checked.
only object identities (ids and types) are checked. Also, if an array field contains multiple references
to the same object, duplicates are ignored when comparing objects.
:type d1: dict
:type d2: dict
:return: True if passed objects and their properties are equal. Otherwise, returns False.
"""
d1 = dict((k, d1[k]) for k in d1.keys() if k not in NON_COMPARABLE_PROPERTIES and d1[k])
d2 = dict((k, d2[k]) for k in d2.keys() if k not in NON_COMPARABLE_PROPERTIES and d2[k])
def prepare_data_for_comparison(d):
d = dict((k, d[k]) for k in d.keys() if k not in NON_COMPARABLE_PROPERTIES and d[k])
d = delete_ref_duplicates(d)
return d
d1 = prepare_data_for_comparison(d1)
d2 = prepare_data_for_comparison(d2)
return equal_dicts(d1, d2, compare_by_reference=False)
def delete_ref_duplicates(d):
"""
Removes reference duplicates from array fields: if an array contains multiple items and some of
them refer to the same object, only unique references are preserved (duplicates are removed).
:param d: dict with data
:type d: dict
:return: dict without reference duplicates
"""
def delete_ref_duplicates_from_list(refs):
if all(type(i) == dict and is_object_ref(i) for i in refs):
unique_refs = set()
unique_list = list()
for i in refs:
key = (i['id'], i['type'])
if key not in unique_refs:
unique_refs.add(key)
unique_list.append(i)
return list(unique_list)
else:
return refs
if not d:
return d
modified_d = {}
for k, v in iteritems(d):
if type(v) == list:
modified_d[k] = delete_ref_duplicates_from_list(v)
elif type(v) == dict:
modified_d[k] = delete_ref_duplicates(v)
else:
modified_d[k] = v
return modified_d

View file

@ -31,9 +31,19 @@ INVALID_UUID_ERROR_MESSAGE = "Validation failed due to an invalid UUID"
DUPLICATE_NAME_ERROR_MESSAGE = "Validation failed due to a duplicate name"
MULTIPLE_DUPLICATES_FOUND_ERROR = (
"Cannot add a new object. An object(s) with the same attributes exists."
"Multiple objects returned according to filters being specified. "
"Please specify more specific filters which can find exact object that caused duplication error")
"Multiple objects matching specified filters are found. "
"Please, define filters more precisely to match one object exactly."
)
DUPLICATE_ERROR = (
"Cannot add a new object. "
"An object with the same name but different parameters already exists."
)
ADD_OPERATION_NOT_SUPPORTED_ERROR = (
"Cannot add a new object while executing an upsert request. "
"Creation of objects with this type is not supported."
)
PATH_PARAMS_FOR_DEFAULT_OBJ = {'objId': 'default'}
PATH_PARAMS_FOR_DEFAULT_OBJ = {'objId': 'default'}
@ -185,15 +195,10 @@ class OperationChecker(object):
:return: True if all criteria required to provide requested called operation are satisfied, otherwise False
:rtype: bool
"""
amount_operations_need_for_upsert_operation = 3
amount_supported_operations = 0
for operation_name, operation_spec in operations.items():
if cls.is_add_operation(operation_name, operation_spec) \
or cls.is_edit_operation(operation_name, operation_spec) \
or cls.is_get_list_operation(operation_name, operation_spec):
amount_supported_operations += 1
return amount_supported_operations == amount_operations_need_for_upsert_operation
has_edit_op = next((name for name, spec in iteritems(operations) if cls.is_edit_operation(name, spec)), None)
has_get_list_op = next((name for name, spec in iteritems(operations)
if cls.is_get_list_operation(name, spec)), None)
return has_edit_op and has_get_list_op
class BaseConfigurationResource(object):
@ -264,8 +269,6 @@ class BaseConfigurationResource(object):
return self._models_operations_specs_cache[model_name]
def get_objects_by_filter(self, operation_name, params):
def transform_filters_to_query_param(filter_params):
return ';'.join(['%s:%s' % (key, val) for key, val in sorted(iteritems(filter_params))])
def match_filters(filter_params, obj):
for k, v in iteritems(filter_params):
@ -275,14 +278,15 @@ class BaseConfigurationResource(object):
dummy, query_params, path_params = _get_user_params(params)
# copy required params to avoid mutation of passed `params` dict
get_list_params = {ParamName.QUERY_PARAMS: dict(query_params), ParamName.PATH_PARAMS: dict(path_params)}
url_params = {ParamName.QUERY_PARAMS: dict(query_params), ParamName.PATH_PARAMS: dict(path_params)}
filters = params.get(ParamName.FILTERS) or {}
if filters:
get_list_params[ParamName.QUERY_PARAMS][QueryParams.FILTER] = transform_filters_to_query_param(filters)
if QueryParams.FILTER not in url_params[ParamName.QUERY_PARAMS] and 'name' in filters:
# most endpoints only support filtering by name, so remaining `filters` are applied on returned objects
url_params[ParamName.QUERY_PARAMS][QueryParams.FILTER] = 'name:%s' % filters['name']
item_generator = iterate_over_pageable_resource(
partial(self.send_general_request, operation_name=operation_name), get_list_params
partial(self.send_general_request, operation_name=operation_name), url_params
)
return (i for i in item_generator if match_filters(filters, i))
@ -294,45 +298,51 @@ class BaseConfigurationResource(object):
return self.send_general_request(operation_name, params)
except FtdServerError as e:
if is_duplicate_name_error(e):
return self._check_if_the_same_object(operation_name, params, e)
return self._check_equality_with_existing_object(operation_name, params, e)
else:
raise e
def _check_if_the_same_object(self, operation_name, params, e):
def _check_equality_with_existing_object(self, operation_name, params, e):
"""
Special check used in the scope of 'add_object' operation, which can be requested as a standalone operation or
in the scope of 'upsert_object' operation. This method executed in case 'add_object' failed and should try to
find the object that caused "object duplicate" error. In case single object found and it's equal to one we are
trying to create - the existing object will be returned (attempt to have kind of idempotency for add action).
In the case when we got more than one object returned as a result of the request to API - it will be hard to
find exact duplicate so the exception will be raised.
Looks for an existing object that caused "object duplicate" error and
checks whether it corresponds to the one specified in `params`.
In case a single object is found and it is equal to one we are trying
to create, the existing object is returned.
When the existing object is not equal to the object being created or
several objects are returned, an exception is raised.
"""
model_name = self.get_operation_spec(operation_name)[OperationField.MODEL_NAME]
get_list_operation = self._find_get_list_operation(model_name)
if get_list_operation:
data = params[ParamName.DATA]
if not params.get(ParamName.FILTERS):
params[ParamName.FILTERS] = {'name': data['name']}
existing_obj = self._find_object_matching_params(model_name, params)
existing_obj = None
existing_objs = self.get_objects_by_filter(get_list_operation, params)
for i, obj in enumerate(existing_objs):
if i > 0:
raise FtdConfigurationError(MULTIPLE_DUPLICATES_FOUND_ERROR)
existing_obj = obj
if existing_obj is not None:
if equal_objects(existing_obj, data):
return existing_obj
else:
raise FtdConfigurationError(
'Cannot add new object. '
'An object with the same name but different parameters already exists.',
existing_obj)
if existing_obj is not None:
if equal_objects(existing_obj, params[ParamName.DATA]):
return existing_obj
else:
raise FtdConfigurationError(DUPLICATE_ERROR, existing_obj)
raise e
def _find_object_matching_params(self, model_name, params):
get_list_operation = self._find_get_list_operation(model_name)
if not get_list_operation:
return None
data = params[ParamName.DATA]
if not params.get(ParamName.FILTERS):
params[ParamName.FILTERS] = {'name': data['name']}
obj = None
filtered_objs = self.get_objects_by_filter(get_list_operation, params)
for i, obj in enumerate(filtered_objs):
if i > 0:
raise FtdConfigurationError(MULTIPLE_DUPLICATES_FOUND_ERROR)
obj = obj
return obj
def _find_get_list_operation(self, model_name):
operations = self.get_operation_specs_by_model_name(model_name) or {}
return next((
@ -373,9 +383,12 @@ class BaseConfigurationResource(object):
return self.send_general_request(operation_name, params)
def send_general_request(self, operation_name, params):
def stop_if_check_mode():
if self._check_mode:
raise CheckModeException()
self.validate_params(operation_name, params)
if self._check_mode:
raise CheckModeException()
stop_if_check_mode()
data, query_params, path_params = _get_user_params(params)
op_spec = self.get_operation_spec(operation_name)
@ -418,28 +431,14 @@ class BaseConfigurationResource(object):
if report:
raise ValidationError(report)
def is_upsert_operation_supported(self, op_name):
"""
Checks if all operations required for upsert object operation are defined in 'operations'.
:param op_name: upsert operation name
:type op_name: str
:return: True if all criteria required to provide requested called operation are satisfied, otherwise False
:rtype: bool
"""
model_name = _extract_model_from_upsert_operation(op_name)
operations = self.get_operation_specs_by_model_name(model_name)
return self._operation_checker.is_upsert_operation_supported(operations)
@staticmethod
def _get_operation_name(checker, operations):
for operation_name, op_spec in operations.items():
if checker(operation_name, op_spec):
return operation_name
raise FtdConfigurationError("Operation is not supported")
return next((op_name for op_name, op_spec in iteritems(operations) if checker(op_name, op_spec)), None)
def _add_upserted_object(self, model_operations, params):
add_op_name = self._get_operation_name(self._operation_checker.is_add_operation, model_operations)
if not add_op_name:
raise FtdConfigurationError(ADD_OPERATION_NOT_SUPPORTED_ERROR)
return self.add_object(add_op_name, params)
def _edit_upserted_object(self, model_operations, existing_object, params):
@ -453,9 +452,9 @@ class BaseConfigurationResource(object):
def upsert_object(self, op_name, params):
"""
The wrapper on top of add object operation, get a list of objects and edit object operations that implement
upsert object operation. As a result, the object will be created if the object does not exist, if a single
object exists with requested 'params' this object will be updated otherwise, Exception will be raised.
Updates an object if it already exists, or tries to create a new one if there is no
such object. If multiple objects match filter criteria, or add operation is not supported,
the exception is raised.
:param op_name: upsert operation name
:type op_name: str
@ -464,18 +463,26 @@ class BaseConfigurationResource(object):
:return: upserted object representation
:rtype: dict
"""
if not self.is_upsert_operation_supported(op_name):
raise FtdInvalidOperationNameError(op_name)
model_name = _extract_model_from_upsert_operation(op_name)
def extract_and_validate_model():
model = op_name[len(OperationNamePrefix.UPSERT):]
if not self._conn.get_model_spec(model):
raise FtdInvalidOperationNameError(op_name)
return model
model_name = extract_and_validate_model()
model_operations = self.get_operation_specs_by_model_name(model_name)
try:
if not self._operation_checker.is_upsert_operation_supported(model_operations):
raise FtdInvalidOperationNameError(op_name)
existing_obj = self._find_object_matching_params(model_name, params)
if existing_obj:
equal_to_existing_obj = equal_objects(existing_obj, params[ParamName.DATA])
return existing_obj if equal_to_existing_obj \
else self._edit_upserted_object(model_operations, existing_obj, params)
else:
return self._add_upserted_object(model_operations, params)
except FtdConfigurationError as e:
if e.obj:
return self._edit_upserted_object(model_operations, e.obj, params)
raise e
def _set_default(params, field_name, value):
@ -491,10 +498,6 @@ def is_put_request(operation_spec):
return operation_spec[OperationField.METHOD] == HTTPMethod.PUT
def _extract_model_from_upsert_operation(op_name):
return op_name[len(OperationNamePrefix.UPSERT):]
def _get_user_params(params):
return params.get(ParamName.DATA) or {}, params.get(ParamName.QUERY_PARAMS) or {}, params.get(
ParamName.PATH_PARAMS) or {}
@ -527,8 +530,8 @@ def iterate_over_pageable_resource(resource_func, params):
raise FtdUnexpectedResponse(
"Get List of Objects Response from the server contains more objects than requested. "
"There are {0} item(s) in the response while {1} was(ere) requested".format(items_in_response,
items_expected)
"There are {0} item(s) in the response while {1} was(ere) requested".format(
items_in_response, items_expected)
)
while True:

View file

@ -31,6 +31,7 @@ class OperationField:
MODEL_NAME = 'modelName'
DESCRIPTION = 'description'
RETURN_MULTIPLE_ITEMS = 'returnMultipleItems'
TAGS = "tags"
class SpecProp:
@ -105,14 +106,16 @@ class FdmSwaggerParser:
This method simplifies a swagger format, resolves a model name for each operation, and adds documentation for
each operation and model if it is provided.
:param spec: An API specification in the swagger format, see <https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md>
:param spec: An API specification in the swagger format, see
<https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md>
:type spec: dict
:param spec: A documentation map containing descriptions for models, operations and operation parameters.
:type docs: dict
:rtype: dict
:return:
Ex.
The models field contains model definition from swagger see <#https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md#definitions>
The models field contains model definition from swagger see
<#https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md#definitions>
{
'models':{
'model_name':{...},
@ -170,6 +173,10 @@ class FdmSwaggerParser:
SpecProp.MODEL_OPERATIONS: self._get_model_operations(operations)
}
@property
def base_path(self):
return self._base_path
def _get_model_operations(self, operations):
model_operations = {}
for operations_name, params in iteritems(operations):
@ -186,7 +193,8 @@ class FdmSwaggerParser:
OperationField.METHOD: method,
OperationField.URL: self._base_path + url,
OperationField.MODEL_NAME: self._get_model_name(method, params),
OperationField.RETURN_MULTIPLE_ITEMS: self._return_multiple_items(params)
OperationField.RETURN_MULTIPLE_ITEMS: self._return_multiple_items(params),
OperationField.TAGS: params.get(OperationField.TAGS, [])
}
if OperationField.PARAMETERS in params:
operation[OperationField.PARAMETERS] = self._get_rest_params(params[OperationField.PARAMETERS])
@ -205,8 +213,10 @@ class FdmSwaggerParser:
operation[OperationField.DESCRIPTION] = operation_docs.get(PropName.DESCRIPTION, '')
if OperationField.PARAMETERS in operation:
param_descriptions = dict((p[PropName.NAME], p[PropName.DESCRIPTION])
for p in operation_docs.get(OperationField.PARAMETERS, {}))
param_descriptions = dict((
(p[PropName.NAME], p[PropName.DESCRIPTION])
for p in operation_docs.get(OperationField.PARAMETERS, {})
))
for param_name, params_spec in operation[OperationField.PARAMETERS][OperationParams.PATH].items():
params_spec[OperationField.DESCRIPTION] = param_descriptions.get(param_name, '')
@ -493,7 +503,7 @@ class FdmSwaggerValidator:
if prop_name in params:
expected_type = prop[PropName.TYPE]
value = params[prop_name]
if prop_name in params and not self._is_correct_simple_types(expected_type, value):
if prop_name in params and not self._is_correct_simple_types(expected_type, value, allow_null=False):
self._add_invalid_type_report(status, '', prop_name, expected_type, value)
def _validate_object(self, status, model, data, path):
@ -505,9 +515,9 @@ class FdmSwaggerValidator:
def _is_enum(self, model):
return self._is_string_type(model) and PropName.ENUM in model
def _check_enum(self, status, model, value, path):
if value not in model[PropName.ENUM]:
self._add_invalid_type_report(status, path, '', PropName.ENUM, value)
def _check_enum(self, status, model, data, path):
if data is not None and data not in model[PropName.ENUM]:
self._add_invalid_type_report(status, path, '', PropName.ENUM, data)
def _add_invalid_type_report(self, status, path, prop_name, expected_type, actually_value):
status[PropName.INVALID_TYPE].append({
@ -517,6 +527,9 @@ class FdmSwaggerValidator:
})
def _check_object(self, status, model, data, path):
if data is None:
return
if not isinstance(data, dict):
self._add_invalid_type_report(status, path, '', PropType.OBJECT, data)
return None
@ -550,12 +563,14 @@ class FdmSwaggerValidator:
def _check_required_fields(self, status, required_fields, data, path):
missed_required_fields = [self._create_path_to_field(path, field) for field in
required_fields if field not in data.keys()]
required_fields if field not in data.keys() or data[field] is None]
if len(missed_required_fields) > 0:
status[PropName.REQUIRED] += missed_required_fields
def _check_array(self, status, model, data, path):
if not isinstance(data, list):
if data is None:
return
elif not isinstance(data, list):
self._add_invalid_type_report(status, path, '', PropType.ARRAY, data)
else:
item_model = model[PropName.ITEMS]
@ -564,7 +579,7 @@ class FdmSwaggerValidator:
'')
@staticmethod
def _is_correct_simple_types(expected_type, value):
def _is_correct_simple_types(expected_type, value, allow_null=True):
def is_numeric_string(s):
try:
float(s)
@ -572,7 +587,9 @@ class FdmSwaggerValidator:
except ValueError:
return False
if expected_type == PropType.STRING:
if value is None and allow_null:
return True
elif expected_type == PropType.STRING:
return isinstance(value, string_types)
elif expected_type == PropType.BOOLEAN:
return isinstance(value, bool)

View file

@ -49,7 +49,8 @@ options:
destination:
description:
- Absolute path of where to download the file to.
- If destination is a directory, the module uses a filename from 'Content-Disposition' header specified by the server.
- If destination is a directory, the module uses a filename from 'Content-Disposition' header specified by
the server.
required: true
type: path
"""

View file

@ -34,7 +34,6 @@ options:
type: str
description:
- Specifies the api token path of the FTD device
default: '/api/fdm/v2/fdm/token'
vars:
- name: ansible_httpapi_ftd_token_path
spec_path:
@ -50,6 +49,8 @@ import json
import os
import re
from ansible import __version__ as ansible_version
from ansible.module_utils.basic import to_text
from ansible.errors import AnsibleConnectionFailure
from ansible.module_utils.network.ftd.fdm_swagger_client import FdmSwaggerParser, SpecProp, FdmSwaggerValidator
@ -63,11 +64,21 @@ from ansible.module_utils.connection import ConnectionError
BASE_HEADERS = {
'Content-Type': 'application/json',
'Accept': 'application/json'
'Accept': 'application/json',
'User-Agent': 'FTD Ansible/%s' % ansible_version
}
TOKEN_EXPIRATION_STATUS_CODE = 408
UNAUTHORIZED_STATUS_CODE = 401
API_TOKEN_PATH_OPTION_NAME = 'token_path'
TOKEN_PATH_TEMPLATE = '/api/fdm/{0}/fdm/token'
GET_API_VERSIONS_PATH = '/api/versions'
DEFAULT_API_VERSIONS = ['v2', 'v1']
INVALID_API_TOKEN_PATH_MSG = ('The API token path is incorrect. Please, check correctness of '
'the `ansible_httpapi_ftd_token_path` variable in the inventory file.')
MISSING_API_TOKEN_PATH_MSG = ('Ansible could not determine the API token path automatically. Please, '
'specify the `ansible_httpapi_ftd_token_path` variable in the inventory file.')
class HttpApi(HttpApiBase):
@ -101,15 +112,7 @@ class HttpApi(HttpApiBase):
else:
raise AnsibleConnectionFailure('Username and password are required for login in absence of refresh token')
url = self._get_api_token_path()
self._display(HTTPMethod.POST, 'login', url)
response, response_data = self._send_auth_request(
url, json.dumps(payload), method=HTTPMethod.POST, headers=BASE_HEADERS
)
self._display(HTTPMethod.POST, 'login:status_code', response.getcode())
response = self._response_to_json(self._get_response_value(response_data))
response = self._lookup_login_url(payload)
try:
self.refresh_token = response['refresh_token']
@ -119,6 +122,48 @@ class HttpApi(HttpApiBase):
raise ConnectionError(
'Server returned response without token info during connection authentication: %s' % response)
def _lookup_login_url(self, payload):
""" Try to find correct login URL and get api token using this URL.
:param payload: Token request payload
:type payload: dict
:return: token generation response
"""
preconfigured_token_path = self._get_api_token_path()
if preconfigured_token_path:
token_paths = [preconfigured_token_path]
else:
token_paths = self._get_known_token_paths()
for url in token_paths:
try:
response = self._send_login_request(payload, url)
except ConnectionError as e:
self.connection.queue_message('vvvv', 'REST:request to %s failed because of connection error: %s ' % (
url, e))
# In the case of ConnectionError caused by HTTPError we should check response code.
# Response code 400 returned in case of invalid credentials so we should stop attempts to log in and
# inform the user.
if hasattr(e, 'http_code') and e.http_code == 400:
raise
else:
if not preconfigured_token_path:
self._set_api_token_path(url)
return response
raise ConnectionError(INVALID_API_TOKEN_PATH_MSG if preconfigured_token_path else MISSING_API_TOKEN_PATH_MSG)
def _send_login_request(self, payload, url):
self._display(HTTPMethod.POST, 'login', url)
response, response_data = self._send_auth_request(
url, json.dumps(payload), method=HTTPMethod.POST, headers=BASE_HEADERS
)
self._display(HTTPMethod.POST, 'login:status_code', response.getcode())
response = self._response_to_json(self._get_response_value(response_data))
return response
def logout(self):
auth_payload = {
'grant_type': 'revoke_token',
@ -137,6 +182,10 @@ class HttpApi(HttpApiBase):
self.access_token = None
def _send_auth_request(self, path, data, **kwargs):
error_msg_prefix = 'Server returned an error during authentication request'
return self._send_service_request(path, error_msg_prefix, data=data, **kwargs)
def _send_service_request(self, path, error_msg_prefix, data=None, **kwargs):
try:
self._ignore_http_errors = True
return self.connection.send(path, data, **kwargs)
@ -144,7 +193,7 @@ class HttpApi(HttpApiBase):
# HttpApi connection does not read the error response from HTTPError, so we do it here and wrap it up in
# ConnectionError, so the actual error message is displayed to the user.
error_msg = self._response_to_json(to_text(e.read()))
raise ConnectionError('Server returned an error during authentication request: %s' % error_msg)
raise ConnectionError('%s: %s' % (error_msg_prefix, error_msg), http_code=e.code)
finally:
self._ignore_http_errors = False
@ -230,8 +279,47 @@ class HttpApi(HttpApiBase):
def _get_api_spec_path(self):
return self.get_option('spec_path')
def _get_known_token_paths(self):
"""Generate list of token generation urls based on list of versions supported by device(if exposed via API) or
default list of API versions.
:returns: list of token generation urls
:rtype: generator
"""
try:
api_versions = self._get_supported_api_versions()
except ConnectionError:
# API versions API is not supported we need to check all known version
api_versions = DEFAULT_API_VERSIONS
return [TOKEN_PATH_TEMPLATE.format(version) for version in api_versions]
def _get_supported_api_versions(self):
"""
Fetch list of API versions supported by device.
:return: list of API versions suitable for device
:rtype: list
"""
# Try to fetch supported API version
http_method = HTTPMethod.GET
response, response_data = self._send_service_request(
path=GET_API_VERSIONS_PATH,
error_msg_prefix="Can't fetch list of supported api versions",
method=http_method,
headers=BASE_HEADERS
)
value = self._get_response_value(response_data)
self._display(http_method, 'response', value)
api_versions_info = self._response_to_json(value)
return api_versions_info["supportedVersions"]
def _get_api_token_path(self):
return self.get_option('token_path')
return self.get_option(API_TOKEN_PATH_OPTION_NAME)
def _set_api_token_path(self, url):
return self.set_option(API_TOKEN_PATH_OPTION_NAME, url)
@staticmethod
def _response_to_json(response_text):

View file

@ -16,7 +16,7 @@
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from ansible.module_utils.network.ftd.common import equal_objects
from ansible.module_utils.network.ftd.common import equal_objects, delete_ref_duplicates
# simple objects
@ -246,3 +246,129 @@ def test_equal_objects_return_true_with_equal_nested_list_of_object_references()
}
}
)
def test_equal_objects_return_true_with_reference_list_containing_duplicates():
assert equal_objects(
{
'name': 'foo',
'config': {
'version': '1',
'ports': [{
'name': 'oldPortName',
'type': 'port',
'id': '123'
}, {
'name': 'oldPortName',
'type': 'port',
'id': '123'
}, {
'name': 'oldPortName2',
'type': 'port',
'id': '234'
}]
}
},
{
'name': 'foo',
'config': {
'version': '1',
'ports': [{
'name': 'newPortName',
'type': 'port',
'id': '123'
}, {
'name': 'newPortName2',
'type': 'port',
'id': '234',
'extraField': 'foo'
}]
}
}
)
def test_delete_ref_duplicates_with_none():
assert delete_ref_duplicates(None) is None
def test_delete_ref_duplicates_with_empty_dict():
assert {} == delete_ref_duplicates({})
def test_delete_ref_duplicates_with_simple_object():
data = {
'id': '123',
'name': 'foo',
'type': 'bar',
'values': ['a', 'b']
}
assert data == delete_ref_duplicates(data)
def test_delete_ref_duplicates_with_object_containing_refs():
data = {
'id': '123',
'name': 'foo',
'type': 'bar',
'refs': [
{'id': '123', 'type': 'baz'},
{'id': '234', 'type': 'baz'},
{'id': '234', 'type': 'foo'}
]
}
assert data == delete_ref_duplicates(data)
def test_delete_ref_duplicates_with_object_containing_duplicate_refs():
data = {
'id': '123',
'name': 'foo',
'type': 'bar',
'refs': [
{'id': '123', 'type': 'baz'},
{'id': '123', 'type': 'baz'},
{'id': '234', 'type': 'baz'},
{'id': '234', 'type': 'baz'},
{'id': '234', 'type': 'foo'}
]
}
assert {
'id': '123',
'name': 'foo',
'type': 'bar',
'refs': [
{'id': '123', 'type': 'baz'},
{'id': '234', 'type': 'baz'},
{'id': '234', 'type': 'foo'}
]
} == delete_ref_duplicates(data)
def test_delete_ref_duplicates_with_object_containing_duplicate_refs_in_nested_object():
data = {
'id': '123',
'name': 'foo',
'type': 'bar',
'children': {
'refs': [
{'id': '123', 'type': 'baz'},
{'id': '123', 'type': 'baz'},
{'id': '234', 'type': 'baz'},
{'id': '234', 'type': 'baz'},
{'id': '234', 'type': 'foo'}
]
}
}
assert {
'id': '123',
'name': 'foo',
'type': 'bar',
'children': {
'refs': [
{'id': '123', 'type': 'baz'},
{'id': '234', 'type': 'baz'},
{'id': '234', 'type': 'foo'}
]
}
} == delete_ref_duplicates(data)

View file

@ -80,12 +80,11 @@ class TestBaseConfigurationResource(object):
# we need evaluate it.
assert [objects[1]] == list(resource.get_objects_by_filter(
'test',
{ParamName.FILTERS: {'type': 1, 'foo': {'bar': 'buz'}}}))
{ParamName.FILTERS: {'name': 'obj2', 'type': 1, 'foo': {'bar': 'buz'}}}))
send_request_mock.assert_has_calls(
[
mock.call('/object/', 'get', {}, {},
{QueryParams.FILTER: "foo:{'bar': 'buz'};type:1", 'limit': 10, 'offset': 0})
mock.call('/object/', 'get', {}, {}, {QueryParams.FILTER: 'name:obj2', 'limit': 10, 'offset': 0})
]
)
@ -111,8 +110,7 @@ class TestBaseConfigurationResource(object):
{ParamName.FILTERS: {'type': 'foo'}}))
send_request_mock.assert_has_calls(
[
mock.call('/object/', 'get', {}, {},
{QueryParams.FILTER: "type:foo", 'limit': 10, 'offset': 0})
mock.call('/object/', 'get', {}, {}, {'limit': 10, 'offset': 0})
]
)
@ -136,10 +134,8 @@ class TestBaseConfigurationResource(object):
assert [{'name': 'obj1', 'type': 'foo'}, {'name': 'obj3', 'type': 'foo'}] == resp
send_request_mock.assert_has_calls(
[
mock.call('/object/', 'get', {}, {},
{QueryParams.FILTER: "type:foo", 'limit': 2, 'offset': 0}),
mock.call('/object/', 'get', {}, {},
{QueryParams.FILTER: "type:foo", 'limit': 2, 'offset': 2})
mock.call('/object/', 'get', {}, {}, {'limit': 2, 'offset': 0}),
mock.call('/object/', 'get', {}, {}, {'limit': 2, 'offset': 2})
]
)
@ -542,29 +538,14 @@ class TestOperationCheckerClass(unittest.TestCase):
operation_name, params, operation_spec
)
@patch.object(OperationChecker, "is_add_operation")
@patch.object(OperationChecker, "is_edit_operation")
@patch.object(OperationChecker, "is_get_list_operation")
def test_is_upsert_operation_supported_operation(self, is_add_mock, is_edit_mock, is_get_list_mock):
operations_spec = {
'add': 1,
'edit': 1,
'getList': 1
}
is_add_mock.side_effect = [1, 0, 0]
is_edit_mock.side_effect = [1, 0, 0]
is_get_list_mock.side_effect = [1, 0, 0]
def test_is_upsert_operation_supported_operation(self):
get_list_op_spec = {OperationField.METHOD: HTTPMethod.GET, OperationField.RETURN_MULTIPLE_ITEMS: True}
add_op_spec = {OperationField.METHOD: HTTPMethod.POST}
edit_op_spec = {OperationField.METHOD: HTTPMethod.PUT}
assert self._checker.is_upsert_operation_supported(operations_spec)
is_add_mock.side_effect = [1, 0, 0]
is_edit_mock.side_effect = [0, 1, 0]
is_get_list_mock.side_effect = [0, 0, 0]
assert not self._checker.is_upsert_operation_supported(operations_spec)
is_add_mock.side_effect = [1, 0, 0]
is_edit_mock.side_effect = [0, 0, 0]
is_get_list_mock.side_effect = [1, 0, 0]
assert not self._checker.is_upsert_operation_supported(operations_spec)
assert self._checker.is_upsert_operation_supported({'getList': get_list_op_spec, 'edit': edit_op_spec})
assert self._checker.is_upsert_operation_supported(
{'add': add_op_spec, 'getList': get_list_op_spec, 'edit': edit_op_spec})
assert not self._checker.is_upsert_operation_supported({'getList': get_list_op_spec})
assert not self._checker.is_upsert_operation_supported({'edit': edit_op_spec})
assert not self._checker.is_upsert_operation_supported({'getList': get_list_op_spec, 'add': add_op_spec})

File diff suppressed because one or more lines are too long

View file

@ -45,17 +45,20 @@ base = {
},
'paths': {
"/object/networks": {
"get": {"tags": ["NetworkObject"], "operationId": "getNetworkObjectList",
"responses": {"200": {"description": "", "schema": {"type": "object",
"title": "NetworkObjectList",
"properties": {"items": {
"type": "array",
"items": {
"$ref": "#/definitions/NetworkObjectWrapper"}},
"paging": {
"$ref": "#/definitions/Paging"}},
"required": ["items",
"paging"]}}},
"get": {"tags": ["NetworkObject"],
"operationId": "getNetworkObjectList",
"responses": {
"200": {
"description": "",
"schema": {"type": "object",
"title": "NetworkObjectList",
"properties": {
"items": {
"type": "array",
"items": {"$ref": "#/definitions/NetworkObjectWrapper"}},
"paging": {
"$ref": "#/definitions/Paging"}},
"required": ["items", "paging"]}}},
"parameters": [
{"name": "offset", "in": "query", "required": False, "type": "integer"},
{"name": "limit", "in": "query", "required": False, "type": "integer"},
@ -141,7 +144,8 @@ class TestFdmSwaggerParser(unittest.TestCase):
}
}
},
'returnMultipleItems': True
'returnMultipleItems': True,
"tags": ["NetworkObject"]
},
'addNetworkObject': {
'method': HTTPMethod.POST,
@ -149,7 +153,8 @@ class TestFdmSwaggerParser(unittest.TestCase):
'modelName': 'NetworkObject',
'parameters': {'path': {},
'query': {}},
'returnMultipleItems': False
'returnMultipleItems': False,
"tags": ["NetworkObject"]
},
'getNetworkObject': {
'method': HTTPMethod.GET,
@ -164,7 +169,8 @@ class TestFdmSwaggerParser(unittest.TestCase):
},
'query': {}
},
'returnMultipleItems': False
'returnMultipleItems': False,
"tags": ["NetworkObject"]
},
'editNetworkObject': {
'method': HTTPMethod.PUT,
@ -179,7 +185,8 @@ class TestFdmSwaggerParser(unittest.TestCase):
},
'query': {}
},
'returnMultipleItems': False
'returnMultipleItems': False,
"tags": ["NetworkObject"]
},
'deleteNetworkObject': {
'method': HTTPMethod.DELETE,
@ -194,7 +201,8 @@ class TestFdmSwaggerParser(unittest.TestCase):
},
'query': {}
},
'returnMultipleItems': False
'returnMultipleItems': False,
"tags": ["NetworkObject"]
}
}
assert sorted(['NetworkObject', 'NetworkObjectWrapper']) == sorted(self.fdm_data['models'].keys())
@ -302,7 +310,8 @@ class TestFdmSwaggerParser(unittest.TestCase):
'method': HTTPMethod.GET,
'url': '/v2/path1',
'modelName': 'Model1',
'returnMultipleItems': True
'returnMultipleItems': True,
'tags': []
},
'addSomeModel': {
'method': HTTPMethod.POST,
@ -312,13 +321,15 @@ class TestFdmSwaggerParser(unittest.TestCase):
'path': {},
'query': {}
},
'returnMultipleItems': False
'returnMultipleItems': False,
'tags': []
},
'getSomeModel': {
'method': HTTPMethod.GET,
'url': '/v2/path2/{id}',
'modelName': 'Model3',
'returnMultipleItems': False
'returnMultipleItems': False,
'tags': []
},
'editSomeModel': {
'method': HTTPMethod.PUT,
@ -328,19 +339,22 @@ class TestFdmSwaggerParser(unittest.TestCase):
'path': {},
'query': {}
},
'returnMultipleItems': False
'returnMultipleItems': False,
'tags': []
},
'deleteModel3': {
'method': HTTPMethod.DELETE,
'url': '/v2/path2/{id}',
'modelName': 'Model3',
'returnMultipleItems': False
'returnMultipleItems': False,
'tags': []
},
'deleteNoneModel': {
'method': HTTPMethod.DELETE,
'url': '/v2/path3',
'modelName': None,
'returnMultipleItems': False
'returnMultipleItems': False,
'tags': []
}
}
@ -350,7 +364,7 @@ class TestFdmSwaggerParser(unittest.TestCase):
assert {
'Model1': {
'getSomeModelList': expected_operations['getSomeModelList'],
'editSomeModel': expected_operations['editSomeModel']
'editSomeModel': expected_operations['editSomeModel'],
},
'Model2': {
'addSomeModel': expected_operations['addSomeModel']

View file

@ -413,6 +413,29 @@ class TestFdmSwaggerValidator(unittest.TestCase):
]
}) == sort_validator_rez(rez)
data = {
'objId': "123",
'parentId': "1",
'someParam': None,
'p_integer': None
}
valid, rez = getattr(validator, method)('getNetwork', data)
assert not valid
assert sort_validator_rez({
'invalid_type': [
{
'path': 'someParam',
'expected_type': 'string',
'actually_value': None
},
{
'path': 'p_integer',
'expected_type': 'integer',
'actually_value': None
}
]
}) == sort_validator_rez(rez)
def test_validate_path_params_method_with_empty_data(self):
self.validate_url_data_with_empty_data(method='validate_path_params', parameters_type='path')
@ -593,6 +616,16 @@ class TestFdmSwaggerValidator(unittest.TestCase):
assert valid
assert rez is None
def test_pass_only_required_fields_with_none_values(self):
data = {
'subType': 'NETWORK',
'type': 'networkobject',
'value': None
}
valid, rez = FdmSwaggerValidator(mock_data).validate_data('getNetworkObjectList', data)
assert not valid
assert {'required': ['value']} == rez
def test_pass_no_data_with_no_required_fields(self):
spec = copy.deepcopy(mock_data)
del spec['models']['NetworkObject']['required']
@ -725,6 +758,17 @@ class TestFdmSwaggerValidator(unittest.TestCase):
assert valid
assert rez is None
valid_data = {
"f_string": None,
"f_number": None,
"f_boolean": None,
"f_integer": None
}
valid, rez = FdmSwaggerValidator(local_mock_data).validate_data('getdata', valid_data)
assert valid
assert rez is None
def test_invalid_simple_types(self):
local_mock_data = {
'models': {

View file

@ -0,0 +1,75 @@
import json
import os
import unittest
from ansible.module_utils.network.ftd.fdm_swagger_client import FdmSwaggerValidator, FdmSwaggerParser
DIR_PATH = os.path.dirname(os.path.realpath(__file__))
TEST_DATA_FOLDER = os.path.join(DIR_PATH, 'test_data')
class TestFdmSwagger(unittest.TestCase):
def setUp(self):
self.init_mock_data()
def init_mock_data(self):
with open(os.path.join(TEST_DATA_FOLDER, 'ngfw_with_ex.json'), 'rb') as f:
self.base_data = json.loads(f.read().decode('utf-8'))
def test_with_all_data(self):
fdm_data = FdmSwaggerParser().parse_spec(self.base_data)
validator = FdmSwaggerValidator(fdm_data)
models = fdm_data['models']
operations = fdm_data['operations']
invalid = set({})
for operation in operations:
model_name = operations[operation]['modelName']
method = operations[operation]['method']
if method != 'get' and model_name in models:
if 'example' in models[model_name]:
example = models[model_name]['example']
try:
valid, rez = validator.validate_data(operation, example)
assert valid
except Exception:
invalid.add(model_name)
assert invalid == set(['TCPPortObject',
'UDPPortObject',
'ICMPv4PortObject',
'ICMPv6PortObject',
'StandardAccessList',
'ExtendedAccessList',
'ASPathList',
'RouteMap',
'StandardCommunityList',
'ExpandedCommunityList',
'IPV4PrefixList',
'IPV6PrefixList',
'PolicyList',
'SyslogServer',
'HAConfiguration',
'TestIdentitySource'])
def test_parse_all_data(self):
self.fdm_data = FdmSwaggerParser().parse_spec(self.base_data)
operations = self.fdm_data['operations']
without_model_name = []
expected_operations_counter = 0
for key in self.base_data['paths']:
operation = self.base_data['paths'][key]
for dummy in operation:
expected_operations_counter += 1
for key in operations:
operation = operations[key]
if not operation['modelName']:
without_model_name.append(operation['url'])
if operation['modelName'] == '_File' and 'download' not in operation['url']:
self.fail('File type can be defined for download operation only')
assert sorted(['/api/fdm/v2/operational/deploy/{objId}', '/api/fdm/v2/action/upgrade']) == sorted(
without_model_name)
assert sorted(self.fdm_data['model_operations'][None].keys()) == sorted(['deleteDeployment', 'startUpgrade'])
assert expected_operations_counter == len(operations)

View file

@ -27,7 +27,8 @@ from units.compat import mock
from ansible.module_utils.network.ftd.common import FtdServerError, HTTPMethod, ResponseParams, FtdConfigurationError
from ansible.module_utils.network.ftd.configuration import DUPLICATE_NAME_ERROR_MESSAGE, UNPROCESSABLE_ENTITY_STATUS, \
MULTIPLE_DUPLICATES_FOUND_ERROR, BaseConfigurationResource, FtdInvalidOperationNameError, QueryParams
MULTIPLE_DUPLICATES_FOUND_ERROR, BaseConfigurationResource, FtdInvalidOperationNameError, QueryParams, \
ADD_OPERATION_NOT_SUPPORTED_ERROR, ParamName
from ansible.module_utils.network.ftd.fdm_swagger_client import ValidationError
ADD_RESPONSE = {'status': 'Object added'}
@ -39,8 +40,8 @@ ARBITRARY_RESPONSE = {'status': 'Arbitrary request sent'}
class TestUpsertOperationUnitTests(unittest.TestCase):
def setUp(self):
conn = mock.MagicMock()
self._resource = BaseConfigurationResource(conn)
self._conn = mock.MagicMock()
self._resource = BaseConfigurationResource(self._conn)
def test_get_operation_name(self):
operation_a = mock.MagicMock()
@ -59,11 +60,7 @@ class TestUpsertOperationUnitTests(unittest.TestCase):
assert operation_a == self._resource._get_operation_name(checker_wrapper(operation_a), operations)
assert operation_b == self._resource._get_operation_name(checker_wrapper(operation_b), operations)
self.assertRaises(
FtdConfigurationError,
self._resource._get_operation_name, checker_wrapper(None), operations
)
assert self._resource._get_operation_name(checker_wrapper(None), operations) is None
@mock.patch.object(BaseConfigurationResource, "_get_operation_name")
@mock.patch.object(BaseConfigurationResource, "add_object")
@ -79,6 +76,19 @@ class TestUpsertOperationUnitTests(unittest.TestCase):
model_operations)
add_object_mock.assert_called_once_with(add_op_name, params)
@mock.patch.object(BaseConfigurationResource, "_get_operation_name")
@mock.patch.object(BaseConfigurationResource, "add_object")
def test_add_upserted_object_with_no_add_operation(self, add_object_mock, get_operation_mock):
model_operations = mock.MagicMock()
get_operation_mock.return_value = None
with pytest.raises(FtdConfigurationError) as exc_info:
self._resource._add_upserted_object(model_operations, mock.MagicMock())
assert ADD_OPERATION_NOT_SUPPORTED_ERROR in str(exc_info.value)
get_operation_mock.assert_called_once_with(self._resource._operation_checker.is_add_operation, model_operations)
add_object_mock.assert_not_called()
@mock.patch.object(BaseConfigurationResource, "_get_operation_name")
@mock.patch.object(BaseConfigurationResource, "edit_object")
@mock.patch("ansible.module_utils.network.ftd.configuration.copy_identity_properties")
@ -112,139 +122,175 @@ class TestUpsertOperationUnitTests(unittest.TestCase):
params
)
@mock.patch.object(BaseConfigurationResource, "get_operation_specs_by_model_name")
@mock.patch("ansible.module_utils.network.ftd.configuration.OperationChecker.is_upsert_operation_supported")
@mock.patch("ansible.module_utils.network.ftd.configuration._extract_model_from_upsert_operation")
def test_is_upsert_operation_supported(self, extract_model_mock, is_upsert_supported_mock, get_operation_spec_mock):
op_name = mock.MagicMock()
result = self._resource.is_upsert_operation_supported(op_name)
assert result == is_upsert_supported_mock.return_value
extract_model_mock.assert_called_once_with(op_name)
get_operation_spec_mock.assert_called_once_with(extract_model_mock.return_value)
is_upsert_supported_mock.assert_called_once_with(get_operation_spec_mock.return_value)
@mock.patch.object(BaseConfigurationResource, "is_upsert_operation_supported")
@mock.patch.object(BaseConfigurationResource, "get_operation_specs_by_model_name")
@mock.patch.object(BaseConfigurationResource, "_find_object_matching_params")
@mock.patch.object(BaseConfigurationResource, "_add_upserted_object")
@mock.patch.object(BaseConfigurationResource, "_edit_upserted_object")
@mock.patch("ansible.module_utils.network.ftd.configuration._extract_model_from_upsert_operation")
def test_upsert_object_succesfully_added(self, extract_model_mock, edit_mock, add_mock, get_operation_mock,
is_upsert_supported_mock):
op_name = mock.MagicMock()
def test_upsert_object_successfully_added(self, edit_mock, add_mock, find_object, get_operation_mock,
is_upsert_supported_mock):
params = mock.MagicMock()
is_upsert_supported_mock.return_value = True
find_object.return_value = None
result = self._resource.upsert_object(op_name, params)
result = self._resource.upsert_object('upsertFoo', params)
assert result == add_mock.return_value
is_upsert_supported_mock.assert_called_once_with(op_name)
extract_model_mock.assert_called_once_with(op_name)
get_operation_mock.assert_called_once_with(extract_model_mock.return_value)
self._conn.get_model_spec.assert_called_once_with('Foo')
is_upsert_supported_mock.assert_called_once_with(get_operation_mock.return_value)
get_operation_mock.assert_called_once_with('Foo')
find_object.assert_called_once_with('Foo', params)
add_mock.assert_called_once_with(get_operation_mock.return_value, params)
edit_mock.assert_not_called()
@mock.patch.object(BaseConfigurationResource, "is_upsert_operation_supported")
@mock.patch("ansible.module_utils.network.ftd.configuration.equal_objects")
@mock.patch("ansible.module_utils.network.ftd.configuration.OperationChecker.is_upsert_operation_supported")
@mock.patch.object(BaseConfigurationResource, "get_operation_specs_by_model_name")
@mock.patch.object(BaseConfigurationResource, "_find_object_matching_params")
@mock.patch.object(BaseConfigurationResource, "_add_upserted_object")
@mock.patch.object(BaseConfigurationResource, "_edit_upserted_object")
@mock.patch("ansible.module_utils.network.ftd.configuration._extract_model_from_upsert_operation")
def test_upsert_object_succesfully_edited(self, extract_model_mock, edit_mock, add_mock, get_operation_mock,
is_upsert_supported_mock):
op_name = mock.MagicMock()
def test_upsert_object_successfully_edited(self, edit_mock, add_mock, find_object, get_operation_mock,
is_upsert_supported_mock, equal_objects_mock):
params = mock.MagicMock()
existing_obj = mock.MagicMock()
is_upsert_supported_mock.return_value = True
error = FtdConfigurationError("Obj duplication error")
error.obj = mock.MagicMock()
find_object.return_value = existing_obj
equal_objects_mock.return_value = False
add_mock.side_effect = error
result = self._resource.upsert_object(op_name, params)
result = self._resource.upsert_object('upsertFoo', params)
assert result == edit_mock.return_value
is_upsert_supported_mock.assert_called_once_with(op_name)
extract_model_mock.assert_called_once_with(op_name)
get_operation_mock.assert_called_once_with(extract_model_mock.return_value)
add_mock.assert_called_once_with(get_operation_mock.return_value, params)
edit_mock.assert_called_once_with(get_operation_mock.return_value, error.obj, params)
self._conn.get_model_spec.assert_called_once_with('Foo')
get_operation_mock.assert_called_once_with('Foo')
is_upsert_supported_mock.assert_called_once_with(get_operation_mock.return_value)
add_mock.assert_not_called()
equal_objects_mock.assert_called_once_with(existing_obj, params[ParamName.DATA])
edit_mock.assert_called_once_with(get_operation_mock.return_value, existing_obj, params)
@mock.patch.object(BaseConfigurationResource, "is_upsert_operation_supported")
@mock.patch("ansible.module_utils.network.ftd.configuration.equal_objects")
@mock.patch("ansible.module_utils.network.ftd.configuration.OperationChecker.is_upsert_operation_supported")
@mock.patch.object(BaseConfigurationResource, "get_operation_specs_by_model_name")
@mock.patch.object(BaseConfigurationResource, "_find_object_matching_params")
@mock.patch.object(BaseConfigurationResource, "_add_upserted_object")
@mock.patch.object(BaseConfigurationResource, "_edit_upserted_object")
@mock.patch("ansible.module_utils.network.ftd.configuration._extract_model_from_upsert_operation")
def test_upsert_object_not_supported(self, extract_model_mock, edit_mock, add_mock, get_operation_mock,
def test_upsert_object_returned_without_modifications(self, edit_mock, add_mock, find_object, get_operation_mock,
is_upsert_supported_mock, equal_objects_mock):
params = mock.MagicMock()
existing_obj = mock.MagicMock()
is_upsert_supported_mock.return_value = True
find_object.return_value = existing_obj
equal_objects_mock.return_value = True
result = self._resource.upsert_object('upsertFoo', params)
assert result == existing_obj
self._conn.get_model_spec.assert_called_once_with('Foo')
get_operation_mock.assert_called_once_with('Foo')
is_upsert_supported_mock.assert_called_once_with(get_operation_mock.return_value)
add_mock.assert_not_called()
equal_objects_mock.assert_called_once_with(existing_obj, params[ParamName.DATA])
edit_mock.assert_not_called()
@mock.patch("ansible.module_utils.network.ftd.configuration.OperationChecker.is_upsert_operation_supported")
@mock.patch.object(BaseConfigurationResource, "get_operation_specs_by_model_name")
@mock.patch.object(BaseConfigurationResource, "_find_object_matching_params")
@mock.patch.object(BaseConfigurationResource, "_add_upserted_object")
@mock.patch.object(BaseConfigurationResource, "_edit_upserted_object")
def test_upsert_object_not_supported(self, edit_mock, add_mock, find_object, get_operation_mock,
is_upsert_supported_mock):
op_name = mock.MagicMock()
params = mock.MagicMock()
is_upsert_supported_mock.return_value = False
self.assertRaises(
FtdInvalidOperationNameError,
self._resource.upsert_object, op_name, params
self._resource.upsert_object, 'upsertFoo', params
)
is_upsert_supported_mock.assert_called_once_with(op_name)
extract_model_mock.assert_not_called()
get_operation_mock.assert_not_called()
self._conn.get_model_spec.assert_called_once_with('Foo')
get_operation_mock.assert_called_once_with('Foo')
is_upsert_supported_mock.assert_called_once_with(get_operation_mock.return_value)
find_object.assert_not_called()
add_mock.assert_not_called()
edit_mock.assert_not_called()
@mock.patch.object(BaseConfigurationResource, "is_upsert_operation_supported")
@mock.patch("ansible.module_utils.network.ftd.configuration.OperationChecker.is_upsert_operation_supported")
@mock.patch.object(BaseConfigurationResource, "get_operation_specs_by_model_name")
@mock.patch.object(BaseConfigurationResource, "_find_object_matching_params")
@mock.patch.object(BaseConfigurationResource, "_add_upserted_object")
@mock.patch.object(BaseConfigurationResource, "_edit_upserted_object")
@mock.patch("ansible.module_utils.network.ftd.configuration._extract_model_from_upsert_operation")
def test_upsert_object_neither_added_nor_edited(self, extract_model_mock, edit_mock, add_mock, get_operation_mock,
def test_upsert_object_when_model_not_supported(self, edit_mock, add_mock, find_object, get_operation_mock,
is_upsert_supported_mock):
op_name = mock.MagicMock()
params = mock.MagicMock()
self._conn.get_model_spec.return_value = None
self.assertRaises(
FtdInvalidOperationNameError,
self._resource.upsert_object, 'upsertNonExisting', params
)
self._conn.get_model_spec.assert_called_once_with('NonExisting')
get_operation_mock.assert_not_called()
is_upsert_supported_mock.assert_not_called()
find_object.assert_not_called()
add_mock.assert_not_called()
edit_mock.assert_not_called()
@mock.patch("ansible.module_utils.network.ftd.configuration.equal_objects")
@mock.patch("ansible.module_utils.network.ftd.configuration.OperationChecker.is_upsert_operation_supported")
@mock.patch.object(BaseConfigurationResource, "get_operation_specs_by_model_name")
@mock.patch.object(BaseConfigurationResource, "_find_object_matching_params")
@mock.patch.object(BaseConfigurationResource, "_add_upserted_object")
@mock.patch.object(BaseConfigurationResource, "_edit_upserted_object")
def test_upsert_object_with_fatal_error_during_edit(self, edit_mock, add_mock, find_object, get_operation_mock,
is_upsert_supported_mock, equal_objects_mock):
params = mock.MagicMock()
existing_obj = mock.MagicMock()
is_upsert_supported_mock.return_value = True
error = FtdConfigurationError("Obj duplication error")
error.obj = mock.MagicMock()
add_mock.side_effect = error
find_object.return_value = existing_obj
equal_objects_mock.return_value = False
edit_mock.side_effect = FtdConfigurationError("Some object edit error")
self.assertRaises(
FtdConfigurationError,
self._resource.upsert_object, op_name, params
self._resource.upsert_object, 'upsertFoo', params
)
is_upsert_supported_mock.assert_called_once_with(op_name)
extract_model_mock.assert_called_once_with(op_name)
get_operation_mock.assert_called_once_with(extract_model_mock.return_value)
add_mock.assert_called_once_with(get_operation_mock.return_value, params)
edit_mock.assert_called_once_with(get_operation_mock.return_value, error.obj, params)
is_upsert_supported_mock.assert_called_once_with(get_operation_mock.return_value)
self._conn.get_model_spec.assert_called_once_with('Foo')
get_operation_mock.assert_called_once_with('Foo')
find_object.assert_called_once_with('Foo', params)
add_mock.assert_not_called()
edit_mock.assert_called_once_with(get_operation_mock.return_value, existing_obj, params)
@mock.patch.object(BaseConfigurationResource, "is_upsert_operation_supported")
@mock.patch("ansible.module_utils.network.ftd.configuration.OperationChecker.is_upsert_operation_supported")
@mock.patch.object(BaseConfigurationResource, "get_operation_specs_by_model_name")
@mock.patch.object(BaseConfigurationResource, "_find_object_matching_params")
@mock.patch.object(BaseConfigurationResource, "_add_upserted_object")
@mock.patch.object(BaseConfigurationResource, "_edit_upserted_object")
@mock.patch("ansible.module_utils.network.ftd.configuration._extract_model_from_upsert_operation")
def test_upsert_object_with_fatal_error_during_add(self, extract_model_mock, edit_mock, add_mock,
get_operation_mock, is_upsert_supported_mock):
op_name = mock.MagicMock()
def test_upsert_object_with_fatal_error_during_add(self, edit_mock, add_mock, find_object, get_operation_mock,
is_upsert_supported_mock):
params = mock.MagicMock()
is_upsert_supported_mock.return_value = True
find_object.return_value = None
error = FtdConfigurationError("Obj duplication error")
add_mock.side_effect = error
self.assertRaises(
FtdConfigurationError,
self._resource.upsert_object, op_name, params
self._resource.upsert_object, 'upsertFoo', params
)
is_upsert_supported_mock.assert_called_once_with(op_name)
extract_model_mock.assert_called_once_with(op_name)
get_operation_mock.assert_called_once_with(extract_model_mock.return_value)
is_upsert_supported_mock.assert_called_once_with(get_operation_mock.return_value)
self._conn.get_model_spec.assert_called_once_with('Foo')
get_operation_mock.assert_called_once_with('Foo')
find_object.assert_called_once_with('Foo', params)
add_mock.assert_called_once_with(get_operation_mock.return_value, params)
edit_mock.assert_not_called()
@ -289,13 +335,28 @@ class TestUpsertOperationFunctionalTests(object):
def get_operation_spec(name):
return operations[name]
def request_handler(url_path=None, http_method=None, body_params=None, path_params=None, query_params=None):
if http_method == HTTPMethod.POST:
assert url_path == url
assert body_params == params['data']
assert query_params == {}
assert path_params == params['path_params']
return {
ResponseParams.SUCCESS: True,
ResponseParams.RESPONSE: ADD_RESPONSE
}
elif http_method == HTTPMethod.GET:
return {
ResponseParams.SUCCESS: True,
ResponseParams.RESPONSE: {'items': []}
}
else:
assert False
connection_mock.get_operation_spec = get_operation_spec
connection_mock.get_operation_specs_by_model_name.return_value = operations
connection_mock.send_request.return_value = {
ResponseParams.SUCCESS: True,
ResponseParams.RESPONSE: ADD_RESPONSE
}
connection_mock.send_request = request_handler
params = {
'operation': 'upsertObject',
'data': {'id': '123', 'name': 'testObject', 'type': 'object'},
@ -305,13 +366,62 @@ class TestUpsertOperationFunctionalTests(object):
result = self._resource_execute_operation(params, connection=connection_mock)
connection_mock.send_request.assert_called_once_with(url_path=url,
http_method=HTTPMethod.POST,
path_params=params['path_params'],
query_params={},
body_params=params['data'])
assert ADD_RESPONSE == result
def test_module_should_fail_when_no_model(self, connection_mock):
connection_mock.get_model_spec.return_value = None
params = {
'operation': 'upsertObject',
'data': {'id': '123', 'name': 'testObject', 'type': 'object'},
'path_params': {'objId': '123'},
'register_as': 'test_var'
}
with pytest.raises(FtdInvalidOperationNameError) as exc_info:
self._resource_execute_operation(params, connection=connection_mock)
assert 'upsertObject' == exc_info.value.operation_name
def test_module_should_fail_when_no_add_operation_and_no_object(self, connection_mock):
url = '/test'
operations = {
'getObjectList': {
'method': HTTPMethod.GET,
'url': url,
'modelName': 'Object',
'returnMultipleItems': True},
'editObject': {
'method': HTTPMethod.PUT,
'modelName': 'Object',
'url': '/test/{objId}'},
'otherObjectOperation': {
'method': HTTPMethod.GET,
'modelName': 'Object',
'url': '/test/{objId}',
'returnMultipleItems': False
}}
def get_operation_spec(name):
return operations[name]
connection_mock.get_operation_spec = get_operation_spec
connection_mock.get_operation_specs_by_model_name.return_value = operations
connection_mock.send_request.return_value = {
ResponseParams.SUCCESS: True,
ResponseParams.RESPONSE: {'items': []}
}
params = {
'operation': 'upsertObject',
'data': {'id': '123', 'name': 'testObject', 'type': 'object'},
'path_params': {'objId': '123'},
'register_as': 'test_var'
}
with pytest.raises(FtdConfigurationError) as exc_info:
self._resource_execute_operation(params, connection=connection_mock)
assert ADD_OPERATION_NOT_SUPPORTED_ERROR in str(exc_info.value)
# test when object exists but with different fields(except id)
def test_module_should_update_object_when_upsert_operation_and_object_exists(self, connection_mock):
url = '/test'

View file

@ -1,4 +1,4 @@
# Copyright (c) 2018 Cisco and/or its affiliates.
# Copyright (c) 2018-2019 Cisco and/or its affiliates.
#
# This file is part of Ansible
#
@ -105,10 +105,10 @@ class TestFtdConfiguration(object):
def test_module_should_run_successful(self, resource_mock):
operation_name = 'test name'
resource_mock.return_value = 'ok'
resource_mock.return_value = {'result': 'ok'}
result = self._run_module({'operation': operation_name})
assert result['response'] == 'ok'
assert result['response'] == {'result': 'ok'}
def _run_module(self, module_args):
set_module_args(module_args)

View file

@ -1,31 +1,13 @@
# Copyright (c) 2018 Cisco and/or its affiliates.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import absolute_import
import pytest
from ansible.module_utils import basic
from ansible.module_utils.network.ftd.common import HTTPMethod
from ansible.module_utils.network.ftd.fdm_swagger_client import OperationField
from ansible.modules.network.ftd import ftd_file_upload
from units.modules.utils import set_module_args, exit_json, fail_json, AnsibleFailJson, AnsibleExitJson
from ansible.modules.network.ftd import ftd_file_upload
from ansible.module_utils.network.ftd.fdm_swagger_client import OperationField
from ansible.module_utils.network.ftd.common import HTTPMethod
class TestFtdFileUpload(object):
module = ftd_file_upload

View file

@ -1,4 +1,5 @@
# Copyright (c) 2018 Cisco and/or its affiliates.
# Copyright (c) 2018 Cisco and/or its affiliates.
#
# This file is part of Ansible
#
@ -21,20 +22,20 @@ import json
from ansible.module_utils.six.moves.urllib.error import HTTPError
from units.compat import mock
from units.compat import unittest
from units.compat.builtins import BUILTINS
from units.compat.mock import mock_open, patch
from ansible.errors import AnsibleConnectionFailure
from ansible.module_utils.connection import ConnectionError
from ansible.module_utils.network.ftd.common import HTTPMethod, ResponseParams
from ansible.module_utils.network.ftd.fdm_swagger_client import SpecProp, FdmSwaggerParser
from ansible.module_utils.six import BytesIO, StringIO
from ansible.plugins.httpapi.ftd import HttpApi
from ansible.module_utils.six import BytesIO, PY3, StringIO
from ansible.plugins.httpapi.ftd import HttpApi, BASE_HEADERS, TOKEN_PATH_TEMPLATE, DEFAULT_API_VERSIONS
EXPECTED_BASE_HEADERS = {
'Accept': 'application/json',
'Content-Type': 'application/json'
}
if PY3:
BUILTINS_NAME = 'builtins'
else:
BUILTINS_NAME = '__builtin__'
class FakeFtdHttpApiPlugin(HttpApi):
@ -48,6 +49,9 @@ class FakeFtdHttpApiPlugin(HttpApi):
def get_option(self, var):
return self.hostvars[var]
def set_option(self, var, val):
self.hostvars[var] = val
class TestFtdHttpApi(unittest.TestCase):
@ -84,7 +88,7 @@ class TestFtdHttpApi(unittest.TestCase):
expected_body = json.dumps({'grant_type': 'refresh_token', 'refresh_token': 'REFRESH_TOKEN'})
self.connection_mock.send.assert_called_once_with(mock.ANY, expected_body, headers=mock.ANY, method=mock.ANY)
def test_login_should_use_host_variable_when_set(self):
def test_login_should_use_env_variable_when_set(self):
temp_token_path = self.ftd_plugin.hostvars['token_path']
self.ftd_plugin.hostvars['token_path'] = '/testFakeLoginUrl'
self.connection_mock.send.return_value = self._connection_response(
@ -146,7 +150,7 @@ class TestFtdHttpApi(unittest.TestCase):
assert {ResponseParams.SUCCESS: True, ResponseParams.STATUS_CODE: 200,
ResponseParams.RESPONSE: exp_resp} == resp
self.connection_mock.send.assert_called_once_with('/test/123?at=0', '{"name": "foo"}', method=HTTPMethod.PUT,
headers=EXPECTED_BASE_HEADERS)
headers=BASE_HEADERS)
def test_send_request_should_return_empty_dict_when_no_response_data(self):
self.connection_mock.send.return_value = self._connection_response(None)
@ -155,7 +159,7 @@ class TestFtdHttpApi(unittest.TestCase):
assert {ResponseParams.SUCCESS: True, ResponseParams.STATUS_CODE: 200, ResponseParams.RESPONSE: {}} == resp
self.connection_mock.send.assert_called_once_with('/test', None, method=HTTPMethod.GET,
headers=EXPECTED_BASE_HEADERS)
headers=BASE_HEADERS)
def test_send_request_should_return_error_info_when_http_error_raises(self):
self.connection_mock.send.side_effect = HTTPError('http://testhost.com', 500, '', {},
@ -198,7 +202,7 @@ class TestFtdHttpApi(unittest.TestCase):
self.connection_mock.send.return_value = self._connection_response('File content')
open_mock = mock_open()
with patch('%s.open' % BUILTINS, open_mock):
with patch('%s.open' % BUILTINS_NAME, open_mock):
self.ftd_plugin.download_file('/files/1', '/tmp/test.txt')
open_mock.assert_called_once_with('/tmp/test.txt', 'wb')
@ -213,7 +217,7 @@ class TestFtdHttpApi(unittest.TestCase):
self.connection_mock.send.return_value = response, response_data
open_mock = mock_open()
with patch('%s.open' % BUILTINS, open_mock):
with patch('%s.open' % BUILTINS_NAME, open_mock):
self.ftd_plugin.download_file('/files/1', '/tmp/')
open_mock.assert_called_once_with('/tmp/%s' % filename, 'wb')
@ -226,11 +230,11 @@ class TestFtdHttpApi(unittest.TestCase):
self.connection_mock.send.return_value = self._connection_response({'id': '123'})
open_mock = mock_open()
with patch('%s.open' % BUILTINS, open_mock):
with patch('%s.open' % BUILTINS_NAME, open_mock):
resp = self.ftd_plugin.upload_file('/tmp/test.txt', '/files')
assert {'id': '123'} == resp
exp_headers = dict(EXPECTED_BASE_HEADERS)
exp_headers = dict(BASE_HEADERS)
exp_headers['Content-Length'] = len('--Encoded data--')
exp_headers['Content-Type'] = 'multipart/form-data'
self.connection_mock.send.assert_called_once_with('/files', data='--Encoded data--',
@ -244,7 +248,7 @@ class TestFtdHttpApi(unittest.TestCase):
self.connection_mock.send.return_value = self._connection_response('invalidJsonResponse')
open_mock = mock_open()
with patch('%s.open' % BUILTINS, open_mock):
with patch('%s.open' % BUILTINS_NAME, open_mock):
with self.assertRaises(ConnectionError) as res:
self.ftd_plugin.upload_file('/tmp/test.txt', '/files')
@ -271,7 +275,7 @@ class TestFtdHttpApi(unittest.TestCase):
assert self.ftd_plugin.get_model_spec('NonExistingTestModel') is None
@patch.object(FdmSwaggerParser, 'parse_spec')
def test_get_model_spec(self, parse_spec_mock):
def test_get_operation_spec_by_model_name(self, parse_spec_mock):
self.connection_mock.send.return_value = self._connection_response(None)
operation1 = {'modelName': 'TestModel'}
op_model_name_is_none = {'modelName': None}
@ -315,3 +319,96 @@ class TestFtdHttpApi(unittest.TestCase):
response_text = json.dumps(response) if type(response) is dict else response
response_data = BytesIO(response_text.encode() if response_text else ''.encode())
return response_mock, response_data
def test_get_list_of_supported_api_versions_with_failed_http_request(self):
error_msg = "Invalid Credentials"
fp = mock.MagicMock()
fp.read.return_value = '{{"error-msg": "{0}"}}'.format(error_msg)
send_mock = mock.MagicMock(side_effect=HTTPError('url', 400, 'msg', 'hdrs', fp))
with mock.patch.object(self.ftd_plugin.connection, 'send', send_mock):
with self.assertRaises(ConnectionError) as res:
self.ftd_plugin._get_supported_api_versions()
assert error_msg in str(res.exception)
def test_get_list_of_supported_api_versions_with_buggy_response(self):
error_msg = "Non JSON value"
http_response_mock = mock.MagicMock()
http_response_mock.getvalue.return_value = error_msg
send_mock = mock.MagicMock(return_value=(None, http_response_mock))
with mock.patch.object(self.ftd_plugin.connection, 'send', send_mock):
with self.assertRaises(ConnectionError) as res:
self.ftd_plugin._get_supported_api_versions()
assert error_msg in str(res.exception)
def test_get_list_of_supported_api_versions_with_positive_response(self):
http_response_mock = mock.MagicMock()
http_response_mock.getvalue.return_value = '{"supportedVersions": ["v1"]}'
send_mock = mock.MagicMock(return_value=(None, http_response_mock))
with mock.patch.object(self.ftd_plugin.connection, 'send', send_mock):
supported_versions = self.ftd_plugin._get_supported_api_versions()
assert supported_versions == ['v1']
@patch('ansible.plugins.httpapi.ftd.HttpApi._get_api_token_path', mock.MagicMock(return_value=None))
@patch('ansible.plugins.httpapi.ftd.HttpApi._get_known_token_paths')
def test_lookup_login_url_with_empty_response(self, get_known_token_paths_mock):
payload = mock.MagicMock()
get_known_token_paths_mock.return_value = []
self.assertRaises(
ConnectionError,
self.ftd_plugin._lookup_login_url,
payload
)
@patch('ansible.plugins.httpapi.ftd.HttpApi._get_known_token_paths')
@patch('ansible.plugins.httpapi.ftd.HttpApi._send_login_request')
def test_lookup_login_url_with_failed_request(self, api_request_mock, get_known_token_paths_mock):
payload = mock.MagicMock()
url = mock.MagicMock()
get_known_token_paths_mock.return_value = [url]
api_request_mock.side_effect = ConnectionError('Error message')
with mock.patch.object(self.ftd_plugin.connection, 'queue_message') as display_mock:
self.assertRaises(
ConnectionError,
self.ftd_plugin._lookup_login_url,
payload
)
assert display_mock.called
@patch('ansible.plugins.httpapi.ftd.HttpApi._get_api_token_path', mock.MagicMock(return_value=None))
@patch('ansible.plugins.httpapi.ftd.HttpApi._get_known_token_paths')
@patch('ansible.plugins.httpapi.ftd.HttpApi._send_login_request')
@patch('ansible.plugins.httpapi.ftd.HttpApi._set_api_token_path')
def test_lookup_login_url_with_positive_result(self, set_api_token_mock, api_request_mock,
get_known_token_paths_mock):
payload = mock.MagicMock()
url = mock.MagicMock()
get_known_token_paths_mock.return_value = [url]
response_mock = mock.MagicMock()
api_request_mock.return_value = response_mock
resp = self.ftd_plugin._lookup_login_url(payload)
set_api_token_mock.assert_called_once_with(url)
assert resp == response_mock
@patch('ansible.plugins.httpapi.ftd.HttpApi._get_supported_api_versions')
def test_get_known_token_paths_with_positive_response(self, get_list_of_supported_api_versions_mock):
test_versions = ['v1', 'v2']
get_list_of_supported_api_versions_mock.return_value = test_versions
result = self.ftd_plugin._get_known_token_paths()
assert result == [TOKEN_PATH_TEMPLATE.format(version) for version in test_versions]
@patch('ansible.plugins.httpapi.ftd.HttpApi._get_supported_api_versions')
def test_get_known_token_paths_with_failed_api_call(self, get_list_of_supported_api_versions_mock):
get_list_of_supported_api_versions_mock.side_effect = ConnectionError('test error message')
result = self.ftd_plugin._get_known_token_paths()
assert result == [TOKEN_PATH_TEMPLATE.format(version) for version in DEFAULT_API_VERSIONS]
def test_set_api_token_path(self):
url = mock.MagicMock()
self.ftd_plugin._set_api_token_path(url)
assert self.ftd_plugin._get_api_token_path() == url