luks_device.py: Allow manipulate LUKS containers with label or UUID (#61603)

* luks_device.py: Allow manipulate LUKS containers with label or UUID

- Allow create a LUKS2 container format with label support
- Allow manipulate (open, close, modify) an LUKS container based on
  both label (LUKS2 format) or UUID instead of using devices only.

Fixes: #58973
Signed-off-by: Alexandre Mulatinho <alex@mulatinho.net>

* test_luks_device.py: organizing tests to support labels

- Add label on some tests and fix errors reported by Shippable

Signed-off-by: Alexandre Mulatinho <alex@mulatinho.net>

* luks_device.py: adjusting versions and messages

- Modifying version_added from 2.9 to 2.10
- Fixing some messages
- Created a changelog fragment
- Moving blkid from scope

Fixes #58973
Signed-off-by: Alexandre Mulatinho <alex@mulatinho.net>
This commit is contained in:
Alexandre Mulatinho 2019-09-03 00:40:09 -03:00 committed by Felix Fontein
parent 6f7cd8370a
commit e4d72dd981
3 changed files with 196 additions and 76 deletions

View file

@ -0,0 +1,2 @@
minor_changes:
- luks_device - added support to use UUIDs, and labels with LUKS2 containers

View file

@ -100,11 +100,28 @@ options:
the container can no longer be opened!"
type: bool
default: no
label:
description:
- "This option allow the user to create a LUKS2 format container
with label support, respectively to identify the container by
label on later usages."
- "Will only be used on container creation, or when I(device) is
not specified."
type: str
version_added: "2.10"
uuid:
description:
- "With this option user can identify the LUKS container by UUID."
- "Will only be used when I(device) and I(label) are not specified."
type: str
version_added: "2.10"
requirements:
- "cryptsetup"
- "wipefs"
- "wipefs (when I(state) is C(absent))"
- "lsblk"
- "blkid (when I(label) or I(uuid) options are used)"
author:
"Jan Pokorny (@japokorn)"
@ -158,6 +175,26 @@ EXAMPLES = '''
luks_device:
device: "/dev/loop0"
state: "absent"
- name: create a container with label
luks_device:
device: "/dev/loop0"
state: "present"
keyfile: "/vault/keyfile"
label: personalLabelName
- name: open the LUKS container based on label without device; name it "mycrypt"
luks_device:
label: "personalLabelName"
state: "opened"
name: "mycrypt"
keyfile: "/vault/keyfile"
- name: close container based on UUID
luks_device:
uuid: 03ecd578-fad4-4e6c-9348-842e3e8fa340
state: "closed"
name: "mycrypt"
'''
RETURN = '''
@ -197,6 +234,30 @@ class Handler(object):
def _run_command(self, command):
return self._module.run_command(command)
def get_device_by_uuid(self, uuid):
''' Returns the device that holds UUID passed by user
'''
self._blkid_bin = self._module.get_bin_path('blkid', True)
uuid = self._module.params['uuid']
if uuid is None:
return None
result = self._run_command([self._blkid_bin, '--uuid', uuid])
if result[RETURN_CODE] != 0:
return None
return result[STDOUT].strip()
def get_device_by_label(self, label):
''' Returns the device that holds label passed by user
'''
self._blkid_bin = self._module.get_bin_path('blkid', True)
label = self._module.params['label']
if label is None:
return None
result = self._run_command([self._blkid_bin, '--label', label])
if result[RETURN_CODE] != 0:
return None
return result[STDOUT].strip()
def generate_luks_name(self, device):
''' Generate name for luks based on device UUID ('luks-<UUID>').
Raises ValueError when obtaining of UUID fails.
@ -256,9 +317,13 @@ class CryptHandler(Handler):
def run_luks_create(self, device, keyfile, keysize):
# create a new luks container; use batch mode to auto confirm
label = self._module.params.get('label')
options = []
if keysize is not None:
options.append('--key-size=' + str(keysize))
if label is not None:
# create luks container v2 with label
options.extend(['--type', 'luks2', '--label', label])
args = [self._cryptsetup_bin, 'luksFormat']
args.extend(options)
args.extend(['-q', device, keyfile])
@ -346,14 +411,30 @@ class ConditionsHandler(Handler):
def __init__(self, module, crypthandler):
super(ConditionsHandler, self).__init__(module)
self._crypthandler = crypthandler
self.device = self.get_device_name()
def get_device_name(self):
device = self._module.params.get('device')
label = self._module.params.get('label')
uuid = self._module.params.get('uuid')
name = self._module.params.get('name')
if device is None and label is not None:
device = self.get_device_by_label(label)
elif device is None and uuid is not None:
device = self.get_device_by_uuid(uuid)
elif device is None and name is not None:
device = self._crypthandler.get_container_device_by_name(name)
return device
def luks_create(self):
return (self._module.params['device'] is not None and
return (self.device is not None and
self._module.params['keyfile'] is not None and
self._module.params['state'] in ('present',
'opened',
'closed') and
not self._crypthandler.is_luks(self._module.params['device']))
not self._crypthandler.is_luks(self.device))
def opened_luks_name(self):
''' If luks is already opened, return its name.
@ -365,8 +446,7 @@ class ConditionsHandler(Handler):
return None
# try to obtain luks name - it may be already opened
name = self._crypthandler.get_container_name_by_device(
self._module.params['device'])
name = self._crypthandler.get_container_name_by_device(self.device)
if name is None:
# container is not open
@ -386,8 +466,8 @@ class ConditionsHandler(Handler):
return name
def luks_open(self):
if (self._module.params['device'] is None or
self._module.params['keyfile'] is None or
if (self._module.params['keyfile'] is None or
self.device is None or
self._module.params['state'] != 'opened'):
# conditions for open not fulfilled
return False
@ -399,28 +479,26 @@ class ConditionsHandler(Handler):
return False
def luks_close(self):
if ((self._module.params['name'] is None and
self._module.params['device'] is None) or
if ((self._module.params['name'] is None and self.device is None) or
self._module.params['state'] != 'closed'):
# conditions for close not fulfilled
return False
if self._module.params['device'] is not None:
name = self._crypthandler.get_container_name_by_device(
self._module.params['device'])
if self.device is not None:
name = self._crypthandler.get_container_name_by_device(self.device)
# successfully getting name based on device means that luks is open
luks_is_open = name is not None
if self._module.params['name'] is not None:
device = self._crypthandler.get_container_device_by_name(
self.device = self._crypthandler.get_container_device_by_name(
self._module.params['name'])
# successfully getting device based on name means that luks is open
luks_is_open = device is not None
luks_is_open = self.device is not None
return luks_is_open
def luks_add_key(self):
if (self._module.params['device'] is None or
if (self.device is None or
self._module.params['keyfile'] is None or
self._module.params['new_keyfile'] is None):
# conditions for adding a key not fulfilled
@ -433,7 +511,7 @@ class ConditionsHandler(Handler):
return True
def luks_remove_key(self):
if (self._module.params['device'] is None or
if (self.device is None or
self._module.params['remove_keyfile'] is None):
# conditions for removing a key not fulfilled
return False
@ -445,9 +523,9 @@ class ConditionsHandler(Handler):
return True
def luks_remove(self):
return (self._module.params['device'] is not None and
return (self.device is not None and
self._module.params['state'] == 'absent' and
self._crypthandler.is_luks(self._module.params['device']))
self._crypthandler.is_luks(self.device))
def run_module():
@ -460,7 +538,9 @@ def run_module():
new_keyfile=dict(type='path'),
remove_keyfile=dict(type='path'),
force_remove_last_key=dict(type='bool', default=False),
keysize=dict(type='int')
keysize=dict(type='int'),
label=dict(type='str'),
uuid=dict(type='str'),
)
# seed the result dict in the object
@ -491,7 +571,7 @@ def run_module():
if conditions.luks_create():
if not module.check_mode:
try:
crypt.run_luks_create(module.params['device'],
crypt.run_luks_create(conditions.device,
module.params['keyfile'],
module.params['keysize'])
except ValueError as e:
@ -510,12 +590,12 @@ def run_module():
name = module.params['name']
if name is None:
try:
name = crypt.generate_luks_name(module.params['device'])
name = crypt.generate_luks_name(conditions.device)
except ValueError as e:
module.fail_json(msg="luks_device error: %s" % e)
if not module.check_mode:
try:
crypt.run_luks_open(module.params['device'],
crypt.run_luks_open(conditions.device,
module.params['keyfile'],
name)
except ValueError as e:
@ -527,10 +607,10 @@ def run_module():
# luks close
if conditions.luks_close():
if module.params['device'] is not None:
if conditions.device is not None:
try:
name = crypt.get_container_name_by_device(
module.params['device'])
conditions.device)
except ValueError as e:
module.fail_json(msg="luks_device error: %s" % e)
else:
@ -540,6 +620,7 @@ def run_module():
crypt.run_luks_close(name)
except ValueError as e:
module.fail_json(msg="luks_device error: %s" % e)
result['name'] = name
result['changed'] = True
if module.check_mode:
module.exit_json(**result)
@ -548,7 +629,7 @@ def run_module():
if conditions.luks_add_key():
if not module.check_mode:
try:
crypt.run_luks_add_key(module.params['device'],
crypt.run_luks_add_key(conditions.device,
module.params['keyfile'],
module.params['new_keyfile'])
except ValueError as e:
@ -561,9 +642,10 @@ def run_module():
if conditions.luks_remove_key():
if not module.check_mode:
try:
crypt.run_luks_remove_key(module.params['device'],
last_key = module.params['force_remove_last_key']
crypt.run_luks_remove_key(conditions.device,
module.params['remove_keyfile'],
force_remove_last_key=module.params['force_remove_last_key'])
force_remove_last_key=last_key)
except ValueError as e:
module.fail_json(msg="luks_device error: %s" % e)
result['changed'] = True
@ -574,7 +656,7 @@ def run_module():
if conditions.luks_remove():
if not module.check_mode:
try:
crypt.run_luks_remove(module.params['device'])
crypt.run_luks_remove(conditions.device)
except ValueError as e:
module.fail_json(msg="luks_device error: %s" % e)
result['changed'] = True

View file

@ -3,6 +3,8 @@ __metaclass__ = type
import pytest
from ansible.modules.crypto import luks_device
from units.compat.mock import patch
from ansible.module_utils import basic
class DummyModule(object):
@ -62,15 +64,16 @@ def test_run_luks_remove(monkeypatch):
# ===== ConditionsHandler methods data and tests =====
# device, key, state, is_luks, expected
# device, key, state, is_luks, label, expected
LUKS_CREATE_DATA = (
("dummy", "key", "present", False, True),
(None, "key", "present", False, False),
("dummy", None, "present", False, False),
("dummy", "key", "absent", False, False),
("dummy", "key", "opened", True, False),
("dummy", "key", "closed", True, False),
("dummy", "key", "present", True, False))
("dummy", "key", "present", False, None, True),
(None, "key", "present", False, None, False),
(None, "key", "present", False, "labelName", True),
("dummy", None, "present", False, None, False),
("dummy", "key", "absent", False, None, False),
("dummy", "key", "opened", True, None, False),
("dummy", "key", "closed", True, None, False),
("dummy", "key", "present", True, None, False))
# device, state, is_luks, expected
LUKS_REMOVE_DATA = (
@ -90,47 +93,57 @@ LUKS_OPEN_DATA = (
("dummy", "key", "opened", "name", "name", False),
("dummy", "key", "opened", "beer", "name", "exception"))
# device, dev_by_name, name, name_by_dev, state, expected
# device, dev_by_name, name, name_by_dev, state, label, expected
LUKS_CLOSE_DATA = (
("dummy", "dummy", "name", "name", "present", False),
("dummy", "dummy", "name", "name", "absent", False),
("dummy", "dummy", "name", "name", "opened", False),
("dummy", "dummy", "name", "name", "closed", True),
(None, "dummy", "name", "name", "closed", True),
("dummy", "dummy", None, "name", "closed", True),
(None, "dummy", None, "name", "closed", False))
("dummy", "dummy", "name", "name", "present", None, False),
("dummy", "dummy", "name", "name", "absent", None, False),
("dummy", "dummy", "name", "name", "opened", None, False),
("dummy", "dummy", "name", "name", "closed", None, True),
(None, "dummy", "name", "name", "closed", None, True),
("dummy", "dummy", None, "name", "closed", None, True),
(None, "dummy", None, "name", "closed", None, False))
# device, key, new_key, state, expected
# device, key, new_key, state, label, expected
LUKS_ADD_KEY_DATA = (
("dummy", "key", "new_key", "present", True),
(None, "key", "new_key", "present", False),
("dummy", None, "new_key", "present", False),
("dummy", "key", None, "present", False),
("dummy", "key", "new_key", "absent", "exception"))
("dummy", "key", "new_key", "present", None, True),
(None, "key", "new_key", "present", "labelName", True),
(None, "key", "new_key", "present", None, False),
("dummy", None, "new_key", "present", None, False),
("dummy", "key", None, "present", None, False),
("dummy", "key", "new_key", "absent", None, "exception"))
# device, remove_key, state, expected
# device, remove_key, state, label, expected
LUKS_REMOVE_KEY_DATA = (
("dummy", "key", "present", True),
(None, "key", "present", False),
("dummy", None, "present", False),
("dummy", "key", "absent", "exception"))
("dummy", "key", "present", None, True),
(None, "key", "present", None, False),
(None, "key", "present", "labelName", True),
("dummy", None, "present", None, False),
("dummy", "key", "absent", None, "exception"))
@pytest.mark.parametrize("device, keyfile, state, is_luks, expected",
((d[0], d[1], d[2], d[3], d[4])
@pytest.mark.parametrize("device, keyfile, state, is_luks, label, expected",
((d[0], d[1], d[2], d[3], d[4], d[5])
for d in LUKS_CREATE_DATA))
def test_luks_create(device, keyfile, state, is_luks, expected, monkeypatch):
def test_luks_create(device, keyfile, state, is_luks, label, expected,
monkeypatch):
module = DummyModule()
module.params["device"] = device
module.params["keyfile"] = keyfile
module.params["state"] = state
module.params["label"] = label
monkeypatch.setattr(luks_device.CryptHandler, "is_luks",
lambda x, y: is_luks)
crypt = luks_device.CryptHandler(module)
conditions = luks_device.ConditionsHandler(module, crypt)
assert conditions.luks_create() == expected
if device is None:
monkeypatch.setattr(luks_device.Handler, "get_device_by_label",
lambda x, y: [0, "/dev/dummy", ""])
try:
conditions = luks_device.ConditionsHandler(module, crypt)
assert conditions.luks_create() == expected
except ValueError:
assert expected == "exception"
@pytest.mark.parametrize("device, state, is_luks, expected",
@ -145,8 +158,11 @@ def test_luks_remove(device, state, is_luks, expected, monkeypatch):
monkeypatch.setattr(luks_device.CryptHandler, "is_luks",
lambda x, y: is_luks)
crypt = luks_device.CryptHandler(module)
conditions = luks_device.ConditionsHandler(module, crypt)
assert conditions.luks_remove() == expected
try:
conditions = luks_device.ConditionsHandler(module, crypt)
assert conditions.luks_remove() == expected
except ValueError:
assert expected == "exception"
@pytest.mark.parametrize("device, keyfile, state, name, "
@ -164,24 +180,30 @@ def test_luks_open(device, keyfile, state, name, name_by_dev,
monkeypatch.setattr(luks_device.CryptHandler,
"get_container_name_by_device",
lambda x, y: name_by_dev)
monkeypatch.setattr(luks_device.CryptHandler,
"get_container_device_by_name",
lambda x, y: device)
monkeypatch.setattr(luks_device.Handler, "_run_command",
lambda x, y: [0, device, ""])
crypt = luks_device.CryptHandler(module)
conditions = luks_device.ConditionsHandler(module, crypt)
try:
conditions = luks_device.ConditionsHandler(module, crypt)
assert conditions.luks_open() == expected
except ValueError:
assert expected == "exception"
@pytest.mark.parametrize("device, dev_by_name, name, name_by_dev, "
"state, expected",
((d[0], d[1], d[2], d[3], d[4], d[5])
"state, label, expected",
((d[0], d[1], d[2], d[3], d[4], d[5], d[6])
for d in LUKS_CLOSE_DATA))
def test_luks_close(device, dev_by_name, name, name_by_dev, state,
expected, monkeypatch):
label, expected, monkeypatch):
module = DummyModule()
module.params["device"] = device
module.params["name"] = name
module.params["state"] = state
module.params["label"] = label
monkeypatch.setattr(luks_device.CryptHandler,
"get_container_name_by_device",
@ -190,39 +212,53 @@ def test_luks_close(device, dev_by_name, name, name_by_dev, state,
"get_container_device_by_name",
lambda x, y: dev_by_name)
crypt = luks_device.CryptHandler(module)
conditions = luks_device.ConditionsHandler(module, crypt)
assert conditions.luks_close() == expected
try:
conditions = luks_device.ConditionsHandler(module, crypt)
assert conditions.luks_close() == expected
except ValueError:
assert expected == "exception"
@pytest.mark.parametrize("device, keyfile, new_keyfile, state, expected",
((d[0], d[1], d[2], d[3], d[4])
@pytest.mark.parametrize("device, keyfile, new_keyfile, state, label, expected",
((d[0], d[1], d[2], d[3], d[4], d[5])
for d in LUKS_ADD_KEY_DATA))
def test_luks_add_key(device, keyfile, new_keyfile, state, expected, monkeypatch):
def test_luks_add_key(device, keyfile, new_keyfile, state, label, expected, monkeypatch):
module = DummyModule()
module.params["device"] = device
module.params["keyfile"] = keyfile
module.params["new_keyfile"] = new_keyfile
module.params["state"] = state
module.params["label"] = label
monkeypatch.setattr(luks_device.Handler, "get_device_by_label",
lambda x, y: [0, "/dev/dummy", ""])
conditions = luks_device.ConditionsHandler(module, module)
try:
conditions = luks_device.ConditionsHandler(module, module)
assert conditions.luks_add_key() == expected
except ValueError:
assert expected == "exception"
@pytest.mark.parametrize("device, remove_keyfile, state, expected",
((d[0], d[1], d[2], d[3])
@pytest.mark.parametrize("device, remove_keyfile, state, label, expected",
((d[0], d[1], d[2], d[3], d[4])
for d in LUKS_REMOVE_KEY_DATA))
def test_luks_remove_key(device, remove_keyfile, state, expected, monkeypatch):
def test_luks_remove_key(device, remove_keyfile, state, label, expected, monkeypatch):
module = DummyModule()
module.params["device"] = device
module.params["remove_keyfile"] = remove_keyfile
module.params["state"] = state
module.params["label"] = label
conditions = luks_device.ConditionsHandler(module, module)
monkeypatch.setattr(luks_device.Handler, "get_device_by_label",
lambda x, y: [0, "/dev/dummy", ""])
monkeypatch.setattr(luks_device.Handler, "_run_command",
lambda x, y: [0, device, ""])
crypt = luks_device.CryptHandler(module)
try:
conditions = luks_device.ConditionsHandler(module, crypt)
assert conditions.luks_remove_key() == expected
except ValueError:
assert expected == "exception"