Matt Martz 8bd4e2a144 cert validation fixes - Attempt 2 (#55953)
* Attempt 2 of cert validation fixes

* Remove unused code

* Cleanup the tmp cert using atexit

* Fix linting issues

* Only add SSLValidationHandler when not HAS_SSLCONTEXT

* Catch value errors on non PEM certs

* Only catch NotImplementedError to avoid masking issues

* set self._context even with PyOpenSSLContext for conformity

* Fix error building

* normalize how we interact with the context we create

* Remove unused code

* Address test for py3.7 message difference

* open_url should pass the ca_path through

* Account for new error in url lookup test

* Guard some code behind whether or not we are validating certs

* Make _make_context public

* Move atexit.register up to where the tmp file is created
2019-05-31 16:35:25 -04:00

220 lines
8.2 KiB

# -*- coding: utf-8 -*-
# (c) 2018 Matt Martz <>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import socket
from ansible.module_utils.six import StringIO
from ansible.module_utils.six.moves.http_cookiejar import Cookie
from ansible.module_utils.six.moves.http_client import HTTPMessage
from ansible.module_utils.urls import fetch_url, urllib_error, ConnectionError, NoSSLError, httplib
import pytest
from mock import MagicMock
class AnsibleModuleExit(Exception):
    """Base exception carrying the arguments of a simulated module exit.

    The positional and keyword arguments passed to the constructor are stored
    on the instance as ``args`` and ``kwargs`` so that tests can inspect them
    after catching the exception.
    """

    def __init__(self, *args, **kwargs):
        # ``args`` is assigned directly rather than through Exception.__init__
        self.args = args
        self.kwargs = kwargs
class ExitJson(AnsibleModuleExit):
    """Raised by ``FakeAnsibleModule.exit_json``."""
class FailJson(AnsibleModuleExit):
    """Raised by ``FakeAnsibleModule.fail_json``."""
@pytest.fixture
def open_url_mock(mocker):
    """Patch ``ansible.module_utils.urls.open_url`` and return the resulting mock.

    The tests in this module request ``open_url_mock`` as an argument, so it
    has to be registered as a pytest fixture.
    """
    return mocker.patch('ansible.module_utils.urls.open_url')
@pytest.fixture
def fake_ansible_module():
    """Return a fresh ``FakeAnsibleModule`` for each test that requests it.

    The tests in this module request ``fake_ansible_module`` as an argument,
    so it has to be registered as a pytest fixture.
    """
    return FakeAnsibleModule()
class FakeAnsibleModule:
    """Minimal stand-in for a module object, as passed to ``fetch_url`` in these tests.

    Instead of terminating the process, ``exit_json`` and ``fail_json`` raise
    ``ExitJson`` / ``FailJson`` carrying the arguments they were called with.
    """

    def __init__(self):
        # Tests overwrite ``params`` to simulate module arguments
        self.params = {}
        self.tmpdir = None

    def exit_json(self, *args, **kwargs):
        """Raise ``ExitJson`` with the given arguments."""
        raise ExitJson(*args, **kwargs)

    def fail_json(self, *args, **kwargs):
        """Raise ``FailJson`` with the given arguments."""
        raise FailJson(*args, **kwargs)
def test_fetch_url_no_urlparse(mocker, fake_ansible_module):
    """fetch_url must fail the module when ``HAS_URLPARSE`` is false."""
    mocker.patch('ansible.module_utils.urls.HAS_URLPARSE', new=False)

    # NOTE(review): the URL literal is empty in this copy of the file; confirm nothing was stripped.
    with pytest.raises(FailJson):
        fetch_url(fake_ansible_module, '')
def test_fetch_url(open_url_mock, fake_ansible_module):
    """With empty module params, fetch_url calls open_url once with the default keyword arguments."""
    # NOTE(review): the URL literal is empty in this copy of the file; confirm nothing was stripped.
    fetch_url(fake_ansible_module, '')

    # ``cookies`` is echoed back from the recorded call, so its value is not pinned by the assertion below
    dummy, kwargs = open_url_mock.call_args

    open_url_mock.assert_called_once_with('', client_cert=None, client_key=None, cookies=kwargs['cookies'], data=None,
                                          follow_redirects='urllib2', force=False, force_basic_auth='', headers=None,
                                          http_agent='ansible-httpget', last_mod_time=None, method=None, timeout=10, url_password='', url_username='',
                                          use_proxy=True, validate_certs=True, use_gssapi=False, unix_socket=None, ca_path=None)
def test_fetch_url_params(open_url_mock, fake_ansible_module):
    """Values set in ``module.params`` are forwarded by fetch_url to open_url."""
    fake_ansible_module.params = {
        'validate_certs': False,
        'url_username': 'user',
        'url_password': 'passwd',
        'http_agent': 'ansible-test',
        'force_basic_auth': True,
        'follow_redirects': 'all',
        'client_cert': 'client.pem',
        'client_key': 'client.key',
    }

    # NOTE(review): the URL literal is empty in this copy of the file; confirm nothing was stripped.
    fetch_url(fake_ansible_module, '')

    # ``cookies`` is echoed back from the recorded call, so its value is not pinned by the assertion below
    dummy, kwargs = open_url_mock.call_args

    open_url_mock.assert_called_once_with('', client_cert='client.pem', client_key='client.key', cookies=kwargs['cookies'], data=None,
                                          follow_redirects='all', force=False, force_basic_auth=True, headers=None,
                                          http_agent='ansible-test', last_mod_time=None, method=None, timeout=10, url_password='passwd', url_username='user',
                                          use_proxy=True, validate_certs=False, use_gssapi=False, unix_socket=None, ca_path=None)
def test_fetch_url_cookies(mocker, fake_ansible_module):
    """Cookies and ``Set-Cookie`` headers produced by open_url are reported in ``info``."""
    def make_cookies(*args, **kwargs):
        # Replacement for open_url: fills the cookie jar passed by fetch_url and
        # returns a mock response carrying matching Set-Cookie headers.
        cookies = kwargs['cookies']
        r = MagicMock()
        try:
            r.headers = HTTPMessage()
            add_header = r.headers.add_header
        except TypeError:
            # PY2
            r.headers = HTTPMessage(StringIO())
            add_header = r.headers.addheader
        # NOTE(review): the left-hand side of this assignment was lost in this copy of the
        # file (only "= r.headers" survived); exposing the headers through r.info() is a
        # reconstruction -- confirm against the upstream test.
        r.info.return_value = r.headers
        for name, value in (('Foo', 'bar'), ('Baz', 'qux')):
            # NOTE(review): every Cookie argument except name/value is a reconstruction; the
            # original argument list is missing from this copy of the file -- confirm upstream.
            cookie = Cookie(
                version=0,
                name=name,
                value=value,
                port=None,
                port_specified=False,
                domain="ansible.com",
                domain_specified=True,
                domain_initial_dot=False,
                path="/",
                path_specified=True,
                secure=False,
                expires=None,
                discard=False,
                comment=None,
                comment_url=None,
                rest=None
            )
            # NOTE(review): reconstructed -- the jar must be populated for info['cookies'] to be non-empty
            cookies.set_cookie(cookie)
            add_header('Set-Cookie', '%s=%s' % (name, value))

        return r

    mocker.patch('ansible.module_utils.urls.open_url', new=make_cookies)

    # NOTE(review): the URL literal is empty in this copy of the file; confirm nothing was stripped.
    dummy, info = fetch_url(fake_ansible_module, '')

    assert info['cookies'] == {'Baz': 'qux', 'Foo': 'bar'}
    # Python sorts cookies in order of most specific (ie. longest) path first
    # items with the same path are reversed from response order
    assert info['cookies_string'] == 'Baz=qux; Foo=bar'
    # The key here has a `-` as opposed to what we see in the `uri` module that converts to `_`
    # Note: this is response order, which differs from cookies_string
    assert info['set-cookie'] == 'Foo=bar, Baz=qux'
def test_fetch_url_nossl(open_url_mock, fake_ansible_module, mocker):
    """A NoSSLError fails the module; the message mentions ``python-ssl`` only on 'redhat'."""
    # NOTE(review): the URL literal is empty in this copy of the file; confirm nothing was stripped.
    mocker.patch('ansible.module_utils.urls.get_distribution', return_value='notredhat')

    open_url_mock.side_effect = NoSSLError
    with pytest.raises(FailJson) as excinfo:
        fetch_url(fake_ansible_module, '')

    assert 'python-ssl' not in excinfo.value.kwargs['msg']

    mocker.patch('ansible.module_utils.urls.get_distribution', return_value='redhat')

    open_url_mock.side_effect = NoSSLError
    with pytest.raises(FailJson) as excinfo:
        fetch_url(fake_ansible_module, '')

    assert 'python-ssl' in excinfo.value.kwargs['msg']
    assert '' == excinfo.value.kwargs['url']
    assert excinfo.value.kwargs['status'] == -1
def test_fetch_url_connectionerror(open_url_mock, fake_ansible_module):
    """ConnectionError and ValueError from open_url both fail the module with the error text."""
    # NOTE(review): the URL literal is empty in this copy of the file; confirm nothing was stripped.
    open_url_mock.side_effect = ConnectionError('TESTS')
    with pytest.raises(FailJson) as excinfo:
        fetch_url(fake_ansible_module, '')

    assert excinfo.value.kwargs['msg'] == 'TESTS'
    assert '' == excinfo.value.kwargs['url']
    assert excinfo.value.kwargs['status'] == -1

    open_url_mock.side_effect = ValueError('TESTS')
    with pytest.raises(FailJson) as excinfo:
        fetch_url(fake_ansible_module, '')

    assert excinfo.value.kwargs['msg'] == 'TESTS'
    assert '' == excinfo.value.kwargs['url']
    assert excinfo.value.kwargs['status'] == -1
def test_fetch_url_httperror(open_url_mock, fake_ansible_module):
    """An HTTPError from open_url is reported through ``info`` rather than failing the module."""
    # NOTE(review): the url, code and fp arguments were missing from this copy of the file.
    # They are reconstructed from the assertion below (status 500, body 'TESTS') and the
    # HTTPError(url, code, msg, hdrs, fp) signature -- confirm against the upstream test.
    # The URL literal is empty throughout this copy; confirm nothing was stripped.
    open_url_mock.side_effect = urllib_error.HTTPError(
        '',
        500,
        'Internal Server Error',
        {'Content-Type': 'application/json'},
        StringIO('TESTS')
    )

    dummy, info = fetch_url(fake_ansible_module, '')

    assert info == {'msg': 'HTTP Error 500: Internal Server Error', 'body': 'TESTS',
                    'status': 500, 'url': '', 'content-type': 'application/json'}
def test_fetch_url_urlerror(open_url_mock, fake_ansible_module):
    """A URLError from open_url is reported in ``info`` as 'Request failed' with status -1."""
    open_url_mock.side_effect = urllib_error.URLError('TESTS')

    # NOTE(review): the URL literal is empty in this copy of the file; confirm nothing was stripped.
    dummy, info = fetch_url(fake_ansible_module, '')

    assert info == {'msg': 'Request failed: <urlopen error TESTS>', 'status': -1, 'url': ''}
def test_fetch_url_socketerror(open_url_mock, fake_ansible_module):
    """A socket.error from open_url is reported in ``info`` as 'Connection failure' with status -1."""
    open_url_mock.side_effect = socket.error('TESTS')

    # NOTE(review): the URL literal is empty in this copy of the file; confirm nothing was stripped.
    dummy, info = fetch_url(fake_ansible_module, '')

    assert info == {'msg': 'Connection failure: TESTS', 'status': -1, 'url': ''}
def test_fetch_url_exception(open_url_mock, fake_ansible_module):
    """Any other exception is reported in ``info`` as an unknown error, with an 'exception' entry."""
    open_url_mock.side_effect = Exception('TESTS')

    # NOTE(review): the URL literal is empty in this copy of the file; confirm nothing was stripped.
    dummy, info = fetch_url(fake_ansible_module, '')

    # The 'exception' entry is checked by substring only, so remove it before the exact comparison
    exception = info.pop('exception')
    assert info == {'msg': 'An unknown error occurred: TESTS', 'status': -1, 'url': ''}
    assert "Exception: TESTS" in exception
def test_fetch_url_badstatusline(open_url_mock, fake_ansible_module):
    """A BadStatusLine from open_url is reported in ``info`` as a connection failure with status -1."""
    open_url_mock.side_effect = httplib.BadStatusLine('TESTS')

    # NOTE(review): the URL literal is empty in this copy of the file; confirm nothing was stripped.
    dummy, info = fetch_url(fake_ansible_module, '')

    assert info == {'msg': 'Connection failure: connection was closed before a valid response was received: TESTS', 'status': -1, 'url': ''}