Fixed to the lineinfile module.

Added the backrefs parameter to the lineinfile module.
Added tests for the backrefs functionality of the lineinfile module.
This commit is contained in:
tin 2013-03-26 00:22:33 +01:00
parent 868af1a892
commit f9b70822d2
2 changed files with 216 additions and 95 deletions

View file

@ -90,7 +90,7 @@ options:
get the original file back if you somehow clobbered it incorrectly.
others:
description:
- All arguments accepted by the M(file) module also work here.
- All arguments accepted by the M(file) module also work here.
required: false
"""
@ -109,7 +109,7 @@ EXAMPLES = """
lineinfile dest=/tmp/grub.conf state=present regexp='^(splashimage=.*)$' line="#\\1"
"""
def check_file_attrs(module, changed, message):
@ -121,9 +121,11 @@ def check_file_attrs(module, changed, message):
changed = True
message += "ownership, perms or SE linux context changed"
return [ message, changed ]
return message, changed
def present(module, dest, regexp, line, insertafter, insertbefore, create, backup):
def present(module, dest, regexp, line, insertafter, insertbefore, create,
backup, backrefs):
if os.path.isdir(dest):
module.fail_json(rc=256, msg='Destination %s is a directory !' % dest)
@ -142,10 +144,10 @@ def present(module, dest, regexp, line, insertafter, insertbefore, create, backu
msg = ""
mre = re.compile(regexp)
if insertafter is not None and insertafter not in ('BOF', 'EOF'):
if insertafter not in (None, 'BOF', 'EOF'):
insre = re.compile(insertafter)
elif insertbefore is not None and insertbefore not in ('BOF'):
elif insertbefore not in (None, 'BOF'):
insre = re.compile(insertbefore)
else:
insre = None
@ -154,12 +156,12 @@ def present(module, dest, regexp, line, insertafter, insertbefore, create, backu
# index[1] is the line num where insertafter/inserbefore has been found
index = [-1, -1]
m = None
for lineno in range(0, len(lines)):
match_found = mre.search(lines[lineno])
for lineno, cur_line in enumerate(lines):
match_found = mre.search(cur_line)
if match_found:
index[0] = lineno
m = match_found
elif insre is not None and insre.search(lines[lineno]):
elif insre is not None and insre.search(cur_line):
if insertafter:
# + 1 for the next line
index[1] = lineno + 1
@ -167,15 +169,24 @@ def present(module, dest, regexp, line, insertafter, insertbefore, create, backu
# + 1 for the previous line
index[1] = lineno
msg = ''
changed = False
# Regexp matched a line in the file
if index[0] != -1:
if lines[index[0]] == m.expand(line) + os.linesep:
msg = ''
changed = False
if backrefs:
new_line = m.expand(line)
else:
lines[index[0]] = m.expand(line) + os.linesep
# Don't do backref expansion if not asked.
new_line = line
if lines[index[0]] != new_line + os.linesep:
lines[index[0]] = new_line + os.linesep
msg = 'line replaced'
changed = True
elif backrefs:
# Do absolutely nothing, since it's not safe generating the line
# without the regexp matching to populate the backrefs.
pass
# Add it to the beginning of the file
elif insertbefore == 'BOF' or insertafter == 'BOF':
lines.insert(0, line + os.linesep)
@ -188,11 +199,10 @@ def present(module, dest, regexp, line, insertafter, insertbefore, create, backu
lines.append(line + os.linesep)
msg = 'line added'
changed = True
# Do nothing if regexp didn't match
# Do nothing if insert* didn't match
elif index[1] == -1:
msg = ''
changed = False
# insertafter/insertbefore= matched
pass
# insert* matched, but not the regexp
else:
lines.insert(index[1], line + os.linesep)
msg = 'line added'
@ -205,9 +215,10 @@ def present(module, dest, regexp, line, insertafter, insertbefore, create, backu
f.writelines(lines)
f.close()
[ msg, changed ] = check_file_attrs(module, changed, msg)
msg, changed = check_file_attrs(module, changed, msg)
module.exit_json(changed=changed, msg=msg)
def absent(module, dest, regexp, backup):
if os.path.isdir(dest):
@ -242,36 +253,46 @@ def absent(module, dest, regexp, backup):
if changed:
msg = "%s line(s) removed" % len(found)
[ msg, changed ] = check_file_attrs(module, changed, msg)
msg, changed = check_file_attrs(module, changed, msg)
module.exit_json(changed=changed, found=len(found), msg=msg)
def main():
module = AnsibleModule(
argument_spec = dict(
argument_spec=dict(
dest=dict(required=True, aliases=['name', 'destfile']),
state=dict(default='present', choices=['absent', 'present']),
regexp=dict(required=True),
line=dict(aliases=['value']),
insertafter=dict(default=None),
insertbefore=dict(default=None),
backrefs=dict(default=False, type='bool'),
create=dict(default=False, type='bool'),
backup=dict(default=False, type='bool'),
),
mutually_exclusive = [['insertbefore', 'insertafter']],
add_file_common_args = True,
supports_check_mode = True
mutually_exclusive=[['insertbefore', 'insertafter']],
add_file_common_args=True,
supports_check_mode=True
)
params = module.params
create = module.params['create']
backup = module.params['backup']
dest = os.path.expanduser(params['dest'])
backrefs = module.params['backrefs']
dest = os.path.expanduser(params['dest'])
if params['state'] == 'present':
if 'line' not in params:
module.fail_json(msg='line= is required with state=present')
# Deal with the insertafter default value manually, to avoid errors
# because of the mutually_exclusive mechanism.
ins_bef, ins_aft = params['insertbefore'], params['insertafter']
if ins_bef is None and ins_aft is None:
ins_aft = 'EOF'
present(module, dest, params['regexp'], params['line'],
params['insertafter'], params['insertbefore'], create, backup)
ins_aft, ins_bef, create, backup, backrefs)
else:
absent(module, dest, params['regexp'], backup)
@ -279,4 +300,3 @@ def main():
#<<INCLUDE_ANSIBLE_MODULE_COMMON>>
main()

View file

@ -10,10 +10,10 @@ import os
import shutil
import time
import tempfile
import urllib2
from nose.plugins.skip import SkipTest
def get_binary(name):
for directory in os.environ["PATH"].split(os.pathsep):
path = os.path.join(directory, name)
@ -21,6 +21,7 @@ def get_binary(name):
return path
return None
class TestRunner(unittest.TestCase):
def setUp(self):
@ -110,70 +111,70 @@ class TestRunner(unittest.TestCase):
data_out = file(output).read()
assert data_in == data_out
assert 'failed' not in result
assert result['changed'] == True
assert result['changed'] is True
assert 'md5sum' in result
result = self._run('copy', [
"src=%s" % input_,
"dest=%s" % output,
])
assert result['changed'] == False
assert result['changed'] is False
def test_command(self):
# test command module, change trigger, etc
result = self._run('command', [ "/bin/echo", "hi" ])
result = self._run('command', ["/bin/echo", "hi"])
assert "failed" not in result
assert "msg" not in result
assert result['rc'] == 0
assert result['stdout'] == 'hi'
assert result['stderr'] == ''
result = self._run('command', [ "false" ])
result = self._run('command', ["false"])
assert result['rc'] == 1
assert 'failed' not in result
result = self._run('command', [ "/usr/bin/this_does_not_exist", "splat" ])
result = self._run('command', ["/usr/bin/this_does_not_exist", "splat"])
assert 'msg' in result
assert 'failed' in result
result = self._run('shell', [ "/bin/echo", "$HOME" ])
result = self._run('shell', ["/bin/echo", "$HOME"])
assert 'failed' not in result
assert result['rc'] == 0
result = self._run('command', [ "creates='/tmp/ansible command test'", "chdir=/tmp", "touch", "'ansible command test'" ])
result = self._run('command', ["creates='/tmp/ansible command test'", "chdir=/tmp", "touch", "'ansible command test'"])
assert 'changed' in result
assert result['rc'] == 0
result = self._run('command', [ "creates='/tmp/ansible command test'", "false" ])
result = self._run('command', ["creates='/tmp/ansible command test'", "false"])
assert 'skipped' in result
result = self._run('shell', [ "removes=/tmp/ansible\\ command\\ test", "chdir=/tmp", "rm -f 'ansible command test'; echo $?" ])
result = self._run('shell', ["removes=/tmp/ansible\\ command\\ test", "chdir=/tmp", "rm -f 'ansible command test'; echo $?"])
assert 'changed' in result
assert result['rc'] == 0
assert result['stdout'] == '0'
result = self._run('shell', [ "removes=/tmp/ansible\\ command\\ test", "false" ])
result = self._run('shell', ["removes=/tmp/ansible\\ command\\ test", "false"])
assert 'skipped' in result
def test_git(self):
self._run('file',['path=/tmp/gitdemo','state=absent'])
self._run('file',['path=/tmp/gd','state=absent'])
self._run('command',['git init gitdemo', 'chdir=/tmp'])
self._run('command',['touch a', 'chdir=/tmp/gitdemo'])
self._run('command',['git add *', 'chdir=/tmp/gitdemo'])
self._run('command',['git commit -m "test commit 2"', 'chdir=/tmp/gitdemo'])
self._run('command',['touch b', 'chdir=/tmp/gitdemo'])
self._run('command',['git add *', 'chdir=/tmp/gitdemo'])
self._run('command',['git commit -m "test commit 2"', 'chdir=/tmp/gitdemo'])
result = self._run('git', [ "repo=\"file:///tmp/gitdemo\"", "dest=/tmp/gd" ])
self._run('file', ['path=/tmp/gitdemo', 'state=absent'])
self._run('file', ['path=/tmp/gd', 'state=absent'])
self._run('command', ['git init gitdemo', 'chdir=/tmp'])
self._run('command', ['touch a', 'chdir=/tmp/gitdemo'])
self._run('command', ['git add *', 'chdir=/tmp/gitdemo'])
self._run('command', ['git commit -m "test commit 2"', 'chdir=/tmp/gitdemo'])
self._run('command', ['touch b', 'chdir=/tmp/gitdemo'])
self._run('command', ['git add *', 'chdir=/tmp/gitdemo'])
self._run('command', ['git commit -m "test commit 2"', 'chdir=/tmp/gitdemo'])
result = self._run('git', ["repo=\"file:///tmp/gitdemo\"", "dest=/tmp/gd"])
assert result['changed']
# test the force option not set
self._run('file',['path=/tmp/gd/a','state=absent'])
result = self._run('git', [ "repo=\"file:///tmp/gitdemo\"", "dest=/tmp/gd", "force=no" ])
self._run('file', ['path=/tmp/gd/a', 'state=absent'])
result = self._run('git', ["repo=\"file:///tmp/gitdemo\"", "dest=/tmp/gd", "force=no"])
assert result['failed']
# test the force option when set
result = self._run('git', [ "repo=\"file:///tmp/gitdemo\"", "dest=/tmp/gd", "force=yes" ])
result = self._run('git', ["repo=\"file:///tmp/gitdemo\"", "dest=/tmp/gd", "force=yes"])
assert result['changed']
def test_file(self):
filedemo = tempfile.mkstemp()[1]
assert self._run('file', ['dest=' + filedemo, 'state=directory'])['failed']
@ -191,7 +192,6 @@ class TestRunner(unittest.TestCase):
assert not os.path.exists(filedemo)
assert not self._run('file', ['dest=' + filedemo, 'state=absent'])['changed']
filedemo = tempfile.mkdtemp()
assert self._run('file', ['dest=' + filedemo, 'state=file'])['failed']
assert os.path.isdir(filedemo)
@ -209,7 +209,6 @@ class TestRunner(unittest.TestCase):
assert not os.path.exists(filedemo)
assert not self._run('file', ['dest=' + filedemo, 'state=absent'])['changed']
tmp_dir = tempfile.mkdtemp()
filedemo = os.path.join(tmp_dir, 'link')
os.symlink('/dev/zero', filedemo)
@ -234,7 +233,7 @@ class TestRunner(unittest.TestCase):
if not os.path.exists(large_path):
raise SkipTest
# Ensure reading a large amount of output from a command doesn't hang.
result = self._run('command', [ "/bin/cat", large_path ])
result = self._run('command', ["/bin/cat", large_path])
assert "failed" not in result
assert "msg" not in result
assert result['rc'] == 0
@ -244,14 +243,14 @@ class TestRunner(unittest.TestCase):
def test_async(self):
# test async launch and job status
# of any particular module
result = self._run('command', [ get_binary("sleep"), "3" ], background=20)
result = self._run('command', [get_binary("sleep"), "3"], background=20)
assert 'ansible_job_id' in result
assert 'started' in result
jid = result['ansible_job_id']
# no real chance of this op taking a while, but whatever
time.sleep(5)
# CLI will abstract this (when polling), but this is how it works internally
result = self._run('async_status', [ "jid=%s" % jid ])
result = self._run('async_status', ["jid=%s" % jid])
# TODO: would be nice to have tests for supervisory process
# killing job after X seconds
assert 'finished' in result
@ -263,7 +262,7 @@ class TestRunner(unittest.TestCase):
def test_fetch(self):
input_ = self._get_test_file('sample.j2')
output = os.path.join(self.stage_dir, 'localhost', input_)
result = self._run('fetch', [ "src=%s" % input_, "dest=%s" % self.stage_dir ])
self._run('fetch', ["src=%s" % input_, "dest=%s" % self.stage_dir])
assert os.path.exists(output)
assert open(input_).read() == open(output).read()
@ -280,7 +279,7 @@ class TestRunner(unittest.TestCase):
assert out.find("first") != -1
assert out.find("second") != -1
assert out.find("third") != -1
assert result['changed'] == True
assert result['changed'] is True
assert 'md5sum' in result
assert 'failed' not in result
result = self._run('assemble', [
@ -288,13 +287,14 @@ class TestRunner(unittest.TestCase):
"dest=%s" % output,
])
print result
assert result['changed'] == False
assert result['changed'] is False
def test_lineinfile(self):
"""Unit tests for the lineinfile module, without backref features."""
sampleroot = 'rocannon'
sample_origin = self._get_test_file(sampleroot + '.txt')
sample = self._get_stage_file(sampleroot + '.out' + '.txt')
shutil.copy( sample_origin, sample)
shutil.copy(sample_origin, sample)
# The order of the test cases is important
# defaults to insertafter at the end of the file
@ -303,19 +303,19 @@ class TestRunner(unittest.TestCase):
"dest=%s" % sample,
"regexp='^First: '",
"line='%s'" % testline
])
])
result = self._run(*testcase)
assert result['changed'] == True
assert result['changed']
assert result['msg'] == 'line added'
artifact = [ x.strip() for x in open(sample).readlines() ]
artifact = [x.strip() for x in open(sample)]
assert artifact[-1] == testline
assert artifact.count(testline) == 1
# run a second time, verify only one line has been added
result = self._run(*testcase)
assert result['changed'] == False
assert not result['changed']
assert result['msg'] == ''
artifact = [ x.strip() for x in open(sample).readlines() ]
artifact = [x.strip() for x in open(sample)]
assert artifact.count(testline) == 1
# insertafter with EOF
@ -325,48 +325,48 @@ class TestRunner(unittest.TestCase):
"insertafter=EOF",
"regexp='^Second: '",
"line='%s'" % testline
])
])
result = self._run(*testcase)
assert result['changed'] == True
assert result['changed']
assert result['msg'] == 'line added'
artifact = [ x.strip() for x in open(sample).readlines() ]
artifact = [x.strip() for x in open(sample)]
assert artifact[-1] == testline
assert artifact.count(testline) == 1
# with invalid insertafter regex
# If the regexp doesn't match and the insertafter doesn't match,
# do nothing.
testline = 'Third: Line added with an invalid insertafter regex'
testcase = ('lineinfile', [
"dest=%s" % sample,
"insertafter='^abcdefgh'",
"regexp='^Third: '",
"line='%s'" % testline
])
])
result = self._run(*testcase)
assert result['changed'] == True
assert result['msg'] == 'line added'
artifact = [ x.strip() for x in open(sample).readlines() ]
assert artifact[-1] == testline
assert artifact.count(testline) == 1
assert not result['changed']
# with an insertafter regex
# The regexp doesn't match, but the insertafter is specified and does,
# so insert after insertafter.
testline = 'Fourth: Line added with a valid insertafter regex'
testcase = ('lineinfile', [
"dest=%s" % sample,
"insertafter='^receive messages to '",
"regexp='^Fourth: '",
"line='%s'" % testline
])
])
result = self._run(*testcase)
assert result['changed'] == True
assert result['changed']
assert result['msg'] == 'line added'
artifact = [ x.strip() for x in open(sample).readlines() ]
artifact = [x.strip() for x in open(sample)]
assert artifact.count(testline) == 1
idx = artifact.index('receive messages to and from a corresponding device over any distance')
assert artifact[idx + 1] == testline
# replacement of a line from a regex
# we replace the line, so we need to get its idx before the run
artifact = [ x.strip() for x in open(sample).readlines() ]
artifact = [x.strip() for x in open(sample)]
target_line = 'combination of microphone, speaker, keyboard and display. It can send and'
idx = artifact.index(target_line)
@ -375,18 +375,18 @@ class TestRunner(unittest.TestCase):
"dest=%s" % sample,
"regexp='combination of microphone'",
"line='%s'" % testline
])
])
result = self._run(*testcase)
assert result['changed'] == True
assert result['changed']
assert result['msg'] == 'line replaced'
artifact = [ x.strip() for x in open(sample).readlines() ]
artifact = [x.strip() for x in open(sample)]
assert artifact.count(testline) == 1
assert artifact.index(testline) == idx
assert target_line not in artifact
# removal of a line
# we replace the line, so we need to get its idx before the run
artifact = [ x.strip() for x in open(sample).readlines() ]
artifact = [x.strip() for x in open(sample)]
target_line = 'receive messages to and from a corresponding device over any distance'
idx = artifact.index(target_line)
@ -394,13 +394,12 @@ class TestRunner(unittest.TestCase):
"dest=%s" % sample,
"regexp='^receive messages to and from '",
"state=absent"
])
])
result = self._run(*testcase)
assert result['changed'] == True
artifact = [ x.strip() for x in open(sample).readlines() ]
assert result['changed']
artifact = [x.strip() for x in open(sample)]
assert target_line not in artifact
# with both insertafter and insertbefore (should fail)
testline = 'Seventh: this line should not be there'
testcase = ('lineinfile', [
@ -409,9 +408,9 @@ class TestRunner(unittest.TestCase):
"insertbefore='BOF'",
"regexp='^communication. '",
"line='%s'" % testline
])
])
result = self._run(*testcase)
assert result['failed'] == True
assert result['failed']
# insertbefore with BOF
testline = 'Eighth: insertbefore BOF'
@ -420,11 +419,11 @@ class TestRunner(unittest.TestCase):
"insertbefore=BOF",
"regexp='^Eighth: '",
"line='%s'" % testline
])
])
result = self._run(*testcase)
assert result['changed'] == True
assert result['changed']
assert result['msg'] == 'line added'
artifact = [ x.strip() for x in open(sample).readlines() ]
artifact = [x.strip() for x in open(sample)]
assert artifact.count(testline) == 1
assert artifact[0] == testline
@ -435,11 +434,11 @@ class TestRunner(unittest.TestCase):
"insertbefore='^communication. Typically '",
"regexp='^Ninth: '",
"line='%s'" % testline
])
])
result = self._run(*testcase)
assert result['changed'] == True
assert result['changed']
assert result['msg'] == 'line added'
artifact = [ x.strip() for x in open(sample).readlines() ]
artifact = [x.strip() for x in open(sample)]
assert artifact.count(testline) == 1
idx = artifact.index('communication. Typically it is depicted as a lunch-box sized object with some')
assert artifact[idx - 1] == testline
@ -447,4 +446,106 @@ class TestRunner(unittest.TestCase):
# cleanup
os.unlink(sample)
def test_lineinfile_backrefs(self):
"""Unit tests for the lineinfile module, with backref features."""
sampleroot = 'rocannon'
sample_origin = self._get_test_file(sampleroot + '.txt')
origin_lines = [line.strip() for line in open(sample_origin)]
sample = self._get_stage_file(sampleroot + '.out' + '.txt')
shutil.copy(sample_origin, sample)
# The order of the test cases is important
# The regexp doesn't match, so the line will not be added anywhere.
testline = '\\1: Line added by default at the end of the file.'
testcase = ('lineinfile', [
"dest=%s" % sample,
"regexp='^(First): '",
"line='%s'" % testline,
"backrefs=yes",
])
result = self._run(*testcase)
assert not result['changed']
assert result['msg'] == ''
artifact = [x.strip() for x in open(sample)]
assert artifact == origin_lines
# insertafter with EOF
# The regexp doesn't match, so the line will not be added anywhere.
testline = '\\1: Line added with insertafter=EOF'
testcase = ('lineinfile', [
"dest=%s" % sample,
"insertafter=EOF",
"regexp='^(Second): '",
"line='%s'" % testline,
"backrefs=yes",
])
result = self._run(*testcase)
assert not result['changed']
assert result['msg'] == ''
artifact = [x.strip() for x in open(sample)]
assert artifact == origin_lines
# with invalid insertafter regex
# The regexp doesn't match, so do nothing.
testline = '\\1: Line added with an invalid insertafter regex'
testcase = ('lineinfile', [
"dest=%s" % sample,
"insertafter='^abcdefgh'",
"regexp='^(Third): '",
"line='%s'" % testline,
"backrefs=yes",
])
result = self._run(*testcase)
assert not result['changed']
assert artifact == origin_lines
# with an insertafter regex
# The regexp doesn't match, so do nothing.
testline = '\\1: Line added with a valid insertafter regex'
testcase = ('lineinfile', [
"dest=%s" % sample,
"insertafter='^receive messages to '",
"regexp='^(Fourth): '",
"line='%s'" % testline,
"backrefs=yes",
])
result = self._run(*testcase)
assert not result['changed']
assert result['msg'] == ''
assert artifact == origin_lines
# replacement of a line from a regex
# we replace the line, so we need to get its idx before the run
artifact = [x.strip() for x in open(sample)]
target_line = 'combination of microphone, speaker, keyboard and display. It can send and'
idx = artifact.index(target_line)
testline = '\\\\1 of microphone'
testline_after = 'combination of microphone'
testcase = ('lineinfile', [
"dest=%s" % sample,
"regexp='(combination) of microphone'",
"line='%s'" % testline,
"backrefs=yes",
])
result = self._run(*testcase)
assert result['changed']
assert result['msg'] == 'line replaced'
artifact = [x.strip() for x in open(sample)]
assert artifact.count(testline_after) == 1
assert artifact.index(testline_after) == idx
assert target_line not in artifact
# with both insertafter and insertbefore (should fail)
testline = 'Seventh: this line should not be there'
testcase = ('lineinfile', [
"dest=%s" % sample,
"insertafter='BOF'",
"insertbefore='BOF'",
"regexp='^communication. '",
"line='%s'" % testline
])
result = self._run(*testcase)
assert result['failed']
os.unlink(sample)