ansible/test/units/plugins/lookup/test_onepassword.py
Sam Doran 8cd8d17980
Add ability to unlock 1Password vault to lookup plugins (#44923)
* Add ability to use login to 1Password vault to 1Password lookups

* Adjust unit tests

* Add changelog
2018-08-30 16:24:06 -04:00

320 lines
11 KiB
Python

# (c) 2018, Scott Buchanan <sbuchanan@ri.pn>
# (c) 2016, Andrew Zenk <azenk@umn.edu> (test_lastpass.py used as starting point)
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import json
import datetime
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
from argparse import ArgumentParser
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch
from ansible.errors import AnsibleError
from ansible.plugins.lookup.onepassword import OnePass, LookupModule
from ansible.plugins.lookup.onepassword_raw import LookupModule as OnePasswordRawLookup
# Intentionally omits metadata leaf nodes that real output would contain when they are not relevant to these tests.
MOCK_ENTRIES = [
    # Entry whose fields live inside a section and use the short 't'/'v' keys.
    # Names and values deliberately contain quotes, a trailing backslash and
    # newlines.
    {
        'vault_name': 'Acme "Quot\'d" Servers',
        'queries': [
            '0123456789',
            'Mock "Quot\'d" Server',
        ],
        'output': {
            'uuid': '0123456789',
            'vaultUuid': '2468',
            'overview': {
                'title': 'Mock "Quot\'d" Server',
            },
            'details': {
                'sections': [
                    {
                        'title': '',
                        'fields': [
                            {'t': 'username', 'v': 'jamesbond'},
                            {'t': 'password', 'v': 't0pS3cret'},
                            {'t': 'notes', 'v': 'Test note with\nmultiple lines and trailing space.\n\n'},
                            {'t': 'tricksy "quot\'d" field\\', 'v': '"quot\'d" value'},
                        ],
                    },
                ],
            },
        },
    },
    # Entry that can additionally be queried by the domain of its URL.
    {
        'vault_name': 'Acme Logins',
        'queries': [
            '9876543210',
            'Mock Website',
            'acme.com',
        ],
        'output': {
            'uuid': '9876543210',
            'vaultUuid': '1357',
            'overview': {
                'title': 'Mock Website',
                'URLs': [
                    {'l': 'website', 'u': 'https://acme.com/login'},
                ],
            },
            'details': {
                'sections': [
                    {
                        'title': '',
                        'fields': [
                            {'t': 'password', 'v': 't0pS3cret'},
                        ],
                    },
                ],
            },
        },
    },
    # Entry whose fields sit directly under 'details' and use the long
    # 'name'/'value' keys instead of 't'/'v'.
    {
        'vault_name': 'Acme Logins',
        'queries': [
            '864201357',
        ],
        'output': {
            'uuid': '864201357',
            'vaultUuid': '1357',
            'overview': {
                'title': 'Mock Something',
            },
            'details': {
                'fields': [
                    {'value': 'jbond@mi6.gov.uk', 'name': 'emailAddress'},
                    {'name': 'password', 'value': 'vauxhall'},
                ],
            },
        },
    },
]
def get_mock_query_generator(require_field=None):
    """Yield one fixture tuple per (mock entry, query, field) combination.

    Each yielded tuple is ``(entry, query, section_title, field_name,
    field_value)``.  For every query of every entry in ``MOCK_ENTRIES`` the
    fields listed directly under ``details`` come first (with a
    ``section_title`` of ``None``), followed by the fields of each section.

    :param require_field: when not ``None``, only fields whose name equals
        this value are yielded.
    """
    def _iter_titled_fields(details):
        # Pair every field with the title of the section it belongs to;
        # top-level fields have no section, hence None.
        for plain_field in details.get('fields', []):
            yield None, plain_field
        for section in details.get('sections', []):
            for sectioned_field in section['fields']:
                yield section['title'], sectioned_field

    for entry in MOCK_ENTRIES:
        for query in entry['queries']:
            for section_title, field in _iter_titled_fields(entry['output']['details']):
                # Fields are keyed either 'name'/'value' or 't'/'v'.
                field_name = field.get('name', field.get('t'))
                field_value = field.get('value', field.get('v'))

                if require_field is not None and field_name != require_field:
                    continue
                yield entry, query, section_title, field_name, field_value
def get_one_mock_query(require_field=None):
    """Return the first fixture tuple from ``get_mock_query_generator``.

    :param require_field: forwarded unchanged to ``get_mock_query_generator``.
    :raises StopIteration: if the generator produces no fixture at all.
    """
    return next(get_mock_query_generator(require_field))
class MockOnePass(OnePass):
    """``OnePass`` subclass whose ``_run`` is answered from ``MOCK_ENTRIES``.

    ``_run`` is overridden (presumably the hook ``OnePass`` uses to invoke the
    command line tool -- confirm against the plugin) so that no external
    process is needed: the argument list is parsed with ``argparse`` and a
    canned ``(rc, output, error)`` tuple is returned.
    """

    # Subclasses flip these flags to simulate a signed-out or an expired session.
    _mock_logged_out = False
    _mock_timed_out = False

    def _lookup_mock_entry(self, key, vault=None):
        """Return the ``output`` dict of the first mock entry matching *key*.

        :param key: matched exactly against the entry's uuid, its title and
            the network location of each of its URLs.
        :param vault: optional vault restriction, compared case-insensitively
            with both the entry's ``vault_name`` and its ``vaultUuid``.
        :returns: the entry's ``output`` dict, or ``None`` when nothing matches.
        """
        for entry in MOCK_ENTRIES:
            if vault is not None and vault.lower() != entry['vault_name'].lower() and vault.lower() != entry['output']['vaultUuid'].lower():
                continue

            match_fields = [
                entry['output']['uuid'],
                entry['output']['overview']['title']
            ]

            # Note that exactly how 1Password matches on domains in non-trivial cases is neither documented
            # nor obvious, so this may not precisely match the real behavior.
            urls = entry['output']['overview'].get('URLs')
            if urls is not None:
                match_fields += [urlparse(url['u']).netloc for url in urls]

            if key in match_fields:
                return entry['output']

    def _run(self, args, expected_rc=0, command_input=None, ignore_errors=False):
        """Emulate the ``get account`` and ``get item <item_id>`` commands.

        :param args: argument list; ``get account`` and ``get item <item_id>``
            are understood, each with an optional ``--vault`` option.
        :param expected_rc: return code the caller expects.
        :param command_input: accepted for signature compatibility; unused here.
        :param ignore_errors: accepted for signature compatibility; unused
            here, so an unexpected return code always raises.
        :returns: tuple ``(rc, output, error)``.
        :raises AnsibleError: when the simulated return code differs from
            *expected_rc*, or when *args* is not one of the supported commands.
        """
        parser = ArgumentParser()

        command_parser = parser.add_subparsers(dest='command')

        get_parser = command_parser.add_parser('get')
        # Options shared by every "get <object_type>" sub-command.
        get_options = ArgumentParser(add_help=False)
        get_options.add_argument('--vault')
        get_type_parser = get_parser.add_subparsers(dest='object_type')
        get_type_parser.add_parser('account', parents=[get_options])
        get_item_parser = get_type_parser.add_parser('item', parents=[get_options])
        get_item_parser.add_argument('item_id')

        args = parser.parse_args(args)

        def mock_exit(output='', error='', rc=0):
            if rc != expected_rc:
                raise AnsibleError(error)
            if error != '':
                # Prefix the message with a date *and* time stamp.  A
                # datetime is required: a plain date has no time component,
                # and the directives must be spelled %M / %S to be expanded.
                now = datetime.datetime.now()
                error = '[LOG] {0} (ERROR) {1}'.format(now.strftime('%Y/%m/%d %H:%M:%S'), error)
            return rc, output, error

        if args.command == 'get':
            # The logged-out flag is checked before the timed-out flag.
            if self._mock_logged_out:
                return mock_exit(error='You are not currently signed in. Please run `op signin --help` for instructions', rc=1)

            if self._mock_timed_out:
                return mock_exit(error='401: Authentication required.', rc=1)

            if args.object_type == 'item':
                mock_entry = self._lookup_mock_entry(args.item_id, args.vault)

                if mock_entry is None:
                    # A missing item is reported through the error text only; rc stays 0.
                    return mock_exit(error='Item {0} not found'.format(args.item_id))

                return mock_exit(output=json.dumps(mock_entry))

            if args.object_type == 'account':
                # Since we don't actually ever use this output, don't bother mocking output.
                return mock_exit()

        raise AnsibleError('Unsupported command string passed to OnePass mock: {0}'.format(args))
class LoggedOutMockOnePass(MockOnePass):
    """``MockOnePass`` variant with the ``_mock_logged_out`` flag switched on."""

    _mock_logged_out = True
class TimedOutMockOnePass(MockOnePass):
    """``MockOnePass`` variant with the ``_mock_timed_out`` flag switched on."""

    _mock_timed_out = True
class TestOnePass(unittest.TestCase):
    """Exercise the ``OnePass`` API through the ``MockOnePass`` test doubles."""

    def test_onepassword_cli_path(self):
        # The path given to the constructor must be exposed as ``cli_path``.
        op = MockOnePass(path='/dev/null')
        self.assertEqual('/dev/null', op.cli_path)

    def test_onepassword_logged_in(self):
        op = MockOnePass()
        try:
            op.assert_logged_in()
        except Exception as exc:
            # Catch Exception rather than everything so KeyboardInterrupt and
            # SystemExit still propagate, and report what actually went wrong.
            self.fail('assert_logged_in() raised unexpectedly: {0!r}'.format(exc))

    def test_onepassword_logged_out(self):
        op = LoggedOutMockOnePass()
        with self.assertRaises(AnsibleError):
            op.assert_logged_in()

    def test_onepassword_timed_out(self):
        op = TimedOutMockOnePass()
        with self.assertRaises(AnsibleError):
            op.assert_logged_in()

    def test_onepassword_get(self):
        # Every field of every mock entry must be retrievable through each of
        # the entry's queries.
        op = MockOnePass()
        op._logged_in = True
        query_generator = get_mock_query_generator()
        for dummy, query, dummy, field_name, field_value in query_generator:
            self.assertEqual(field_value, op.get_field(query, field_name))

    def test_onepassword_get_raw(self):
        # The raw lookup must hand back the JSON dump of the mock output.
        op = MockOnePass()
        op._logged_in = True
        for entry in MOCK_ENTRIES:
            for query in entry['queries']:
                self.assertEqual(json.dumps(entry['output']), op.get_raw(query))

    def test_onepassword_get_not_found(self):
        # An unknown item yields an empty string.
        op = MockOnePass()
        op._logged_in = True
        self.assertEqual('', op.get_field('a fake query', 'a fake field'))

    def test_onepassword_get_with_section(self):
        op = MockOnePass()
        op._logged_in = True
        dummy, query, section_title, field_name, field_value = get_one_mock_query()
        self.assertEqual(field_value, op.get_field(query, field_name, section=section_title))

    def test_onepassword_get_with_vault(self):
        # A vault may be addressed either by its name or by its UUID.
        op = MockOnePass()
        op._logged_in = True
        entry, query, dummy, field_name, field_value = get_one_mock_query()
        for vault_query in [entry['vault_name'], entry['output']['vaultUuid']]:
            self.assertEqual(field_value, op.get_field(query, field_name, vault=vault_query))

    def test_onepassword_get_with_wrong_vault(self):
        op = MockOnePass()
        op._logged_in = True
        dummy, query, dummy, field_name, dummy = get_one_mock_query()
        self.assertEqual('', op.get_field(query, field_name, vault='a fake vault'))

    def test_onepassword_get_diff_case(self):
        # Field name, vault name and section title are passed upper-cased and
        # must still resolve to the same value.
        op = MockOnePass()
        op._logged_in = True
        entry, query, section_title, field_name, field_value = get_one_mock_query()
        self.assertEqual(
            field_value,
            op.get_field(
                query,
                field_name.upper(),
                vault=entry['vault_name'].upper(),
                section=section_title.upper()
            )
        )
@patch('ansible.plugins.lookup.onepassword.OnePass', MockOnePass)
class TestLookupModule(unittest.TestCase):
    """Run the ``onepassword`` lookup plugin with ``OnePass`` patched to ``MockOnePass``."""

    def test_onepassword_plugin_multiple(self):
        # All queries of the first mock entry resolve to the same item, so the
        # plugin must return that item's first field value once per query.
        mock_entry = MOCK_ENTRIES[0]
        queries = mock_entry['queries']
        first_field = mock_entry['output']['details']['sections'][0]['fields'][0]

        expected = [first_field['v'] for dummy in queries]
        actual = LookupModule().run(queries, field=first_field['t'])

        self.assertEqual(expected, actual)

    def test_onepassword_plugin_default_field(self):
        # No ``field`` argument is passed; the expected value is that of the
        # first mock field named 'password'.
        fixture = get_one_mock_query('password')
        query = fixture[1]
        field_value = fixture[4]

        self.assertEqual([field_value], LookupModule().run([query]))
@patch('ansible.plugins.lookup.onepassword_raw.OnePass', MockOnePass)
class TestOnePasswordRawLookup(unittest.TestCase):
    """Run the ``onepassword_raw`` lookup plugin with ``OnePass`` patched to ``MockOnePass``."""

    def test_onepassword_raw_plugin_multiple(self):
        # Every query of the first mock entry must produce that entry's
        # complete output structure.
        mock_entry = MOCK_ENTRIES[0]
        queries = mock_entry['queries']

        expected = [mock_entry['output'] for dummy in queries]
        actual = OnePasswordRawLookup().run(queries)

        self.assertEqual(expected, actual)