From f4cd1c6321705fb1fbf50963b08606e8b1fad502 Mon Sep 17 00:00:00 2001 From: Toshio Kuratomi Date: Wed, 14 Sep 2016 11:49:54 -0700 Subject: [PATCH] Fix galaxy's parsing of the command line. (#17569) Also make the parsing of the action in both galaxy and vault more robust. Fixes #17534 May Fix #17563 --- lib/ansible/cli/__init__.py | 48 +++++++++++++++- lib/ansible/cli/galaxy.py | 2 +- test/units/cli/test_galaxy.py | 105 ++++++++++++++++------------------ 3 files changed, 95 insertions(+), 60 deletions(-) diff --git a/lib/ansible/cli/__init__.py b/lib/ansible/cli/__init__.py index 48def42ba9c..2d29b78b549 100644 --- a/lib/ansible/cli/__init__.py +++ b/lib/ansible/cli/__init__.py @@ -1,4 +1,5 @@ # (c) 2012-2014, Michael DeHaan +# (c) 2016, Toshio Kuratomi # # This file is part of Ansible # @@ -52,6 +53,42 @@ class SortedOptParser(optparse.OptionParser): return optparse.OptionParser.format_help(self, formatter=None) +# Note: Inherit from SortedOptParser so that we get our format_help method +class InvalidOptsParser(SortedOptParser): + '''Ignore invalid options. + + Meant for the special case where we need to take care of help and version + but may not know the full range of options yet. (See it in use in set_action) + ''' + def __init__(self, parser): + # Since this is special purposed to just handle help and version, we + # take a pre-existing option parser here and set our options from + # that. This allows us to give accurate help based on the given + # option parser. + SortedOptParser.__init__(self, usage=parser.usage, + option_list=parser.option_list, + option_class=parser.option_class, + conflict_handler=parser.conflict_handler, + description=parser.description, + formatter=parser.formatter, + add_help_option=False, + prog=parser.prog, + epilog=parser.epilog) + self.version=parser.version + + def _process_long_opt(self, rargs, values): + try: + optparse.OptionParser._process_long_opt(self, rargs, values) + except optparse.BadOptionError: + pass + + def _process_short_opts(self, rargs, values): + try: + optparse.OptionParser._process_short_opts(self, rargs, values) + except optparse.BadOptionError: + pass + + class CLI(object): ''' code behind bin/ansible* programs ''' @@ -82,7 +119,7 @@ class CLI(object): """ Get the action the user wants to execute from the sys argv list. """ - for i in range(0,len(self.args)): + for i in range(0, len(self.args)): arg = self.args[i] if arg in self.VALID_ACTIONS: self.action = arg @@ -90,8 +127,13 @@ class CLI(object): break if not self.action: - # if no need for action if version/help - tmp_options, tmp_args = self.parser.parse_args() + # if we're asked for help or version, we don't need an action. + # have to use a special purpose Option Parser to figure that out as + # the standard OptionParser throws an error for unknown options and + # without knowing action, we only know of a subset of the options + # that could be legal for this command + tmp_parser = InvalidOptsParser(self.parser) + tmp_options, tmp_args = tmp_parser.parse_args(self.args) if not(hasattr(tmp_options, 'help') and tmp_options.help) or (hasattr(tmp_options, 'version') and tmp_options.version): raise AnsibleOptionsError("Missing required action") diff --git a/lib/ansible/cli/galaxy.py b/lib/ansible/cli/galaxy.py index 457e15d5c12..ffbd772c68e 100644 --- a/lib/ansible/cli/galaxy.py +++ b/lib/ansible/cli/galaxy.py @@ -120,7 +120,7 @@ class GalaxyCLI(CLI): if self.action in ("init","install"): self.parser.add_option('-f', '--force', dest='force', action='store_true', default=False, help='Force overwriting an existing role') - self.options, self.args =self.parser.parse_args() + self.options, self.args = self.parser.parse_args(self.args[1:]) display.verbosity = self.options.verbosity self.galaxy = Galaxy(self.options) return True diff --git a/test/units/cli/test_galaxy.py b/test/units/cli/test_galaxy.py index ec975ae11b6..be9f8a5dad2 100644 --- a/test/units/cli/test_galaxy.py +++ b/test/units/cli/test_galaxy.py @@ -46,9 +46,8 @@ class TestGalaxy(unittest.TestCase): shutil.rmtree("./delete_me") # creating framework for a role - gc = GalaxyCLI(args=["init"]) - with patch('sys.argv', ["-c", "--offline", "delete_me"]): - gc.parse() + gc = GalaxyCLI(args=["init", "-c", "--offline", "delete_me"]) + gc.parse() gc.run() cls.role_dir = "./delete_me" cls.role_name = "delete_me" @@ -117,9 +116,8 @@ class TestGalaxy(unittest.TestCase): def test_execute_remove(self): # installing role - gc = GalaxyCLI(args=["install"]) - with patch('sys.argv', ["--offline", "-p", self.role_path, "-r", self.role_req]): - galaxy_parser = gc.parse() + gc = GalaxyCLI(args=["install", "--offline", "-p", self.role_path, "-r", self.role_req]) + galaxy_parser = gc.parse() gc.run() # checking that installation worked @@ -127,9 +125,8 @@ class TestGalaxy(unittest.TestCase): self.assertTrue(os.path.exists(role_file)) # removing role - gc = GalaxyCLI(args=["remove"]) - with patch('sys.argv', ["-c", "-p", self.role_path, self.role_name]): - galaxy_parser = gc.parse() + gc = GalaxyCLI(args=["remove", "-c", "-p", self.role_path, self.role_name]) + galaxy_parser = gc.parse() super(GalaxyCLI, gc).run() gc.api = ansible.galaxy.api.GalaxyAPI(gc.galaxy) completed_task = gc.execute_remove() @@ -140,19 +137,18 @@ class TestGalaxy(unittest.TestCase): def test_exit_without_ignore(self): ''' tests that GalaxyCLI exits with the error specified unless the --ignore-errors flag is used ''' - gc = GalaxyCLI(args=["install"]) + gc = GalaxyCLI(args=["install", "-c", "fake_role_name"]) # testing without --ignore-errors flag - with patch('sys.argv', ["-c", "fake_role_name"]): - galaxy_parser = gc.parse() + galaxy_parser = gc.parse() with patch.object(ansible.utils.display.Display, "display", return_value=None) as mocked_display: # testing that error expected is raised self.assertRaises(AnsibleError, gc.run) self.assertTrue(mocked_display.called_once_with("- downloading role 'fake_role_name', owned by ")) # testing with --ignore-errors flag - with patch('sys.argv', ["-c", "fake_role_name", "--ignore-errors"]): - galalxy_parser = gc.parse() + gc = GalaxyCLI(args=["install", "-c", "fake_role_name", "--ignore-errors"]) + galalxy_parser = gc.parse() with patch.object(ansible.utils.display.Display, "display", return_value=None) as mocked_display: # testing that error expected is not raised with --ignore-errors flag in use gc.run() @@ -160,53 +156,50 @@ class TestGalaxy(unittest.TestCase): def run_parse_common(self, galaxycli_obj, action): with patch.object(ansible.cli.SortedOptParser, "set_usage") as mocked_usage: - with patch('sys.argv', ["-c"]): - galaxy_parser = galaxycli_obj.parse() + galaxy_parser = galaxycli_obj.parse() - # checking that the common results of parse() for all possible actions have been created/called - self.assertTrue(galaxy_parser) - self.assertTrue(isinstance(galaxycli_obj.parser, ansible.cli.SortedOptParser)) - self.assertTrue(isinstance(galaxycli_obj.galaxy, ansible.galaxy.Galaxy)) - if action in ['import', 'delete']: - formatted_call = 'usage: %prog ' + action + ' [options] github_user github_repo' - elif action == 'info': - formatted_call = 'usage: %prog ' + action + ' [options] role_name[,version]' - elif action == 'init': - formatted_call = 'usage: %prog ' + action + ' [options] role_name' - elif action == 'install': - formatted_call = 'usage: %prog ' + action + ' [options] [-r FILE | role_name(s)[,version] | scm+role_repo_url[,version] | tar_file(s)]' - elif action == 'list': - formatted_call = 'usage: %prog ' + action + ' [role_name]' - elif action == 'login': - formatted_call = 'usage: %prog ' + action + ' [options]' - elif action == 'remove': - formatted_call = 'usage: %prog ' + action + ' role1 role2 ...' - elif action == 'search': - formatted_call = 'usage: %prog ' + action + ' [searchterm1 searchterm2] [--galaxy-tags galaxy_tag1,galaxy_tag2] [--platforms platform1,platform2] [--author username]' - elif action == 'setup': - formatted_call = 'usage: %prog ' + action + ' [options] source github_user github_repo secret' - calls = [call('usage: %prog [delete|import|info|init|install|list|login|remove|search|setup] [--help] [options] ...'), call(formatted_call)] - mocked_usage.assert_has_calls(calls) + # checking that the common results of parse() for all possible actions have been created/called + self.assertTrue(galaxy_parser) + self.assertTrue(isinstance(galaxycli_obj.parser, ansible.cli.SortedOptParser)) + self.assertTrue(isinstance(galaxycli_obj.galaxy, ansible.galaxy.Galaxy)) + if action in ['import', 'delete']: + formatted_call = 'usage: %prog ' + action + ' [options] github_user github_repo' + elif action == 'info': + formatted_call = 'usage: %prog ' + action + ' [options] role_name[,version]' + elif action == 'init': + formatted_call = 'usage: %prog ' + action + ' [options] role_name' + elif action == 'install': + formatted_call = 'usage: %prog ' + action + ' [options] [-r FILE | role_name(s)[,version] | scm+role_repo_url[,version] | tar_file(s)]' + elif action == 'list': + formatted_call = 'usage: %prog ' + action + ' [role_name]' + elif action == 'login': + formatted_call = 'usage: %prog ' + action + ' [options]' + elif action == 'remove': + formatted_call = 'usage: %prog ' + action + ' role1 role2 ...' + elif action == 'search': + formatted_call = 'usage: %prog ' + action + ' [searchterm1 searchterm2] [--galaxy-tags galaxy_tag1,galaxy_tag2] [--platforms platform1,platform2] [--author username]' + elif action == 'setup': + formatted_call = 'usage: %prog ' + action + ' [options] source github_user github_repo secret' + calls = [call('usage: %prog [delete|import|info|init|install|list|login|remove|search|setup] [--help] [options] ...'), call(formatted_call)] + mocked_usage.assert_has_calls(calls) def test_parse(self): ''' systematically testing that the expected options parser is created ''' # testing no action given - gc = GalaxyCLI(args=[]) - with patch('sys.argv', ["-c"]): - self.assertRaises(AnsibleOptionsError, gc.parse) + gc = GalaxyCLI(args=["-c"]) + self.assertRaises(AnsibleOptionsError, gc.parse) # testing action that doesn't exist - gc = GalaxyCLI(args=["NOT_ACTION"]) - with patch('sys.argv', ["-c"]): - self.assertRaises(AnsibleOptionsError, gc.parse) + gc = GalaxyCLI(args=["NOT_ACTION", "-c"]) + self.assertRaises(AnsibleOptionsError, gc.parse) # testing action 'delete' - gc = GalaxyCLI(args=["delete"]) + gc = GalaxyCLI(args=["delete", "-c"]) self.run_parse_common(gc, "delete") self.assertTrue(gc.options.verbosity==0) # testing action 'import' - gc = GalaxyCLI(args=["import"]) + gc = GalaxyCLI(args=["import", "-c"]) self.run_parse_common(gc, "import") self.assertTrue(gc.options.wait==True) self.assertTrue(gc.options.reference==None) @@ -214,18 +207,18 @@ class TestGalaxy(unittest.TestCase): self.assertTrue(gc.options.verbosity==0) # testing action 'info' - gc = GalaxyCLI(args=["info"]) + gc = GalaxyCLI(args=["info", "-c"]) self.run_parse_common(gc, "info") self.assertTrue(gc.options.offline==False) # testing action 'init' - gc = GalaxyCLI(args=["init"]) + gc = GalaxyCLI(args=["init", "-c"]) self.run_parse_common(gc, "init") self.assertTrue(gc.options.offline==False) self.assertTrue(gc.options.force==False) # testing action 'install' - gc = GalaxyCLI(args=["install"]) + gc = GalaxyCLI(args=["install", "-c"]) self.run_parse_common(gc, "install") self.assertTrue(gc.options.ignore_errors==False) self.assertTrue(gc.options.no_deps==False) @@ -233,30 +226,30 @@ class TestGalaxy(unittest.TestCase): self.assertTrue(gc.options.force==False) # testing action 'list' - gc = GalaxyCLI(args=["list"]) + gc = GalaxyCLI(args=["list", "-c"]) self.run_parse_common(gc, "list") self.assertTrue(gc.options.verbosity==0) # testing action 'login' - gc = GalaxyCLI(args=["login"]) + gc = GalaxyCLI(args=["login", "-c"]) self.run_parse_common(gc, "login") self.assertTrue(gc.options.verbosity==0) self.assertTrue(gc.options.token==None) # testing action 'remove' - gc = GalaxyCLI(args=["remove"]) + gc = GalaxyCLI(args=["remove", "-c"]) self.run_parse_common(gc, "remove") self.assertTrue(gc.options.verbosity==0) # testing action 'search' - gc = GalaxyCLI(args=["search"]) + gc = GalaxyCLI(args=["search", "-c"]) self.run_parse_common(gc, "search") self.assertTrue(gc.options.platforms==None) self.assertTrue(gc.options.tags==None) self.assertTrue(gc.options.author==None) # testing action 'setup' - gc = GalaxyCLI(args=["setup"]) + gc = GalaxyCLI(args=["setup", "-c"]) self.run_parse_common(gc, "setup") self.assertTrue(gc.options.verbosity==0) self.assertTrue(gc.options.remove_id==None)