Fix inventory for test_dir_inventory

It came up that fixing this unit test may relate to another ticket that is open. This work allows us to uncomment this unit test by fixing how we pars variables allowing a quoted variable to contain a '#'.

Work also went into cleaning up some of the test data to clarify what was working.

Lastly work went into cleaning up formatting so that the code is easily read.
This commit is contained in:
Richard C Isaacson 2014-03-04 18:31:49 -06:00
parent ca4ff261eb
commit 49bd8b0b35
4 changed files with 130 additions and 92 deletions

View file

@ -27,6 +27,7 @@ import shlex
import re
import ast
class InventoryParser(object):
"""
Host inventory for ansible.
@ -47,7 +48,6 @@ class InventoryParser(object):
self._parse_group_variables()
return self.groups
# [webservers]
# alpha
# beta:2345
@ -65,9 +65,36 @@ class InventoryParser(object):
active_group_name = 'ungrouped'
for line in self.lines:
line = line.split("#")[0].strip()
# Split off any comments that are not contained in a variable.
if "#" in line:
split_line = line.split("#")
instances = len(split_line) - 1
if instances > 0:
marker = 0
while marker < instances:
if ("=\"" in split_line[marker] and "\"" in split_line[marker + 1]) or (
"='" in split_line[marker] and "'" in split_line[marker + 1]):
marker += 1
else:
if marker == 0:
line = split_line[marker]
else:
# We have multiple fragments that we need to combine back together.
# rekram is us reversing that work we did with marker.
rekram = 0
new_line = split_line[rekram]
while marker > rekram:
rekram += 1
new_line = new_line + "#" + split_line[rekram]
line = new_line
break
# Clean up the end of the line.
line = line.strip()
if line.startswith("[") and line.endswith("]"):
active_group_name = line.replace("[","").replace("]","")
active_group_name = line.replace("[", "").replace("]", "")
if line.find(":vars") != -1 or line.find(":children") != -1:
active_group_name = active_group_name.rsplit(":", 1)[0]
if active_group_name not in self.groups:
@ -95,20 +122,18 @@ class InventoryParser(object):
if hostname.count(".") == 1:
(hostname, port) = hostname.rsplit(".", 1)
elif (hostname.find("[") != -1 and
hostname.find("]") != -1 and
hostname.find(":") != -1 and
(hostname.rindex("]") < hostname.rindex(":")) or
(hostname.find("]") == -1 and hostname.find(":") != -1)):
(hostname, port) = hostname.rsplit(":", 1)
hostname.find("]") != -1 and
hostname.find(":") != -1 and
(hostname.rindex("]") < hostname.rindex(":")) or
(hostname.find("]") == -1 and hostname.find(":") != -1)):
(hostname, port) = hostname.rsplit(":", 1)
hostnames = []
if detect_range(hostname):
hostnames = expand_hostname_range(hostname)
else:
hostnames = [hostname]
for hn in hostnames:
host = None
if hn in self.hosts:
host = self.hosts[hn]
else:
@ -119,15 +144,24 @@ class InventoryParser(object):
if t.startswith('#'):
break
try:
(k,v) = t.split("=", 1)
(k, v) = t.split("=", 1)
except ValueError, e:
raise errors.AnsibleError("Invalid ini entry: %s - %s" % (t, str(e)))
try:
host.set_variable(k,ast.literal_eval(v))
except:
# most likely a string that literal_eval
# doesn't like, so just set it
host.set_variable(k,v)
# I am not sure where a variable with a hash needs to be evaluated via ast.
# If an instance comes up this is the condition we need to modify.
if "#" in v:
host.set_variable(k, v)
else:
try:
host.set_variable(k, ast.literal_eval(v))
# Using explicit exceptions.
# Likely a string that literal_eval does not like. We wil then just set it.
except ValueError:
# For some reason this was thought to be malformed.
host.set_variable(k, v)
except SyntaxError:
# Is this a hash with an equals at the end?
host.set_variable(k, v)
self.groups[active_group_name].add_host(host)
# [southeast:children]
@ -142,7 +176,7 @@ class InventoryParser(object):
if line is None or line == '':
continue
if line.startswith("[") and line.find(":children]") != -1:
line = line.replace("[","").replace(":children]","")
line = line.replace("[", "").replace(":children]", "")
group = self.groups.get(line, None)
if group is None:
group = self.groups[line] = Group(name=line)
@ -157,7 +191,6 @@ class InventoryParser(object):
else:
group.add_child_group(kid_group)
# [webservers:vars]
# http_port=1234
# maxRequestsPerChild=200
@ -167,7 +200,7 @@ class InventoryParser(object):
for line in self.lines:
line = line.strip()
if line.startswith("[") and line.find(":vars]") != -1:
line = line.replace("[","").replace(":vars]","")
line = line.replace("[", "").replace(":vars]", "")
group = self.groups.get(line, None)
if group is None:
raise errors.AnsibleError("can't add vars to undefined group: %s" % line)

View file

@ -5,6 +5,7 @@ from nose.tools import raises
from ansible import errors
from ansible.inventory import Inventory
class TestInventory(unittest.TestCase):
def setUp(self):
@ -49,14 +50,14 @@ class TestInventory(unittest.TestCase):
def dir_inventory(self):
return Inventory(self.inventory_dir)
all_simple_hosts=['jupiter', 'saturn', 'zeus', 'hera',
'cerberus001','cerberus002','cerberus003',
'cottus99', 'cottus100',
'poseidon', 'thor', 'odin', 'loki',
'thrudgelmir0', 'thrudgelmir1', 'thrudgelmir2',
'thrudgelmir3', 'thrudgelmir4', 'thrudgelmir5',
'Hotep-a', 'Hotep-b', 'Hotep-c',
'BastC', 'BastD', 'neptun', ]
all_simple_hosts = ['jupiter', 'saturn', 'zeus', 'hera',
'cerberus001', 'cerberus002', 'cerberus003',
'cottus99', 'cottus100',
'poseidon', 'thor', 'odin', 'loki',
'thrudgelmir0', 'thrudgelmir1', 'thrudgelmir2',
'thrudgelmir3', 'thrudgelmir4', 'thrudgelmir5',
'Hotep-a', 'Hotep-b', 'Hotep-c',
'BastC', 'BastD', 'neptun', ]
#####################################
### Empty inventory format tests
@ -93,36 +94,36 @@ class TestInventory(unittest.TestCase):
inventory = self.simple_inventory()
hosts = inventory.list_hosts("norse")
expected_hosts=['thor', 'odin', 'loki']
expected_hosts = ['thor', 'odin', 'loki']
assert sorted(hosts) == sorted(expected_hosts)
def test_simple_ungrouped(self):
inventory = self.simple_inventory()
hosts = inventory.list_hosts("ungrouped")
expected_hosts=['jupiter', 'saturn',
'thrudgelmir0', 'thrudgelmir1', 'thrudgelmir2',
'thrudgelmir3', 'thrudgelmir4', 'thrudgelmir5']
expected_hosts = ['jupiter', 'saturn',
'thrudgelmir0', 'thrudgelmir1', 'thrudgelmir2',
'thrudgelmir3', 'thrudgelmir4', 'thrudgelmir5']
assert sorted(hosts) == sorted(expected_hosts)
def test_simple_combined(self):
inventory = self.simple_inventory()
hosts = inventory.list_hosts("norse:greek")
expected_hosts=['zeus', 'hera', 'poseidon',
'cerberus001','cerberus002','cerberus003',
'cottus99','cottus100',
'thor', 'odin', 'loki']
expected_hosts = ['zeus', 'hera', 'poseidon',
'cerberus001', 'cerberus002', 'cerberus003',
'cottus99', 'cottus100',
'thor', 'odin', 'loki']
assert sorted(hosts) == sorted(expected_hosts)
def test_simple_restrict(self):
inventory = self.simple_inventory()
restricted_hosts = ['hera', 'poseidon', 'thor']
expected_hosts=['zeus', 'hera', 'poseidon',
'cerberus001','cerberus002','cerberus003',
'cottus99', 'cottus100',
'thor', 'odin', 'loki']
expected_hosts = ['zeus', 'hera', 'poseidon',
'cerberus001', 'cerberus002', 'cerberus003',
'cottus99', 'cottus100',
'thor', 'odin', 'loki']
inventory.restrict_to(restricted_hosts)
hosts = inventory.list_hosts("norse:greek")
@ -137,12 +138,12 @@ class TestInventory(unittest.TestCase):
def test_simple_string_ipv4(self):
inventory = Inventory('127.0.0.1,192.168.1.1')
hosts = inventory.list_hosts()
self.assertEqual(sorted(hosts), sorted(['127.0.0.1','192.168.1.1']))
self.assertEqual(sorted(hosts), sorted(['127.0.0.1', '192.168.1.1']))
def test_simple_string_ipv4_port(self):
inventory = Inventory('127.0.0.1:2222,192.168.1.1')
hosts = inventory.list_hosts()
self.assertEqual(sorted(hosts), sorted(['127.0.0.1','192.168.1.1']))
self.assertEqual(sorted(hosts), sorted(['127.0.0.1', '192.168.1.1']))
def test_simple_string_ipv4_vars(self):
inventory = Inventory('127.0.0.1:2222,192.168.1.1')
@ -152,12 +153,12 @@ class TestInventory(unittest.TestCase):
def test_simple_string_ipv6(self):
inventory = Inventory('FE80:EF45::12:1,192.168.1.1')
hosts = inventory.list_hosts()
self.assertEqual(sorted(hosts), sorted(['FE80:EF45::12:1','192.168.1.1']))
self.assertEqual(sorted(hosts), sorted(['FE80:EF45::12:1', '192.168.1.1']))
def test_simple_string_ipv6_port(self):
inventory = Inventory('[FE80:EF45::12:1]:2222,192.168.1.1')
hosts = inventory.list_hosts()
self.assertEqual(sorted(hosts), sorted(['FE80:EF45::12:1','192.168.1.1']))
self.assertEqual(sorted(hosts), sorted(['FE80:EF45::12:1', '192.168.1.1']))
def test_simple_string_ipv6_vars(self):
inventory = Inventory('[FE80:EF45::12:1]:2222,192.168.1.1')
@ -167,12 +168,12 @@ class TestInventory(unittest.TestCase):
def test_simple_string_fqdn(self):
inventory = Inventory('foo.example.com,bar.example.com')
hosts = inventory.list_hosts()
self.assertEqual(sorted(hosts), sorted(['foo.example.com','bar.example.com']))
self.assertEqual(sorted(hosts), sorted(['foo.example.com', 'bar.example.com']))
def test_simple_string_fqdn_port(self):
inventory = Inventory('foo.example.com:2222,bar.example.com')
hosts = inventory.list_hosts()
self.assertEqual(sorted(hosts), sorted(['foo.example.com','bar.example.com']))
self.assertEqual(sorted(hosts), sorted(['foo.example.com', 'bar.example.com']))
def test_simple_string_fqdn_vars(self):
inventory = Inventory('foo.example.com:2222,bar.example.com')
@ -191,26 +192,26 @@ class TestInventory(unittest.TestCase):
inventory = self.simple_inventory()
vars = inventory.get_variables('hera')
expected = { 'ansible_ssh_port': 3000,
'group_names': ['greek'],
'inventory_hostname': 'hera',
'inventory_hostname_short': 'hera' }
expected = {'ansible_ssh_port': 3000,
'group_names': ['greek'],
'inventory_hostname': 'hera',
'inventory_hostname_short': 'hera'}
assert vars == expected
def test_large_range(self):
inventory = self.large_range_inventory()
hosts = inventory.list_hosts()
self.assertEqual(sorted(hosts), sorted('bob%03i' %i for i in range(0, 143)))
self.assertEqual(sorted(hosts), sorted('bob%03i' % i for i in range(0, 143)))
def test_subset(self):
inventory = self.simple_inventory()
inventory.subset('odin;thor,loki')
self.assertEqual(sorted(inventory.list_hosts()), sorted(['thor','odin','loki']))
self.assertEqual(sorted(inventory.list_hosts()), sorted(['thor', 'odin', 'loki']))
def test_subset_range(self):
inventory = self.simple_inventory()
inventory.subset('greek[0-2];norse[0]')
self.assertEqual(sorted(inventory.list_hosts()), sorted(['zeus','hera','thor']))
self.assertEqual(sorted(inventory.list_hosts()), sorted(['zeus', 'hera', 'thor']))
def test_subet_range_empty_group(self):
inventory = self.simple_inventory()
@ -220,11 +221,11 @@ class TestInventory(unittest.TestCase):
def test_subset_filename(self):
inventory = self.simple_inventory()
inventory.subset('@' + os.path.join(self.test_dir, 'restrict_pattern'))
self.assertEqual(sorted(inventory.list_hosts()), sorted(['thor','odin']))
self.assertEqual(sorted(inventory.list_hosts()), sorted(['thor', 'odin']))
@raises(errors.AnsibleError)
def testinvalid_entry(self):
Inventory('1234')
Inventory('1234')
###################################################
### INI file advanced tests
@ -240,7 +241,7 @@ class TestInventory(unittest.TestCase):
g=' g ', h=' h ', i="' i \"", j='" j',
rga='1', rgb='2', rgc='3',
inventory_hostname='rtp_a', inventory_hostname_short='rtp_a',
group_names=[ 'eastcoast', 'nc', 'redundantgroup', 'redundantgroup2', 'redundantgroup3', 'rtp', 'us' ]
group_names=['eastcoast', 'nc', 'redundantgroup', 'redundantgroup2', 'redundantgroup3', 'rtp', 'us']
)
print vars
print expected
@ -249,9 +250,9 @@ class TestInventory(unittest.TestCase):
def test_complex_group_names(self):
inventory = self.complex_inventory()
tests = {
'host1': [ 'role1', 'role3' ],
'host2': [ 'role1', 'role2' ],
'host3': [ 'role2', 'role3' ]
'host1': ['role1', 'role3'],
'host2': ['role1', 'role2'],
'host3': ['role2', 'role3']
}
for host, roles in tests.iteritems():
group_names = inventory.get_variables(host)['group_names']
@ -275,11 +276,10 @@ class TestInventory(unittest.TestCase):
def test_complex_enumeration(self):
expected1 = ['rtp_b']
expected2 = ['rtp_a', 'rtp_b']
expected3 = ['rtp_a', 'rtp_b', 'rtp_c', 'tri_a', 'tri_b', 'tri_c']
expected4 = ['rtp_b', 'orlando' ]
expected4 = ['rtp_b', 'orlando']
expected5 = ['blade-a-1']
inventory = self.complex_inventory()
@ -303,34 +303,34 @@ class TestInventory(unittest.TestCase):
@raises(errors.AnsibleError)
def test_invalid_range(self):
Inventory(os.path.join(self.test_dir, 'inventory','test_incorrect_range'))
Inventory(os.path.join(self.test_dir, 'inventory', 'test_incorrect_range'))
@raises(errors.AnsibleError)
def test_missing_end(self):
Inventory(os.path.join(self.test_dir, 'inventory','test_missing_end'))
Inventory(os.path.join(self.test_dir, 'inventory', 'test_missing_end'))
@raises(errors.AnsibleError)
def test_incorrect_format(self):
Inventory(os.path.join(self.test_dir, 'inventory','test_incorrect_format'))
Inventory(os.path.join(self.test_dir, 'inventory', 'test_incorrect_format'))
@raises(errors.AnsibleError)
def test_alpha_end_before_beg(self):
Inventory(os.path.join(self.test_dir, 'inventory','test_alpha_end_before_beg'))
Inventory(os.path.join(self.test_dir, 'inventory', 'test_alpha_end_before_beg'))
def test_combined_range(self):
i = Inventory(os.path.join(self.test_dir, 'inventory','test_combined_range'))
i = Inventory(os.path.join(self.test_dir, 'inventory', 'test_combined_range'))
hosts = i.list_hosts('test')
expected_hosts=['host1A','host2A','host1B','host2B']
expected_hosts = ['host1A', 'host2A', 'host1B', 'host2B']
assert sorted(hosts) == sorted(expected_hosts)
def test_leading_range(self):
i = Inventory(os.path.join(self.test_dir, 'inventory','test_leading_range'))
i = Inventory(os.path.join(self.test_dir, 'inventory', 'test_leading_range'))
hosts = i.list_hosts('test')
expected_hosts=['1.host','2.host','A.host','B.host']
expected_hosts = ['1.host', '2.host', 'A.host', 'B.host']
assert sorted(hosts) == sorted(expected_hosts)
hosts2 = i.list_hosts('test2')
expected_hosts2=['1.host','2.host','3.host']
expected_hosts2 = ['1.host', '2.host', '3.host']
assert sorted(hosts2) == sorted(expected_hosts2)
###################################################
@ -340,38 +340,38 @@ class TestInventory(unittest.TestCase):
inventory = self.script_inventory()
hosts = inventory.list_hosts()
expected_hosts=['jupiter', 'saturn', 'zeus', 'hera', 'poseidon', 'thor', 'odin', 'loki']
expected_hosts = ['jupiter', 'saturn', 'zeus', 'hera', 'poseidon', 'thor', 'odin', 'loki']
print "Expected: %s"%(expected_hosts)
print "Got : %s"%(hosts)
print "Expected: %s" % expected_hosts
print "Got : %s" % hosts
assert sorted(hosts) == sorted(expected_hosts)
def test_script_all(self):
inventory = self.script_inventory()
hosts = inventory.list_hosts('all')
expected_hosts=['jupiter', 'saturn', 'zeus', 'hera', 'poseidon', 'thor', 'odin', 'loki']
expected_hosts = ['jupiter', 'saturn', 'zeus', 'hera', 'poseidon', 'thor', 'odin', 'loki']
assert sorted(hosts) == sorted(expected_hosts)
def test_script_norse(self):
inventory = self.script_inventory()
hosts = inventory.list_hosts("norse")
expected_hosts=['thor', 'odin', 'loki']
expected_hosts = ['thor', 'odin', 'loki']
assert sorted(hosts) == sorted(expected_hosts)
def test_script_combined(self):
inventory = self.script_inventory()
hosts = inventory.list_hosts("norse:greek")
expected_hosts=['zeus', 'hera', 'poseidon', 'thor', 'odin', 'loki']
expected_hosts = ['zeus', 'hera', 'poseidon', 'thor', 'odin', 'loki']
assert sorted(hosts) == sorted(expected_hosts)
def test_script_restrict(self):
inventory = self.script_inventory()
restricted_hosts = ['hera', 'poseidon', 'thor']
expected_hosts=['zeus', 'hera', 'poseidon', 'thor', 'odin', 'loki']
expected_hosts = ['zeus', 'hera', 'poseidon', 'thor', 'odin', 'loki']
inventory.restrict_to(restricted_hosts)
hosts = inventory.list_hosts("norse:greek")
@ -389,7 +389,7 @@ class TestInventory(unittest.TestCase):
print "VARS=%s" % vars
assert vars == {'hammer':True,
assert vars == {'hammer': True,
'group_names': ['norse'],
'inventory_hostname': 'thor',
'inventory_hostname_short': 'thor'}
@ -417,15 +417,17 @@ class TestInventory(unittest.TestCase):
auth = inventory.get_variables('neptun')['auth']
assert auth == 'YWRtaW46YWRtaW4='
# test disabled as needs to be updated to model desired behavior
#
#def test_dir_inventory(self):
# inventory = self.dir_inventory()
# vars = inventory.get_variables('zeus')
#
# print "VARS=%s" % vars
#
# assert vars == {'inventory_hostname': 'zeus',
# 'inventory_hostname_short': 'zeus',
# 'group_names': ['greek', 'major-god', 'ungrouped'],
# 'var_a': '1#2'}
def test_dir_inventory(self):
inventory = self.dir_inventory()
host_vars = inventory.get_variables('zeus')
expected_vars = {'inventory_hostname': 'zeus',
'inventory_hostname_short': 'zeus',
'group_names': ['greek', 'major-god', 'ungrouped'],
'var_a': '2#3'}
print "HOST VARS=%s" % host_vars
print "EXPECTED VARS=%s" % expected_vars
assert host_vars == expected_vars

View file

@ -1,3 +1,3 @@
zeus var_a=2
zeus var_a=0
morpheus
thor

View file

@ -1,5 +1,8 @@
[titan]
cronus var_a="a#b" var_b="b#c" var_c="c#d" # Is this overkill?
[major-god] # group with inline comments
zeus var_a="1#2" # host with inline comments and "#" in the var string
zeus var_a="2#3" # host with inline comments and "#" in the var string
# A comment
thor