Pamd Updates (#25817)

* Fix for #25522
Fix for #25855
Fix for #23963

* Minor fix

* Updates per bcoca.  Fix in regex for silvinux.
This commit is contained in:
Ken Evensen 2017-07-10 11:41:01 -04:00 committed by Brian Coca
parent 07498ebed3
commit f31d3ddeb7
2 changed files with 395 additions and 91 deletions

View file

@ -81,6 +81,7 @@ options:
- after - after
- args_present - args_present
- args_absent - args_absent
- absent
description: description:
- The default of 'updated' will modify an existing rule if type, - The default of 'updated' will modify an existing rule if type,
control and module_path all match an existing rule. With 'before', control and module_path all match an existing rule. With 'before',
@ -89,7 +90,9 @@ options:
after an existing rule matching type, control and module_path. With after an existing rule matching type, control and module_path. With
either 'before' or 'after' new_type, new_control, and new_module_path either 'before' or 'after' new_type, new_control, and new_module_path
must all be specified. If state is 'args_absent' or 'args_present', must all be specified. If state is 'args_absent' or 'args_present',
new_type, new_control, and new_module_path will be ignored. new_type, new_control, and new_module_path will be ignored. State
'absent' will remove the rule. The 'absent' state was added in version
2.4 and is only available in Ansible versions >= 2.4.
path: path:
default: /etc/pam.d/ default: /etc/pam.d/
description: description:
@ -124,7 +127,8 @@ EXAMPLES = """
new_module_path: pam_faillock.so new_module_path: pam_faillock.so
state: before state: before
- name: Insert a new rule pam_wheel.so with argument 'use_uid' after an existing rule pam_rootok.so - name: Insert a new rule pam_wheel.so with argument 'use_uid' after an \
existing rule pam_rootok.so
pamd: pamd:
name: su name: su
type: auth type: auth
@ -186,8 +190,39 @@ EXAMPLES = """
""" """
RETURN = ''' RETURN = '''
change_count:
description: How many rules were changed
type: int
sample: 1
returned: success
version_added: 2.4
new_rule:
description: The changes to the rule
type: string
sample: None None None sha512 shadow try_first_pass use_authtok
returned: success
version_added: 2.4
updated_rule_(n):
description: The rule(s) that was/were changed
type: string
sample:
- password sufficient pam_unix.so sha512 shadow try_first_pass
use_authtok
returned: success
version_added: 2.4
action:
description:
- "That action that was taken and is one of: update_rule,
insert_before_rule, insert_after_rule, args_present, args_absent,
absent."
returned: always
type: string
sample: "update_rule"
version_added: 2.4
dest: dest:
description: path to pam.d service that was changed description:
- "Path to pam.d service that was changed. This is only available in
Ansible version 2.3 and was removed in 2.4."
returned: success returned: success
type: string type: string
sample: "/etc/pam.d/system-auth" sample: "/etc/pam.d/system-auth"
@ -196,6 +231,9 @@ dest:
from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.pycompat24 import get_exception from ansible.module_utils.pycompat24 import get_exception
import os
import re
import time
# The PamdRule class encapsulates a rule in a pam.d service # The PamdRule class encapsulates a rule in a pam.d service
@ -221,21 +259,40 @@ class PamdRule(object):
@classmethod @classmethod
def rulefromstring(cls, stringline): def rulefromstring(cls, stringline):
split_line = stringline.split() pattern = None
rule_type = split_line[0] rule_type = ''
rule_control = split_line[1] rule_control = ''
rule_module_path = ''
rule_module_args = ''
complicated = False
if rule_control.startswith('['): if '[' in stringline:
rule_control = stringline[stringline.index('['): pattern = re.compile(
stringline.index(']') + 1] r"""([\-A-Za-z0-9_]+)\s* # Rule Type
\[([A-Za-z0-9_=\s]+)\]\s* # Rule Control
if "]" in split_line[2]: ([A-Za-z0-9_\.]+)\s* # Rule Path
rule_module_path = split_line[3] ([A-Za-z0-9_=<>\-\s]*)""", # Rule Args
rule_module_args = split_line[4:] re.X)
complicated = True
else: else:
rule_module_path = split_line[2] pattern = re.compile(
rule_module_args = split_line[3:] r"""([\-A-Za-z0-9_]+)\s* # Rule Type
([A-Za-z0-9_]+)\s* # Rule Control
([A-Za-z0-9_\.]+)\s* # Rule Path
([A-Za-z0-9_=<>\-\s]*)""", # Rule Args
re.X)
result = pattern.match(stringline)
rule_type = result.group(1)
if complicated:
rule_control = '[' + result.group(2) + ']'
else:
rule_control = result.group(2)
rule_module_path = result.group(3)
if result.group(4) is not None:
rule_module_args = result.group(4)
return cls(rule_type, rule_control, rule_module_path, rule_module_args) return cls(rule_type, rule_control, rule_module_path, rule_module_args)
@ -257,28 +314,80 @@ class PamdRule(object):
# PamdService encapsulates an entire service and contains one or more rules # PamdService encapsulates an entire service and contains one or more rules
class PamdService(object): class PamdService(object):
def __init__(self, path, name, ansible): def __init__(self, ansible=None):
self.path = path
self.name = name if ansible is not None:
self.check = ansible.check_mode self.check = ansible.check_mode
self.check = False
self.ansible = ansible self.ansible = ansible
self.fname = self.path + "/" + self.name
self.preamble = [] self.preamble = []
self.rules = [] self.rules = []
self.fname = None
if ansible is not None:
self.path = self.ansible.params["path"]
self.name = self.ansible.params["name"]
def load_rules_from_file(self):
self.fname = self.path + "/" + self.name
stringline = ''
try: try:
for line in open(self.fname, 'r'): for line in open(self.fname, 'r'):
if line.startswith('#') and not line.isspace(): stringline += line.rstrip()
self.preamble.append(line.rstrip()) stringline += '\n'
elif not line.startswith('#') and not line.isspace(): self.load_rules_from_string(stringline)
self.rules.append(PamdRule.rulefromstring
(stringline=line.rstrip())) except IOError:
except Exception:
e = get_exception() e = get_exception()
self.ansible.fail_json(msg='Unable to open/read PAM module file ' + self.ansible.fail_json(msg='Unable to open/read PAM module \
'%s with error %s' % (self.fname, str(e))) file %s with error %s. And line %s' %
(self.fname, str(e), stringline))
def load_rules_from_string(self, stringvalue):
for line in stringvalue.splitlines():
stringline = line.rstrip()
if line.startswith('#') and not line.isspace():
self.preamble.append(line.rstrip())
elif (not line.startswith('#') and
not line.isspace() and
len(line) != 0):
self.rules.append(PamdRule.rulefromstring(stringline))
def write(self):
if self.fname is None:
self.fname = self.path + "/" + self.name
# If the file is a symbollic link, we'll write to the source.
pamd_file = os.path.realpath(self.fname)
temp_file = "/tmp/" + self.name + "_" + time.strftime("%y%m%d%H%M%S")
try:
f = open(temp_file, 'w')
f.write(str(self))
f.close()
except IOError:
self.ansible.fail_json(msg='Unable to create temporary \
file %s' % self.temp_file)
self.ansible.atomic_move(temp_file, pamd_file)
def __str__(self): def __str__(self):
return self.fname stringvalue = ''
previous_rule = None
for amble in self.preamble:
stringvalue += amble
stringvalue += '\n'
for rule in self.rules:
if (previous_rule is not None and
(previous_rule.rule_type.replace('-', '') !=
rule.rule_type.replace('-', ''))):
stringvalue += '\n'
stringvalue += str(rule).rstrip()
stringvalue += '\n'
previous_rule = rule
if stringvalue.endswith('\n'):
stringvalue = stringvalue[:-1]
return stringvalue
def update_rule(service, old_rule, new_rule): def update_rule(service, old_rule, new_rule):
@ -306,8 +415,8 @@ def update_rule(service, old_rule, new_rule):
changed = True changed = True
try: try:
if (new_rule.rule_module_args is not None and if (new_rule.rule_module_args is not None and
new_rule.rule_module_args != new_rule.get_module_args_as_string() !=
rule.rule_module_args): rule.get_module_args_as_string()):
rule.rule_module_args = new_rule.rule_module_args rule.rule_module_args = new_rule.rule_module_args
changed = True changed = True
except AttributeError: except AttributeError:
@ -339,7 +448,9 @@ def insert_before_rule(service, old_rule, new_rule):
new_rule.rule_control != new_rule.rule_control !=
service.rules[index - 1].rule_control or service.rules[index - 1].rule_control or
new_rule.rule_module_path != new_rule.rule_module_path !=
service.rules[index - 1].rule_module_path): service.rules[index - 1].rule_module_path or
new_rule.rule_module_args !=
service.rules[index - 1].rule_module_args):
service.rules.insert(index, new_rule) service.rules.insert(index, new_rule)
changed = True changed = True
if changed: if changed:
@ -364,7 +475,9 @@ def insert_after_rule(service, old_rule, new_rule):
new_rule.rule_control != new_rule.rule_control !=
service.rules[index + 1].rule_control or service.rules[index + 1].rule_control or
new_rule.rule_module_path != new_rule.rule_module_path !=
service.rules[index + 1].rule_module_path): service.rules[index + 1].rule_module_path or
new_rule.rule_module_args !=
service.rules[index + 1].rule_module_args):
service.rules.insert(index + 1, new_rule) service.rules.insert(index + 1, new_rule)
changed = True changed = True
if changed: if changed:
@ -440,20 +553,17 @@ def add_module_arguments(service, old_rule, module_args):
return changed, result return changed, result
def write_rules(service): def remove_rule(service, old_rule):
previous_rule = None result = {'action': 'absent'}
changed = False
f = open(service.fname, 'w') change_count = 0
for amble in service.preamble:
f.write(amble + '\n')
for rule in service.rules: for rule in service.rules:
if (previous_rule is not None and if (old_rule.rule_type == rule.rule_type and
previous_rule.rule_type != rule.rule_type): old_rule.rule_control == rule.rule_control and
f.write('\n') old_rule.rule_module_path == rule.rule_module_path):
f.write(str(rule) + '\n') service.rules.remove(rule)
previous_rule = rule changed = True
f.close() return changed, result
def main(): def main():
@ -474,13 +584,20 @@ def main():
module_arguments=dict(required=False, type='list'), module_arguments=dict(required=False, type='list'),
state=dict(required=False, default="updated", state=dict(required=False, default="updated",
choices=['before', 'after', 'updated', choices=['before', 'after', 'updated',
'args_absent', 'args_present']), 'args_absent', 'args_present', 'absent']),
path=dict(required=False, default='/etc/pam.d', type='str') path=dict(required=False, default='/etc/pam.d', type='str')
), ),
supports_check_mode=True, supports_check_mode=True,
required_if=[ required_if=[
("state", "args_present", ["module_arguments"]), ("state", "args_present", ["module_arguments"]),
("state", "args_absent", ["module_arguments"]) ("state", "args_absent", ["module_arguments"]),
("state", "before", ["new_control"]),
("state", "before", ["new_type"]),
("state", "before", ["new_module_path"]),
("state", "after", ["new_control"]),
("state", "after", ["new_type"]),
("state", "after", ["new_module_path"])
] ]
) )
@ -498,7 +615,8 @@ def main():
path = module.params['path'] path = module.params['path']
pamd = PamdService(path, service, module) pamd = PamdService(module)
pamd.load_rules_from_file()
old_rule = PamdRule(old_type, old_rule = PamdRule(old_type,
old_control, old_control,
@ -508,50 +626,33 @@ def main():
new_module_path, new_module_path,
module_arguments) module_arguments)
try: if state == 'updated':
if state == 'updated': change, result = update_rule(pamd,
change, result = update_rule(pamd, old_rule,
old_rule, new_rule)
new_rule) elif state == 'before':
elif state == 'before': change, result = insert_before_rule(pamd,
if (new_rule.rule_control is None or old_rule,
new_rule.rule_type is None or new_rule)
new_rule.rule_module_path is None): elif state == 'after':
change, result = insert_after_rule(pamd,
old_rule,
new_rule)
elif state == 'args_absent':
change, result = remove_module_arguments(pamd,
old_rule,
module_arguments)
elif state == 'args_present':
change, result = add_module_arguments(pamd,
old_rule,
module_arguments)
elif state == 'absent':
change, result = remove_rule(pamd,
old_rule)
module.fail_json(msg='When inserting a new rule before ' + if not module.check_mode and change:
'or after an existing rule, new_type, ' + pamd.write()
'new_control and new_module_path must ' +
'all be set.')
change, result = insert_before_rule(pamd,
old_rule,
new_rule)
elif state == 'after':
if (new_rule.rule_control is None or
new_rule.rule_type is None or
new_rule.rule_module_path is None):
module.fail_json(msg='When inserting a new rule before' +
'or after an existing rule, new_type,' +
' new_control and new_module_path must' +
' all be set.')
change, result = insert_after_rule(pamd,
old_rule,
new_rule)
elif state == 'args_absent':
change, result = remove_module_arguments(pamd,
old_rule,
module_arguments)
elif state == 'args_present':
change, result = add_module_arguments(pamd,
old_rule,
module_arguments)
if not module.check_mode:
write_rules(pamd)
except Exception:
e = get_exception()
module.fail_json(msg='error running changing pamd: %s' % str(e))
facts = {} facts = {}
facts['pamd'] = {'changed': change, 'result': result} facts['pamd'] = {'changed': change, 'result': result}

View file

@ -0,0 +1,203 @@
from ansible.compat.tests import unittest
from ansible.modules.system.pamd import PamdRule
from ansible.modules.system.pamd import PamdService
from ansible.modules.system.pamd import update_rule
from ansible.modules.system.pamd import insert_before_rule
from ansible.modules.system.pamd import insert_after_rule
from ansible.modules.system.pamd import remove_module_arguments
from ansible.modules.system.pamd import add_module_arguments
from ansible.modules.system.pamd import remove_rule
import re
class PamdRuleTestCase(unittest.TestCase):
def test_simple(self):
simple = "auth required pam_env.so".rstrip()
module = PamdRule.rulefromstring(stringline=simple)
module_string = re.sub(' +', ' ', str(module).replace('\t', ' '))
self.assertEqual(simple, module_string.rstrip())
self.assertEqual('', module.get_module_args_as_string())
def test_simple_more(self):
simple = "auth required pam_tally2.so deny=5 onerr=fail".rstrip()
module = PamdRule.rulefromstring(stringline=simple)
module_string = re.sub(' +', ' ', str(module).replace('\t', ' '))
self.assertEqual(simple, module_string.rstrip())
self.assertEqual('deny=5 onerr=fail',
module.get_module_args_as_string())
def test_complicated_rule(self):
complicated = "-auth [default=1 success=ok] pam_localuser.so".rstrip()
module = PamdRule.rulefromstring(stringline=complicated)
module_string = re.sub(' +', ' ', str(module).replace('\t', ' '))
self.assertEqual(complicated, module_string.rstrip())
self.assertEqual('', module.get_module_args_as_string())
def test_more_complicated_rule(self):
complicated = "auth"
complicated += " [success=done ignore=ignore default=die]"
complicated += " pam_unix.so"
complicated += " try_first_pass".rstrip()
module = PamdRule.rulefromstring(stringline=complicated)
module_string = re.sub(' +', ' ', str(module).replace('\t', ' '))
self.assertEqual(complicated, module_string.rstrip())
self.assertEqual('try_first_pass', module.get_module_args_as_string())
def test_less_than_in_args(self):
rule = "auth requisite pam_succeed_if.so uid >= 1025 quiet_success"
module = PamdRule.rulefromstring(stringline=rule)
module_string = re.sub(' +', ' ', str(module).replace('\t', ' '))
self.assertEqual(rule, module_string.rstrip())
self.assertEqual('uid >= 1025 quiet_success', module.get_module_args_as_string())
class PamdServiceTestCase(unittest.TestCase):
def setUp(self):
self.system_auth_string = """#%PAM-1.0
# This file is auto-generated.
# User changes will be destroyed the next time authconfig is run.
auth required pam_env.so
auth sufficient pam_unix.so nullok try_first_pass
auth requisite pam_succeed_if.so uid
auth required pam_deny.so
account required pam_unix.so
account sufficient pam_localuser.so
account sufficient pam_succeed_if.so uid
account required pam_permit.so
password requisite pam_pwquality.so try_first_pass local_users_only retry=3 authtok_type=
password sufficient pam_unix.so sha512 shadow nullok try_first_pass use_authtok
password required pam_deny.so
session optional pam_keyinit.so revoke
session required pam_limits.so
-session optional pam_systemd.so
session [success=1 default=ignore] pam_succeed_if.so service in crond quiet use_uid
session [success=1 test=me default=ignore] pam_succeed_if.so service in crond quiet use_uid
session required pam_unix.so"""
self.pamd = PamdService()
self.pamd.load_rules_from_string(self.system_auth_string)
def test_load_rule_from_string(self):
self.assertEqual(self.system_auth_string, str(self.pamd))
def test_update_rule_type(self):
old_rule = PamdRule.rulefromstring('auth required pam_env.so')
new_rule = PamdRule.rulefromstring('session required pam_env.so')
update_rule(self.pamd, old_rule, new_rule)
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd))
def test_update_rule_control_simple(self):
old_rule = PamdRule.rulefromstring('auth required pam_env.so')
new_rule = PamdRule.rulefromstring('auth sufficent pam_env.so')
update_rule(self.pamd, old_rule, new_rule)
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd))
def test_update_rule_control_complex(self):
old_rule = PamdRule.rulefromstring('session [success=1 default=ignore] pam_succeed_if.so service in crond quiet use_uid')
new_rule = PamdRule.rulefromstring('session [success=2 test=me default=ignore] pam_succeed_if.so service in crond quiet use_uid')
update_rule(self.pamd, old_rule, new_rule)
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd))
def test_update_rule_control_more_complex(self):
old_rule = PamdRule.rulefromstring('session [success=1 test=me default=ignore] pam_succeed_if.so service in crond quiet use_uid')
new_rule = PamdRule.rulefromstring('session [success=2 test=me default=ignore] pam_succeed_if.so service in crond quiet use_uid')
update_rule(self.pamd, old_rule, new_rule)
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd))
def test_update_rule_module_path(self):
old_rule = PamdRule.rulefromstring('auth required pam_env.so')
new_rule = PamdRule.rulefromstring('session required pam_limits.so')
update_rule(self.pamd, old_rule, new_rule)
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd))
def test_update_rule_module_args(self):
old_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so nullok try_first_pass')
new_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so uid uid')
update_rule(self.pamd, old_rule, new_rule)
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd))
def test_update_first_three(self):
old_rule = PamdRule.rulefromstring('auth required pam_env.so')
new_rule = PamdRule.rulefromstring('one two three')
update_rule(self.pamd, old_rule, new_rule)
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd))
def test_update_first_three_with_module_args(self):
old_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so nullok try_first_pass')
new_rule = PamdRule.rulefromstring('one two three')
update_rule(self.pamd, old_rule, new_rule)
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd))
def test_update_all_four(self):
old_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so nullok try_first_pass')
new_rule = PamdRule.rulefromstring('one two three four five')
update_rule(self.pamd, old_rule, new_rule)
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd))
def test_insert_before_rule(self):
old_rule = PamdRule.rulefromstring('account required pam_unix.so')
new_rule = PamdRule.rulefromstring('account required pam_permit.so')
insert_before_rule(self.pamd, old_rule, new_rule)
line_to_test = str(new_rule).rstrip()
line_to_test += '\n'
line_to_test += str(old_rule).rstrip()
self.assertIn(line_to_test, str(self.pamd))
def test_insert_after_rule(self):
old_rule = PamdRule.rulefromstring('account required pam_unix.so')
new_rule = PamdRule.rulefromstring('account required pam_permit.so arg1 arg2 arg3')
insert_after_rule(self.pamd, old_rule, new_rule)
line_to_test = str(old_rule).rstrip()
line_to_test += '\n'
line_to_test += str(new_rule).rstrip()
self.assertIn(line_to_test, str(self.pamd))
def test_remove_module_arguments_one(self):
old_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so nullok try_first_pass')
new_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so try_first_pass')
args_to_remove = ['nullok']
remove_module_arguments(self.pamd, old_rule, args_to_remove)
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd))
def test_remove_module_arguments_two(self):
old_rule = PamdRule.rulefromstring('session [success=1 default=ignore] pam_succeed_if.so service in crond quiet use_uid')
new_rule = PamdRule.rulefromstring('session [success=1 default=ignore] pam_succeed_if.so in quiet use_uid')
args_to_remove = ['service', 'crond']
remove_module_arguments(self.pamd, old_rule, args_to_remove)
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd))
def test_add_module_arguments_where_none_existed(self):
old_rule = PamdRule.rulefromstring('account required pam_unix.so')
new_rule = PamdRule.rulefromstring('account required pam_unix.so arg1 arg2= arg3=arg3')
args_to_add = ['arg1', 'arg2=', 'arg3=arg3']
add_module_arguments(self.pamd, old_rule, args_to_add)
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
def test_add_module_arguments_where_some_existed(self):
old_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so nullok try_first_pass')
new_rule = PamdRule.rulefromstring('auth sufficient pam_unix.so nullok try_first_pass arg1 arg2= arg3=arg3')
args_to_add = ['arg1', 'arg2=', 'arg3=arg3']
add_module_arguments(self.pamd, old_rule, args_to_add)
self.assertIn(str(new_rule).rstrip(), str(self.pamd))
def test_remove_rule(self):
old_rule = PamdRule.rulefromstring('account required pam_unix.so')
remove_rule(self.pamd, old_rule)
self.assertNotIn(str(old_rule).rstrip(), str(self.pamd))