Make fetch_url check the server's certificate on https connections

This commit is contained in:
Toshio Kuratomi 2015-05-28 12:35:37 -07:00
parent e59d4f3b51
commit afc19894e1
2 changed files with 52 additions and 17 deletions

View file

@ -50,6 +50,15 @@ try:
except: except:
HAS_SSL=False HAS_SSL=False
HAS_MATCH_HOSTNAME = True
try:
from ssl import match_hostname, CertificateError
except ImportError:
try:
from backports.ssl_match_hostname import match_hostname, CertificateError
except ImportError:
HAS_MATCH_HOSTNAME = False
import httplib import httplib
import os import os
import re import re
@ -293,11 +302,13 @@ class SSLValidationHandler(urllib2.BaseHandler):
connect_result = s.recv(4096) connect_result = s.recv(4096)
self.validate_proxy_response(connect_result) self.validate_proxy_response(connect_result)
ssl_s = ssl.wrap_socket(s, ca_certs=tmp_ca_cert_path, cert_reqs=ssl.CERT_REQUIRED) ssl_s = ssl.wrap_socket(s, ca_certs=tmp_ca_cert_path, cert_reqs=ssl.CERT_REQUIRED)
match_hostname(ssl_s.getpeercert(), self.hostname)
else: else:
self.module.fail_json(msg='Unsupported proxy scheme: %s. Currently ansible only supports HTTP proxies.' % proxy_parts.get('scheme')) self.module.fail_json(msg='Unsupported proxy scheme: %s. Currently ansible only supports HTTP proxies.' % proxy_parts.get('scheme'))
else: else:
s.connect((self.hostname, self.port)) s.connect((self.hostname, self.port))
ssl_s = ssl.wrap_socket(s, ca_certs=tmp_ca_cert_path, cert_reqs=ssl.CERT_REQUIRED) ssl_s = ssl.wrap_socket(s, ca_certs=tmp_ca_cert_path, cert_reqs=ssl.CERT_REQUIRED)
match_hostname(ssl_s.getpeercert(), self.hostname)
# close the ssl connection # close the ssl connection
#ssl_s.unwrap() #ssl_s.unwrap()
s.close() s.close()
@ -311,6 +322,9 @@ class SSLValidationHandler(urllib2.BaseHandler):
'Use validate_certs=no or make sure your managed systems have a valid CA certificate installed. ' + \ 'Use validate_certs=no or make sure your managed systems have a valid CA certificate installed. ' + \
'Paths checked for this platform: %s' % ", ".join(paths_checked) 'Paths checked for this platform: %s' % ", ".join(paths_checked)
) )
except CertificateError:
self.module.fail_json(msg="SSL Certificate does not belong to %s. Make sure the url has a certificate that belongs to it or use validate_certs=no (insecure)" % self.hostname)
try: try:
# cleanup the temp file created, don't worry # cleanup the temp file created, don't worry
# if it fails for some reason # if it fails for some reason
@ -363,28 +377,29 @@ def fetch_url(module, url, data=None, headers=None, method=None,
# FIXME: change the following to use the generic_urlparse function # FIXME: change the following to use the generic_urlparse function
# to remove the indexed references for 'parsed' # to remove the indexed references for 'parsed'
parsed = urlparse.urlparse(url) parsed = urlparse.urlparse(url)
if parsed[0] == 'https': if parsed[0] == 'https' and validate_certs:
if not HAS_SSL and validate_certs: if not HAS_SSL:
if distribution == 'Redhat': if distribution == 'Redhat':
module.fail_json(msg='SSL validation is not available in your version of python. You can use validate_certs=no, however this is unsafe and not recommended. You can also install python-ssl from EPEL') module.fail_json(msg='SSL validation is not available in your version of python. You can use validate_certs=no, however this is unsafe and not recommended. You can also install python-ssl from EPEL')
else: else:
module.fail_json(msg='SSL validation is not available in your version of python. You can use validate_certs=no, however this is unsafe and not recommended') module.fail_json(msg='SSL validation is not available in your version of python. You can use validate_certs=no, however this is unsafe and not recommended')
if not HAS_MATCH_HOSTNAME:
module.fail_json(msg='Available SSL validation does not check that the certificate matches the hostname. You can install backports.ssl_match_hostname or update your managed machine to python-2.7.9 or newer. You could also use validate_certs=no, however this is unsafe and not recommended')
elif validate_certs: # do the cert validation
# do the cert validation netloc = parsed[1]
netloc = parsed[1] if '@' in netloc:
if '@' in netloc: netloc = netloc.split('@', 1)[1]
netloc = netloc.split('@', 1)[1] if ':' in netloc:
if ':' in netloc: hostname, port = netloc.split(':', 1)
hostname, port = netloc.split(':', 1) port = int(port)
port = int(port) else:
else: hostname = netloc
hostname = netloc port = 443
port = 443 # create the SSL validation handler and
# create the SSL validation handler and # add it to the list of handlers
# add it to the list of handlers ssl_handler = SSLValidationHandler(module, hostname, port)
ssl_handler = SSLValidationHandler(module, hostname, port) handlers.append(ssl_handler)
handlers.append(ssl_handler)
if parsed[0] != 'ftp': if parsed[0] != 'ftp':
username = module.params.get('url_username', '') username = module.params.get('url_username', '')

View file

@ -25,3 +25,23 @@
that: that:
- result.changed - result.changed
- '"OK" in result.msg' - '"OK" in result.msg'
- name: test https fetch to a site with invalid domain
get_url:
url: "https://kennethreitz.org/"
dest: "{{ output_dir }}/shouldnotexist.html"
ignore_errors: True
register: result
- stat:
path: "{{ output_dir }}/shouldnotexist.html"
register: stat_result
- debug: var=result
- name: Assert that the file was not downloaded
assert:
that:
- "result.failed == true"
- "'Certificate does not belong to ' in result.msg"
- "stat_result.stat.exists == false"