ansible/test/units/plugins/httpapi/test_checkpoint.py

87 lines
3.2 KiB
Python
Raw Normal View History

Checkpoint httpapi plugin (#49929) * Add checkpoint httpapi plugin and access rule facts module * WIP checkpoint_access_rule module * Add publish and install policy, plus fix empty json object request for publish * Refactor publish and install_policy onto module_utils * Add update resource logic * Add checkpoint_host_facts module * Return code and response on get_acess_rule function * Add checkpoint_host module * Add checkpoint_run_script module * Add checkpoint_task_facts module * Show all tasks if no task id is passed Note, this is only available on v1.3 of Checkpoint WS API * Add update logic to checkpoint host * Add full details on get task call * Add checkpoint httpapi plugin * Fix pep8 * Use auth instead of sid property and return False on handle_httperror method * Fix version in docstring * Remove constructor * Remove Accept from base headers * Do not override http error handler and assign Checkpoint sid to connection _auth There is scaffolding in the base class to autoappend the token, given it is assigned to connection _send * Use new connection queue message method instead of display * Remove unused display * Catch ValueError, since it's a parent of JSONDecodeError * Make static methods that are not used outside the class regular methods * Add missing self to previously static methods * Fix logout Was carrying copy pasta from ftd plugin * Remove send_auth_request * Use BASE_HEADERS constant * Simplify copyright header on httpapi plugin * Remove access rule module * Remove unused imports * Add unit test * Fix pep8 * Add test * Add test * Fix pep8
2019-01-07 14:02:29 +01:00
# (c) 2018 Red Hat Inc.
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
Checkpoint httpapi plugin (#49929) * Add checkpoint httpapi plugin and access rule facts module * WIP checkpoint_access_rule module * Add publish and install policy, plus fix empty json object request for publish * Refactor publish and install_policy onto module_utils * Add update resource logic * Add checkpoint_host_facts module * Return code and response on get_acess_rule function * Add checkpoint_host module * Add checkpoint_run_script module * Add checkpoint_task_facts module * Show all tasks if no task id is passed Note, this is only available on v1.3 of Checkpoint WS API * Add update logic to checkpoint host * Add full details on get task call * Add checkpoint httpapi plugin * Fix pep8 * Use auth instead of sid property and return False on handle_httperror method * Fix version in docstring * Remove constructor * Remove Accept from base headers * Do not override http error handler and assign Checkpoint sid to connection _auth There is scaffolding in the base class to autoappend the token, given it is assigned to connection _send * Use new connection queue message method instead of display * Remove unused display * Catch ValueError, since it's a parent of JSONDecodeError * Make static methods that are not used outside the class regular methods * Add missing self to previously static methods * Fix logout Was carrying copy pasta from ftd plugin * Remove send_auth_request * Use BASE_HEADERS constant * Simplify copyright header on httpapi plugin * Remove access rule module * Remove unused imports * Add unit test * Fix pep8 * Add test * Add test * Fix pep8
2019-01-07 14:02:29 +01:00
import json
from ansible.module_utils.six.moves.urllib.error import HTTPError
from units.compat import mock
from units.compat import unittest
from ansible.errors import AnsibleConnectionFailure
from ansible.module_utils.connection import ConnectionError
from ansible.module_utils.six import BytesIO, StringIO
from ansible.plugins.httpapi.checkpoint import HttpApi
EXPECTED_BASE_HEADERS = {
'Content-Type': 'application/json'
}
class FakeCheckpointHttpApiPlugin(HttpApi):
def __init__(self, conn):
super(FakeCheckpointHttpApiPlugin, self).__init__(conn)
self.hostvars = {
'domain': None
}
def get_option(self, var):
return self.hostvars[var]
def set_option(self, var, val):
self.hostvars[var] = val
Checkpoint httpapi plugin (#49929) * Add checkpoint httpapi plugin and access rule facts module * WIP checkpoint_access_rule module * Add publish and install policy, plus fix empty json object request for publish * Refactor publish and install_policy onto module_utils * Add update resource logic * Add checkpoint_host_facts module * Return code and response on get_acess_rule function * Add checkpoint_host module * Add checkpoint_run_script module * Add checkpoint_task_facts module * Show all tasks if no task id is passed Note, this is only available on v1.3 of Checkpoint WS API * Add update logic to checkpoint host * Add full details on get task call * Add checkpoint httpapi plugin * Fix pep8 * Use auth instead of sid property and return False on handle_httperror method * Fix version in docstring * Remove constructor * Remove Accept from base headers * Do not override http error handler and assign Checkpoint sid to connection _auth There is scaffolding in the base class to autoappend the token, given it is assigned to connection _send * Use new connection queue message method instead of display * Remove unused display * Catch ValueError, since it's a parent of JSONDecodeError * Make static methods that are not used outside the class regular methods * Add missing self to previously static methods * Fix logout Was carrying copy pasta from ftd plugin * Remove send_auth_request * Use BASE_HEADERS constant * Simplify copyright header on httpapi plugin * Remove access rule module * Remove unused imports * Add unit test * Fix pep8 * Add test * Add test * Fix pep8
2019-01-07 14:02:29 +01:00
class TestCheckpointHttpApi(unittest.TestCase):
def setUp(self):
self.connection_mock = mock.Mock()
self.checkpoint_plugin = FakeCheckpointHttpApiPlugin(self.connection_mock)
self.checkpoint_plugin._load_name = 'httpapi'
def test_login_raises_exception_when_username_and_password_are_not_provided(self):
with self.assertRaises(AnsibleConnectionFailure) as res:
self.checkpoint_plugin.login(None, None)
assert 'Username and password are required' in str(res.exception)
def test_login_raises_exception_when_invalid_response(self):
self.connection_mock.send.return_value = self._connection_response(
{'NOSIDKEY': 'NOSIDVALUE'}
)
with self.assertRaises(ConnectionError) as res:
self.checkpoint_plugin.login('foo', 'bar')
assert 'Server returned response without token info during connection authentication' in str(res.exception)
def test_send_request_should_return_error_info_when_http_error_raises(self):
self.connection_mock.send.side_effect = HTTPError('http://testhost.com', 500, '', {},
StringIO('{"errorMessage": "ERROR"}'))
resp = self.checkpoint_plugin.send_request('/test', None)
assert resp == (500, {'errorMessage': 'ERROR'})
def test_login_to_global_domain(self):
temp_domain = self.checkpoint_plugin.hostvars['domain']
self.checkpoint_plugin.hostvars['domain'] = 'test_domain'
self.connection_mock.send.return_value = self._connection_response(
{'sid': 'SID', 'uid': 'UID'}
)
self.checkpoint_plugin.login('USERNAME', 'PASSWORD')
self.connection_mock.send.assert_called_once_with('/web_api/login', mock.ANY, headers=mock.ANY,
method=mock.ANY)
self.checkpoint_plugin.hostvars['domain'] = temp_domain
Checkpoint httpapi plugin (#49929) * Add checkpoint httpapi plugin and access rule facts module * WIP checkpoint_access_rule module * Add publish and install policy, plus fix empty json object request for publish * Refactor publish and install_policy onto module_utils * Add update resource logic * Add checkpoint_host_facts module * Return code and response on get_acess_rule function * Add checkpoint_host module * Add checkpoint_run_script module * Add checkpoint_task_facts module * Show all tasks if no task id is passed Note, this is only available on v1.3 of Checkpoint WS API * Add update logic to checkpoint host * Add full details on get task call * Add checkpoint httpapi plugin * Fix pep8 * Use auth instead of sid property and return False on handle_httperror method * Fix version in docstring * Remove constructor * Remove Accept from base headers * Do not override http error handler and assign Checkpoint sid to connection _auth There is scaffolding in the base class to autoappend the token, given it is assigned to connection _send * Use new connection queue message method instead of display * Remove unused display * Catch ValueError, since it's a parent of JSONDecodeError * Make static methods that are not used outside the class regular methods * Add missing self to previously static methods * Fix logout Was carrying copy pasta from ftd plugin * Remove send_auth_request * Use BASE_HEADERS constant * Simplify copyright header on httpapi plugin * Remove access rule module * Remove unused imports * Add unit test * Fix pep8 * Add test * Add test * Fix pep8
2019-01-07 14:02:29 +01:00
@staticmethod
def _connection_response(response, status=200):
response_mock = mock.Mock()
response_mock.getcode.return_value = status
response_text = json.dumps(response) if type(response) is dict else response
response_data = BytesIO(response_text.encode() if response_text else ''.encode())
return response_mock, response_data