diff --git a/lib/ansible/plugins/lookup/laps_password.py b/lib/ansible/plugins/lookup/laps_password.py new file mode 100644 index 00000000000..50a6cfee947 --- /dev/null +++ b/lib/ansible/plugins/lookup/laps_password.py @@ -0,0 +1,357 @@ +# (c) 2019 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +DOCUMENTATION = """ +lookup: laps_password +author: Jordan Borean (@jborean93) +version_added: "2.8" +short_description: Retrieves the LAPS password for a server. +description: +- This lookup returns the LAPS password set for a server from the Active Directory database. +- See U(https://github.com/jborean93/ansible-lookup-laps_password) for more information around installing + pre-requisites and testing. +options: + _terms: + description: + - The host name to retrieve the LAPS password for. + - This is the C(Common Name (CN)) of the host. + required: True + type: str + allow_plaintext: + description: + - When set to C(yes), will allow traffic to be sent unencrypted. + - It is highly recommended to not touch this to avoid any credentials being exposed over the network. + - Use C(scheme=ldaps), C(auth=gssapi), or C(start_tls=yes) to ensure the traffic is encrypted. + default: no + type: bool + auth: + description: + - The type of authentication to use when connecting to the Active Directory server + - When using C(simple), the I(username) and I(password) options must be set. If not using C(scheme=ldaps) or + C(start_tls=True) then these credentials are exposed in plaintext in the network traffic. + - It is recommended ot use C(gssapi) as it will encrypt the traffic automatically. + - When using C(gssapi), run C(kinit) before running Ansible to get a valid Kerberos ticket. + - You cannot use C(gssapi) when either C(scheme=ldaps) or C(start_tls=True) is set. + choices: + - simple + - gssapi + default: gssapi + type: str + cacert_file: + description: + - The path to a CA certificate PEM file to use for certificate validation. + - Certificate validation is used when C(scheme=ldaps) or C(start_tls=yes). + - This may fail on hosts with an older OpenLDAP install like MacOS, this will have to be updated before + reinstalling python-ldap to get working again. + type: str + domain: + description: + - The domain to search in to retrieve the LAPS password. + - This could either be a Windows domain name visible to the Ansible controller from DNS or a specific domain + controller FQDN. + - Supports either just the domain/host name or an explicit LDAP URI with the domain/host already filled in. + - If the URI is set, I(port) and I(scheme) are ignored. + required: True + type: str + password: + description: + - The password for C(username). + - Required when C(username) is set. + type: str + port: + description: + - The LDAP port to communicate over. + - If I(kdc) is already an LDAP URI then this is ignored. + type: int + scheme: + description: + - The LDAP scheme to use. + - When using C(ldap), it is recommended to set C(auth=gssapi), or C(start_tls=yes), otherwise traffic will be in + plaintext. + - The Active Directory host must be configured for C(ldaps) with a certificate before it can be used. + - If I(kdc) is already an LDAP URI then this is ignored. + choices: + - ldap + - ldaps + default: ldap + search_base: + description: + - Changes the search base used when searching for the host in Active Directory. + - Will default to search in the C(defaultNamingContext) of the Active Directory server. + - If multiple matches are found then a more explicit search_base is required so only 1 host is found. + - If searching a larger Active Directory database, it is recommended to narrow the search_base for performance + reasons. + type: str + start_tls: + description: + - When C(scheme=ldap), will use the StartTLS extension to encrypt traffic sent over the wire. + - This requires the Active Directory to be set up with a certificate that supports StartTLS. + - This is ignored when C(scheme=ldaps) as the traffic is already encrypted. + type: bool + default: no + username: + description: + - Required when using C(auth=simple). + - The username to authenticate with. + - Recommended to use the username in the UPN format, e.g. C(username@DOMAIN.COM). + - This is required when C(auth=simple) and is not supported when C(auth=gssapi). + - Call C(kinit) outside of Ansible if C(auth=gssapi) is required. + type: str + validate_certs: + description: + - When using C(scheme=ldaps) or C(start_tls=yes), this controls the certificate validation behaviour. + - C(demand) will fail if no certificate or an invalid certificate is provided. + - C(try) will fail for invalid certificates but will continue if no certificate is provided. + - C(allow) will request and check a certificate but will continue even if it is invalid. + - C(never) will not request a certificate from the server so no validation occurs. + default: demand + choices: + - never + - allow + - try + - demand + type: str +requirements: +- python-ldap +notes: +- If a host was found but had no LAPS password attribute C(ms-Mcs-AdmPwd), the lookup will fail. +- Due to the sensitive nature of the data travelling across the network, it is highly recommended to run with either + C(auth=gssapi), C(scheme=ldaps), or C(start_tls=yes). +- Failing to run with one of the above settings will result in the account credentials as well as the LAPS password to + be sent in plaintext. +- Some scenarios may not work when running on a host with an older OpenLDAP install like MacOS. It is recommended to + install the latest OpenLDAP version and build python-ldap against this, see + U(https://keathmilligan.net/python-ldap-and-macos/) for more information. +""" + +EXAMPLES = """ +# This isn't mandatory but it is a way to call kinit from within Ansible before calling the lookup +- name: call kinit to retrieve Kerberos token + expect: + command: kinit username@ANSIBLE.COM + responses: + (?i)password: SecretPass1 + no_log: True + +- name: Get the LAPS password using Kerberos auth, relies on kinit already being called + set_fact: + ansible_password: "{{ lookup('laps_password', 'SERVER', domain='dc01.ansible.com') }}" + +- name: Specific the domain host using an explicit LDAP URI + set_fact: + ansible_password: "{{ lookup('laps_password', 'SERVER', domain='ldap://ansible.com:389') }}" + +- name: Use Simple auth over LDAPS + set_fact: + ansible_password: "{{ lookup('laps_password', 'server', + domain='dc01.ansible.com', + auth='simple', + scheme='ldaps', + username='username@ANSIBLE.COM', + password='SuperSecret123') }}" + +- name: Use Simple auth with LDAP and StartTLS + set_fact: + ansible_password: "{{ lookup('laps_password', 'app01', + domain='dc01.ansible.com', + auth='simple', + start_tls=True, + username='username@ANSIBLE.COM', + password='SuperSecret123') }}" + +- name: Narrow down the search base to a an OU + set_fact: + ansible_password: "{{ lookup('laps_password', 'sql10', + domain='dc01.ansible.com', + search_base='OU=Databases,DC=ansible,DC=com') }}" + +- name: Set certificate file to use when validating the TLS certificate + set_fact: + ansible_password: "{{ lookup('laps_password', 'windows-pc', + domain='dc01.ansible.com', + start_tls=True, + cacert_file='/usr/local/share/certs/ad.pem') }}" +""" + +RETURN = """ +_raw: + description: + - The LAPS password(s) for the host(s) requested. + type: str +""" + +import os +import traceback + +from ansible.errors import AnsibleLookupError +from ansible.module_utils._text import to_bytes, to_native, to_text +from ansible.module_utils.basic import missing_required_lib +from ansible.plugins.lookup import LookupBase + +LDAP_IMP_ERR = None +try: + import ldap + import ldapurl + HAS_LDAP = True +except ImportError: + LDAP_IMP_ERR = traceback.format_exc() + HAS_LDAP = False + + +def get_laps_password(conn, cn, search_base): + search_filter = u"(&(objectClass=computer)(CN=%s))" % to_text(cn) + + ldap_results = conn.search_s(to_text(search_base), ldap.SCOPE_SUBTREE, search_filter, + attrlist=[u"distinguishedName", u"ms-Mcs-AdmPwd"]) + + # Filter out non server hosts, search_s seems to return 3 extra entries + # that are not computer classes, they do not have a distinguished name + # set in the returned results + valid_results = [attr for dn, attr in ldap_results if dn] + + if len(valid_results) == 0: + raise AnsibleLookupError("Failed to find the server '%s' in the base '%s'" % (cn, search_base)) + elif len(valid_results) > 1: + found_servers = [to_native(attr['distinguishedName'][0]) for attr in valid_results] + raise AnsibleLookupError("Found too many results for the server '%s' in the base '%s'. Specify a more " + "explicit search base for the server required. Found servers '%s'" + % (cn, search_base, "', '".join(found_servers))) + + password = valid_results[0].get('ms-Mcs-AdmPwd', None) + if not password: + distinguished_name = to_native(valid_results[0]['distinguishedName'][0]) + raise AnsibleLookupError("The server '%s' did not have the LAPS attribute 'ms-Mcs-AdmPwd'" % distinguished_name) + + return to_native(password[0]) + + +class LookupModule(LookupBase): + + def run(self, terms, variables=None, **kwargs): + if not HAS_LDAP: + msg = missing_required_lib("python-ldap", url="https://pypi.org/project/python-ldap/") + msg += ". Import Error: %s" % LDAP_IMP_ERR + raise AnsibleLookupError(msg) + + # Load the variables and direct args into the lookup options + self.set_options(var_options=variables, direct=kwargs) + domain = self.get_option('domain') + port = self.get_option('port') + scheme = self.get_option('scheme') + start_tls = self.get_option('start_tls') + validate_certs = self.get_option('validate_certs') + cacert_file = self.get_option('cacert_file') + search_base = self.get_option('search_base') + username = self.get_option('username') + password = self.get_option('password') + auth = self.get_option('auth') + allow_plaintext = self.get_option('allow_plaintext') + + # Validate and set input values + # https://www.openldap.org/lists/openldap-software/200202/msg00456.html + validate_certs_map = { + 'never': ldap.OPT_X_TLS_NEVER, + 'allow': ldap.OPT_X_TLS_ALLOW, + 'try': ldap.OPT_X_TLS_TRY, + 'demand': ldap.OPT_X_TLS_DEMAND, # Same as OPT_X_TLS_HARD + } + validate_certs_value = validate_certs_map.get(validate_certs, None) + if validate_certs_value is None: + valid_keys = list(validate_certs_map.keys()) + valid_keys.sort() + raise AnsibleLookupError("Invalid validate_certs value '%s': valid values are '%s'" + % (validate_certs, "', '".join(valid_keys))) + + if auth not in ['gssapi', 'simple']: + raise AnsibleLookupError("Invalid auth value '%s': expecting either 'gssapi', or 'simple'" % auth) + elif auth == 'gssapi': + if not ldap.SASL_AVAIL: + raise AnsibleLookupError("Cannot use auth=gssapi when SASL is not configured with the local LDAP " + "install") + if username or password: + raise AnsibleLookupError("Explicit credentials are not supported when auth='gssapi'. Call kinit " + "outside of Ansible") + elif auth == 'simple' and not (username and password): + raise AnsibleLookupError("The username and password values are required when auth=simple") + + if ldapurl.isLDAPUrl(domain): + ldap_url = ldapurl.LDAPUrl(ldapUrl=domain) + else: + port = port if port else 389 if scheme == 'ldap' else 636 + ldap_url = ldapurl.LDAPUrl(hostport="%s:%d" % (domain, port), urlscheme=scheme) + + # We have encryption if using LDAPS, or StartTLS is used, or we auth with SASL/GSSAPI + encrypted = ldap_url.urlscheme == 'ldaps' or start_tls or auth == 'gssapi' + if not encrypted and not allow_plaintext: + raise AnsibleLookupError("Current configuration will result in plaintext traffic exposing credentials. " + "Set auth=gssapi, scheme=ldaps, start_tls=True, or allow_plaintext=True to " + "continue") + + if ldap_url.urlscheme == 'ldaps' or start_tls: + # We cannot use conn.set_option as OPT_X_TLS_NEWCTX (required to use the new context) is not supported on + # older distros like EL7. Setting it on the ldap object works instead + if not ldap.TLS_AVAIL: + raise AnsibleLookupError("Cannot use TLS as the local LDAP installed has not been configured to support it") + + ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, validate_certs_value) + if cacert_file: + cacert_path = os.path.expanduser(os.path.expandvars(cacert_file)) + if not os.path.exists(to_bytes(cacert_path)): + raise AnsibleLookupError("The cacert_file specified '%s' does not exist" % to_native(cacert_path)) + + try: + # While this is a path, python-ldap expects a str/unicode and not bytes + ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, to_text(cacert_path)) + except ValueError: + # https://keathmilligan.net/python-ldap-and-macos/ + raise AnsibleLookupError("Failed to set path to cacert file, this is a known issue with older " + "OpenLDAP libraries on the host. Update OpenLDAP and reinstall " + "python-ldap to continue") + + conn_url = ldap_url.initializeUrl() + conn = ldap.initialize(conn_url, bytes_mode=False) + conn.set_option(ldap.OPT_PROTOCOL_VERSION, 3) + conn.set_option(ldap.OPT_REFERRALS, 0) # Allow us to search from the base + + # Make sure we run StartTLS before doing the bind to protect the credentials + if start_tls: + try: + conn.start_tls_s() + except ldap.LDAPError as err: + raise AnsibleLookupError("Failed to send StartTLS to LDAP host '%s': %s" + % (conn_url, to_native(err))) + + if auth == 'simple': + try: + conn.bind_s(to_text(username), to_text(password)) + except ldap.LDAPError as err: + raise AnsibleLookupError("Failed to simple bind against LDAP host '%s': %s" + % (conn_url, to_native(err))) + else: + try: + conn.sasl_gssapi_bind_s() + except ldap.AUTH_UNKNOWN as err: + # The SASL GSSAPI binding is not installed, e.g. cyrus-sasl-gssapi. Give a better error message than + # what python-ldap provides + raise AnsibleLookupError("Failed to do a sasl bind against LDAP host '%s', the GSSAPI mech is not " + "installed: %s" % (conn_url, to_native(err))) + except ldap.LDAPError as err: + raise AnsibleLookupError("Failed to do a sasl bind against LDAP host '%s': %s" + % (conn_url, to_native(err))) + + try: + if not search_base: + root_dse = conn.read_rootdse_s() + search_base = root_dse['defaultNamingContext'][0] + + ret = [] + # TODO: change method to search for all servers in 1 request instead of multiple requests + for server in terms: + ret.append(get_laps_password(conn, server, search_base)) + finally: + conn.unbind_s() + + return ret diff --git a/test/units/plugins/lookup/test_laps_password.py b/test/units/plugins/lookup/test_laps_password.py new file mode 100644 index 00000000000..a778e8b465e --- /dev/null +++ b/test/units/plugins/lookup/test_laps_password.py @@ -0,0 +1,517 @@ +# -*- coding: utf-8 -*- +# (c) 2019, Jordan Borean +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +import platform +import pytest +import sys + +from units.compat.mock import MagicMock + +from ansible.errors import AnsibleLookupError +from ansible.plugins.loader import lookup_loader + + +class FakeLDAPError(Exception): + pass + + +class FakeLDAPAuthUnknownError(Exception): + pass + + +class FakeLdap(object): + SASL_AVAIL = 1 + TLS_AVAIL = 1 + + SCOPE_SUBTREE = 2 + + OPT_PROTOCOL_VERSION = 17 + OPT_REFERRALS = 8 + + OPT_X_TLS_NEVER = 0 + OPT_X_TLS_DEMAND = 2 + OPT_X_TLS_ALLOW = 3 + OPT_X_TLS_TRY = 4 + + OPT_X_TLS_CACERTFILE = 24578 + OPT_X_TLS_REQUIRE_CERT = 24582 + + LDAPError = FakeLDAPError + AUTH_UNKNOWN = FakeLDAPAuthUnknownError + + @staticmethod + def initialize(uri, bytes_mode=None, **kwargs): + return MagicMock() + + @staticmethod + def set_option(option, invalue): + pass + + +class FakeLdapUrl(object): + + def __init__(self, ldapUrl=None, urlscheme='ldap', hostport='', **kwargs): + url = ldapUrl if ldapUrl else "%s://%s" % (urlscheme, hostport) + self.urlscheme = url.split('://', 2)[0].lower() + self._url = url + + def initializeUrl(self): + return self._url + + +def fake_is_ldap_url(s): + s_lower = s.lower() + return s_lower.startswith("ldap://") or s_lower.startswith("ldaps://") or s_lower.startswith("ldapi://") + + +@pytest.fixture(autouse=True) +def laps_password(): + """Imports and the laps_password lookup with a mocks laps module for testing""" + + # Build the fake ldap and ldapurl Python modules + fake_ldap_obj = FakeLdap() + fake_ldap_url_obj = MagicMock() + fake_ldap_url_obj.isLDAPUrl.side_effect = fake_is_ldap_url + fake_ldap_url_obj.LDAPUrl.side_effect = FakeLdapUrl + + # Take a snapshot of sys.modules before we manipulate it + orig_modules = sys.modules.copy() + try: + sys.modules["ldap"] = fake_ldap_obj + sys.modules["ldapurl"] = fake_ldap_url_obj + + from ansible.plugins.lookup import laps_password + + # ensure the changes to these globals aren't persisted after each test + orig_has_ldap = laps_password.HAS_LDAP + orig_ldap_imp_err = laps_password.LDAP_IMP_ERR + + yield laps_password + + laps_password.HAS_LDAP = orig_has_ldap + laps_password.LDAP_IMP_ERR = orig_ldap_imp_err + finally: + # Restore sys.modules back to our pre-shenanigans + sys.modules = orig_modules + + +def test_missing_ldap(laps_password): + laps_password.HAS_LDAP = False + laps_password.LDAP_IMP_ERR = "no import for you!" + + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('laps_password').run(["host"], domain="test") + + assert str(err.value) == "Failed to import the required Python library (python-ldap) on %s's Python %s. See " \ + "https://pypi.org/project/python-ldap/ for more info. Please read module documentation " \ + "and install in the appropriate location. " \ + "Import Error: no import for you!" % (platform.node(), sys.executable) + + +def test_invalid_cert_mapping(): + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('laps_password').run(["host"], domain="test", validate_certs="incorrect") + + assert str(err.value) == "Invalid validate_certs value 'incorrect': valid values are 'allow', 'demand', " \ + "'never', 'try'" + + +def test_invalid_auth(): + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('laps_password').run(["host"], domain="test", auth="fail") + + assert str(err.value) == "Invalid auth value 'fail': expecting either 'gssapi', or 'simple'" + + +def test_gssapi_without_sasl(monkeypatch, ): + monkeypatch.setattr("ldap.SASL_AVAIL", 0) + + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('laps_password').run(["host"], domain="test") + + assert str(err.value) == "Cannot use auth=gssapi when SASL is not configured with the local LDAP install" + + +def test_simple_auth_without_credentials(): + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('laps_password').run(["host"], domain="test", auth="simple") + + assert str(err.value) == "The username and password values are required when auth=simple" + + +def test_gssapi_auth_with_credentials(): + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('laps_password').run(["host"], domain="test", auth="gssapi", username="u", password="p") + + assert str(err.value) == "Explicit credentials are not supported when auth='gssapi'. Call kinit outside of Ansible" + + +def test_not_encrypted_without_override(): + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('laps_password').run(["host"], domain="dc01", auth="simple", username="test", password="test") + + assert str(err.value) == "Current configuration will result in plaintext traffic exposing credentials. Set " \ + "auth=gssapi, scheme=ldaps, start_tls=True, or allow_plaintext=True to continue" + + +def test_ldaps_without_tls(monkeypatch, ): + monkeypatch.setattr("ldap.TLS_AVAIL", 0) + + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('laps_password').run(["host"], domain="dc01", scheme="ldaps") + + assert str(err.value) == "Cannot use TLS as the local LDAP installed has not been configured to support it" + + +def test_start_tls_without_tls(monkeypatch, ): + monkeypatch.setattr("ldap.TLS_AVAIL", 0) + + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('laps_password').run(["host"], domain="dc01", start_tls=True) + + assert str(err.value) == "Cannot use TLS as the local LDAP installed has not been configured to support it" + + +def test_normal_run(monkeypatch, laps_password): + def get_laps_password(conn, cn, search_base): + return "CN=%s,%s" % (cn, search_base) + + mock_ldap = MagicMock() + mock_ldap.return_value.read_rootdse_s.return_value = {"defaultNamingContext": ["DC=domain,DC=com"]} + monkeypatch.setattr("ldap.initialize", mock_ldap) + + mock_get_laps_password = MagicMock(side_effect=get_laps_password) + monkeypatch.setattr(laps_password, "get_laps_password", mock_get_laps_password) + + actual = lookup_loader.get('laps_password').run(["host1", "host2"], domain="dc01") + assert actual == ["CN=host1,DC=domain,DC=com", "CN=host2,DC=domain,DC=com"] + + # Verify the call count to get_laps_password + assert mock_get_laps_password.call_count == 2 + + # Verify the initialize() method call + assert mock_ldap.call_count == 1 + assert mock_ldap.call_args[0] == ("ldap://dc01:389",) + assert mock_ldap.call_args[1] == {"bytes_mode": False} + + # Verify the number of calls made to the mocked LDAP object + assert mock_ldap.mock_calls[1][0] == "().set_option" + assert mock_ldap.mock_calls[1][1] == (FakeLdap.OPT_PROTOCOL_VERSION, 3) + + assert mock_ldap.mock_calls[2][0] == "().set_option" + assert mock_ldap.mock_calls[2][1] == (FakeLdap.OPT_REFERRALS, 0) + + assert mock_ldap.mock_calls[3][0] == '().sasl_gssapi_bind_s' + assert mock_ldap.mock_calls[3][1] == () + + assert mock_ldap.mock_calls[4][0] == "().read_rootdse_s" + assert mock_ldap.mock_calls[4][1] == () + + assert mock_ldap.mock_calls[5][0] == "().unbind_s" + assert mock_ldap.mock_calls[5][1] == () + + +def test_run_with_simple_auth_and_search_base(monkeypatch, laps_password): + def get_laps_password(conn, cn, search_base): + return "CN=%s,%s" % (cn, search_base) + + mock_ldap = MagicMock() + monkeypatch.setattr("ldap.initialize", mock_ldap) + + mock_get_laps_password = MagicMock(side_effect=get_laps_password) + monkeypatch.setattr(laps_password, "get_laps_password", mock_get_laps_password) + + actual = lookup_loader.get('laps_password').run(["host1", "host2"], domain="dc01", auth="simple", username="user", + password="pass", allow_plaintext=True, + search_base="OU=Workstations,DC=domain,DC=com") + assert actual == ["CN=host1,OU=Workstations,DC=domain,DC=com", "CN=host2,OU=Workstations,DC=domain,DC=com"] + + # Verify the call count to get_laps_password + assert mock_get_laps_password.call_count == 2 + + # Verify the initialize() method call + assert mock_ldap.call_count == 1 + assert mock_ldap.call_args[0] == ("ldap://dc01:389",) + assert mock_ldap.call_args[1] == {"bytes_mode": False} + + # Verify the number of calls made to the mocked LDAP object + assert mock_ldap.mock_calls[1][0] == "().set_option" + assert mock_ldap.mock_calls[1][1] == (FakeLdap.OPT_PROTOCOL_VERSION, 3) + + assert mock_ldap.mock_calls[2][0] == "().set_option" + assert mock_ldap.mock_calls[2][1] == (FakeLdap.OPT_REFERRALS, 0) + + assert mock_ldap.mock_calls[3][0] == '().bind_s' + assert mock_ldap.mock_calls[3][1] == (u"user", u"pass") + + assert mock_ldap.mock_calls[4][0] == "().unbind_s" + assert mock_ldap.mock_calls[4][1] == () + + +@pytest.mark.parametrize("kwargs, expected", [ + [{"domain": "dc01"}, "ldap://dc01:389"], + [{"domain": "dc02", "port": 1234}, "ldap://dc02:1234"], + [{"domain": "dc03", "scheme": "ldaps"}, "ldaps://dc03:636"], + # Verifies that an explicit URI ignores port and scheme + [{"domain": "ldap://dc04", "port": 1234, "scheme": "ldaps"}, "ldap://dc04"], +]) +def test_uri_options(monkeypatch, kwargs, expected): + mock_ldap = MagicMock() + monkeypatch.setattr("ldap.initialize", mock_ldap) + + lookup_loader.get('laps_password').run([], **kwargs) + + assert mock_ldap.call_count == 1 + assert mock_ldap.call_args[0] == (expected,) + assert mock_ldap.call_args[1] == {"bytes_mode": False} + + +@pytest.mark.parametrize("validate, expected", [ + ["never", FakeLdap.OPT_X_TLS_NEVER], + ["allow", FakeLdap.OPT_X_TLS_ALLOW], + ["try", FakeLdap.OPT_X_TLS_TRY], + ["demand", FakeLdap.OPT_X_TLS_DEMAND], +]) +def test_certificate_validation(monkeypatch, validate, expected): + mock_ldap_option = MagicMock() + monkeypatch.setattr(FakeLdap, "set_option", mock_ldap_option) + + mock_ldap = MagicMock() + monkeypatch.setattr("ldap.initialize", mock_ldap) + + lookup_loader.get('laps_password').run([], domain="dc01", start_tls=True, validate_certs=validate) + + assert mock_ldap_option.mock_calls[0][1] == (FakeLdap.OPT_X_TLS_REQUIRE_CERT, expected) + + assert mock_ldap.mock_calls[3][0] == "().start_tls_s" + assert mock_ldap.mock_calls[3][1] == () + + assert mock_ldap.mock_calls[4][0] == "().sasl_gssapi_bind_s" + assert mock_ldap.mock_calls[4][1] == () + + +def test_certificate_validate_with_custom_cacert(monkeypatch): + mock_ldap_option = MagicMock() + monkeypatch.setattr(FakeLdap, "set_option", mock_ldap_option) + + mock_ldap = MagicMock() + monkeypatch.setattr("ldap.initialize", mock_ldap) + monkeypatch.setattr(os.path, 'exists', lambda x: True) + + lookup_loader.get('laps_password').run([], domain="dc01", scheme="ldaps", cacert_file="cacert.pem") + + assert mock_ldap_option.mock_calls[0][1] == (FakeLdap.OPT_X_TLS_REQUIRE_CERT, FakeLdap.OPT_X_TLS_DEMAND) + assert mock_ldap_option.mock_calls[1][1] == (FakeLdap.OPT_X_TLS_CACERTFILE, u"cacert.pem") + + assert mock_ldap.mock_calls[3][0] == "().sasl_gssapi_bind_s" + assert mock_ldap.mock_calls[3][1] == () + + +def test_certificate_validate_with_custom_cacert_fail(monkeypatch): + def set_option(self, key, value): + if key == FakeLdap.OPT_X_TLS_CACERTFILE: + raise ValueError("set_option() failed") + + monkeypatch.setattr(FakeLdap, "set_option", set_option) + monkeypatch.setattr(os.path, 'exists', lambda x: True) + + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('laps_password').run([], domain="dc01", scheme="ldaps", cacert_file="cacert.pem") + + assert str(err.value) == "Failed to set path to cacert file, this is a known issue with older OpenLDAP " \ + "libraries on the host. Update OpenLDAP and reinstall python-ldap to continue" + + +@pytest.mark.parametrize("path", [ + "cacert.pem", + "~/.certs/cacert.pem", + "~/.certs/$USER/cacert.pem", +]) +def test_certificate_invalid_path(monkeypatch, path): + monkeypatch.setattr(os.path, 'exists', lambda x: False) + expected_path = os.path.expanduser(os.path.expandvars(path)) + + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('laps_password').run([], domain="dc01", scheme="ldaps", cacert_file=path) + + assert str(err.value) == "The cacert_file specified '%s' does not exist" % expected_path + + +def test_simple_auth_with_ldaps(monkeypatch): + mock_ldap_option = MagicMock() + monkeypatch.setattr(FakeLdap, "set_option", mock_ldap_option) + + mock_ldap = MagicMock() + monkeypatch.setattr("ldap.initialize", mock_ldap) + + lookup_loader.get('laps_password').run([], domain="dc01", scheme="ldaps", auth="simple", username="user", + password="pass") + + assert mock_ldap_option.mock_calls[0][1] == (FakeLdap.OPT_X_TLS_REQUIRE_CERT, FakeLdap.OPT_X_TLS_DEMAND) + + assert mock_ldap.mock_calls[3][0] == '().bind_s' + assert mock_ldap.mock_calls[3][1] == (u"user", u"pass") + + assert mock_ldap.mock_calls[4][0] == "().read_rootdse_s" + assert mock_ldap.mock_calls[4][1] == () + + +def test_simple_auth_with_start_tls(monkeypatch): + mock_ldap_option = MagicMock() + monkeypatch.setattr(FakeLdap, "set_option", mock_ldap_option) + + mock_ldap = MagicMock() + monkeypatch.setattr("ldap.initialize", mock_ldap) + + lookup_loader.get('laps_password').run([], domain="dc01", start_tls=True, auth="simple", username="user", + password="pass") + + assert mock_ldap_option.mock_calls[0][1] == (FakeLdap.OPT_X_TLS_REQUIRE_CERT, FakeLdap.OPT_X_TLS_DEMAND) + + assert mock_ldap.mock_calls[3][0] == "().start_tls_s" + assert mock_ldap.mock_calls[3][1] == () + + assert mock_ldap.mock_calls[4][0] == '().bind_s' + assert mock_ldap.mock_calls[4][1] == (u"user", u"pass") + + assert mock_ldap.mock_calls[5][0] == "().read_rootdse_s" + assert mock_ldap.mock_calls[5][1] == () + + +def test_start_tls_ldap_error(monkeypatch): + mock_ldap = MagicMock() + mock_ldap.return_value.start_tls_s.side_effect = FakeLDAPError("fake error") + monkeypatch.setattr("ldap.initialize", mock_ldap) + + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('laps_password').run([], domain="dc01", start_tls=True) + + assert str(err.value) == "Failed to send StartTLS to LDAP host 'ldap://dc01:389': fake error" + + +def test_simple_bind_ldap_error(monkeypatch): + mock_ldap = MagicMock() + mock_ldap.return_value.bind_s.side_effect = FakeLDAPError("fake error") + monkeypatch.setattr("ldap.initialize", mock_ldap) + + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('laps_password').run([], domain="dc01", auth="simple", username="user", password="pass", + allow_plaintext=True) + + assert str(err.value) == "Failed to simple bind against LDAP host 'ldap://dc01:389': fake error" + + +def test_sasl_bind_ldap_error(monkeypatch): + mock_ldap = MagicMock() + mock_ldap.return_value.sasl_gssapi_bind_s.side_effect = FakeLDAPError("fake error") + monkeypatch.setattr("ldap.initialize", mock_ldap) + + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('laps_password').run([], domain="dc01") + + assert str(err.value) == "Failed to do a sasl bind against LDAP host 'ldap://dc01:389': fake error" + + +def test_sasl_bind_ldap_no_mechs_error(monkeypatch): + mock_ldap = MagicMock() + mock_ldap.return_value.sasl_gssapi_bind_s.side_effect = FakeLDAPAuthUnknownError("no mechs") + monkeypatch.setattr("ldap.initialize", mock_ldap) + + with pytest.raises(AnsibleLookupError) as err: + lookup_loader.get('laps_password').run([], domain="dc01") + + assert str(err.value) == "Failed to do a sasl bind against LDAP host 'ldap://dc01:389', the GSSAPI mech is " \ + "not installed: no mechs" + + +def test_get_password_valid(laps_password): + mock_conn = MagicMock() + mock_conn.search_s.return_value = [ + ("CN=server,DC=domain,DC=local", + {"ms-Mcs-AdmPwd": ["pass"], "distinguishedName": ["CN=server,DC=domain,DC=local"]}), + # Replicates the 3 extra entries AD returns that aren't server objects + (None, ["ldap://ForestDnsZones.domain.com/DC=ForestDnsZones,DC=domain,DC=com"]), + (None, ["ldap://DomainDnsZones.domain.com/DC=DomainDnsZones,DC=domain,DC=com"]), + (None, ["ldap://domain.com/CN=Configuration,DC=domain,DC=com"]), + ] + + actual = laps_password.get_laps_password(mock_conn, "server", "DC=domain,DC=local") + assert actual == "pass" + + assert len(mock_conn.method_calls) == 1 + assert mock_conn.method_calls[0][0] == "search_s" + assert mock_conn.method_calls[0][1] == ("DC=domain,DC=local", FakeLdap.SCOPE_SUBTREE, + "(&(objectClass=computer)(CN=server))") + assert mock_conn.method_calls[0][2] == {"attrlist": ["distinguishedName", "ms-Mcs-AdmPwd"]} + + +def test_get_password_laps_not_configured(laps_password): + mock_conn = MagicMock() + mock_conn.search_s.return_value = [ + ("CN=server,DC=domain,DC=local", {"distinguishedName": ["CN=server,DC=domain,DC=local"]}), + (None, ["ldap://ForestDnsZones.domain.com/DC=ForestDnsZones,DC=domain,DC=com"]), + (None, ["ldap://DomainDnsZones.domain.com/DC=DomainDnsZones,DC=domain,DC=com"]), + (None, ["ldap://domain.com/CN=Configuration,DC=domain,DC=com"]), + ] + + with pytest.raises(AnsibleLookupError) as err: + laps_password.get_laps_password(mock_conn, "server2", "DC=test,DC=local") + assert str(err.value) == \ + "The server 'CN=server,DC=domain,DC=local' did not have the LAPS attribute 'ms-Mcs-AdmPwd'" + + assert len(mock_conn.method_calls) == 1 + assert mock_conn.method_calls[0][0] == "search_s" + assert mock_conn.method_calls[0][1] == ("DC=test,DC=local", FakeLdap.SCOPE_SUBTREE, + "(&(objectClass=computer)(CN=server2))") + assert mock_conn.method_calls[0][2] == {"attrlist": ["distinguishedName", "ms-Mcs-AdmPwd"]} + + +def test_get_password_no_results(laps_password): + mock_conn = MagicMock() + mock_conn.search_s.return_value = [ + (None, ["ldap://ForestDnsZones.domain.com/DC=ForestDnsZones,DC=domain,DC=com"]), + (None, ["ldap://DomainDnsZones.domain.com/DC=DomainDnsZones,DC=domain,DC=com"]), + (None, ["ldap://domain.com/CN=Configuration,DC=domain,DC=com"]), + ] + + with pytest.raises(AnsibleLookupError) as err: + laps_password.get_laps_password(mock_conn, "server", "DC=domain,DC=local") + assert str(err.value) == "Failed to find the server 'server' in the base 'DC=domain,DC=local'" + + assert len(mock_conn.method_calls) == 1 + assert mock_conn.method_calls[0][0] == "search_s" + assert mock_conn.method_calls[0][1] == ("DC=domain,DC=local", FakeLdap.SCOPE_SUBTREE, + "(&(objectClass=computer)(CN=server))") + assert mock_conn.method_calls[0][2] == {"attrlist": ["distinguishedName", "ms-Mcs-AdmPwd"]} + + +def test_get_password_multiple_results(laps_password): + mock_conn = MagicMock() + mock_conn.search_s.return_value = [ + ("CN=server,OU=Workstations,DC=domain,DC=local", + {"ms-Mcs-AdmPwd": ["pass"], "distinguishedName": ["CN=server,OU=Workstations,DC=domain,DC=local"]}), + ("CN=server,OU=Servers,DC=domain,DC=local", + {"ms-Mcs-AdmPwd": ["pass"], "distinguishedName": ["CN=server,OU=Servers,DC=domain,DC=local"]}), + (None, ["ldap://ForestDnsZones.domain.com/DC=ForestDnsZones,DC=domain,DC=com"]), + (None, ["ldap://DomainDnsZones.domain.com/DC=DomainDnsZones,DC=domain,DC=com"]), + (None, ["ldap://domain.com/CN=Configuration,DC=domain,DC=com"]), + ] + + with pytest.raises(AnsibleLookupError) as err: + laps_password.get_laps_password(mock_conn, "server", "DC=domain,DC=local") + assert str(err.value) == \ + "Found too many results for the server 'server' in the base 'DC=domain,DC=local'. Specify a more explicit " \ + "search base for the server required. Found servers 'CN=server,OU=Workstations,DC=domain,DC=local', " \ + "'CN=server,OU=Servers,DC=domain,DC=local'" + + assert len(mock_conn.method_calls) == 1 + assert mock_conn.method_calls[0][0] == "search_s" + assert mock_conn.method_calls[0][1] == ("DC=domain,DC=local", FakeLdap.SCOPE_SUBTREE, + "(&(objectClass=computer)(CN=server))") + assert mock_conn.method_calls[0][2] == {"attrlist": ["distinguishedName", "ms-Mcs-AdmPwd"]}