#!/usr/bin/python # -*- coding: utf-8 -*- # (c) 2013, Andrew Dunham # (c) 2013, Daniel Jaouen # # Based on macports (Jimmy Tang ) # # This module is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This software is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this software. If not, see . DOCUMENTATION = ''' --- module: homebrew author: Andrew Dunham and Daniel Jaouen short_description: Package manager for Homebrew description: - Manages Homebrew packages version_added: "1.1" options: name: description: - name of package to install/remove required: true state: description: - state of the package choices: [ 'head', 'latest', 'present', 'absent', 'linked', 'uninstalled' ] required: false default: present update_homebrew: description: - update homebrew itself first required: false default: "no" choices: [ "yes", "no" ] install_options: description: - options flags to install a package required: false default: null notes: [] ''' EXAMPLES = ''' - homebrew: name=foo state=present - homebrew: name=foo state=present update_homebrew=yes - homebrew: name=foo state=latest update_homebrew=yes - homebrew: update_homebrew=yes upgrade=yes - homebrew: name=foo state=head - homebrew: name=foo state=linked - homebrew: name=foo state=absent - homebrew: name=foo,bar state=absent - homebrew: name=foo state=present install_options=with-baz,enable-debug ''' import os.path import re # exceptions -------------------------------------------------------------- {{{ class HomebrewException(Exception): pass # /exceptions ------------------------------------------------------------- }}} # utils ------------------------------------------------------------------- {{{ def _create_regex_group(s): lines = (line.strip() for line in s.split('\n') if line.strip()) chars = filter(None, (line.split('#')[0].strip() for line in lines)) group = r'[^' + r''.join(chars) + r']' return re.compile(group) # /utils ------------------------------------------------------------------ }}} class Homebrew(object): '''A class to manage Homebrew packages.''' # class regexes ------------------------------------------------ {{{ VALID_PATH_CHARS = r''' \w # alphanumeric characters (i.e., [a-zA-Z0-9_]) \s # spaces : # colons {sep} # the OS-specific path separator - # dashes '''.format(sep=os.path.sep) VALID_BREW_PATH_CHARS = r''' \w # alphanumeric characters (i.e., [a-zA-Z0-9_]) \s # spaces {sep} # the OS-specific path separator - # dashes '''.format(sep=os.path.sep) VALID_PACKAGE_CHARS = r''' \w # alphanumeric characters (i.e., [a-zA-Z0-9_]) - # dashes ''' INVALID_PATH_REGEX = _create_regex_group(VALID_PATH_CHARS) INVALID_BREW_PATH_REGEX = _create_regex_group(VALID_BREW_PATH_CHARS) INVALID_PACKAGE_REGEX = _create_regex_group(VALID_PACKAGE_CHARS) # /class regexes ----------------------------------------------- }}} # class validations -------------------------------------------- {{{ @classmethod def valid_path(cls, path): ''' `path` must be one of: - list of paths - a string containing only: - alphanumeric characters - dashes - spaces - colons - os.path.sep ''' if isinstance(path, basestring): return not cls.INVALID_PATH_REGEX.search(path) try: iter(path) except TypeError: return False else: paths = path return all(cls.valid_brew_path(path_) for path_ in paths) @classmethod def valid_brew_path(cls, brew_path): ''' `brew_path` must be one of: - None - a string containing only: - alphanumeric characters - dashes - spaces - os.path.sep ''' if brew_path is None: return True return ( isinstance(brew_path, basestring) and not cls.INVALID_BREW_PATH_REGEX.search(brew_path) ) @classmethod def valid_package(cls, package): '''A valid package is either None or alphanumeric.''' if package is None: return True return ( isinstance(package, basestring) and not cls.INVALID_PACKAGE_REGEX.search(package) ) @classmethod def valid_state(cls, state): ''' A valid state is one of: - None - installed - upgraded - head - linked - unlinked - absent ''' if state is None: return True else: return ( isinstance(state, basestring) and state.lower() in ( 'installed', 'upgraded', 'head', 'linked', 'unlinked', 'absent', ) ) @classmethod def valid_module(cls, module): '''A valid module is an instance of AnsibleModule.''' return isinstance(module, AnsibleModule) # /class validations ------------------------------------------- }}} # class properties --------------------------------------------- {{{ @property def module(self): return self._module @module.setter def module(self, module): if not self.valid_module(module): self._module = None self.failed = True self.message = 'Invalid module: {0}.'.format(module) raise HomebrewException(self.message) else: self._module = module return module @property def path(self): return self._path @path.setter def path(self, path): if not self.valid_path(path): self._path = [] self.failed = True self.message = 'Invalid path: {0}.'.format(path) raise HomebrewException(self.message) else: if isinstance(path, basestring): self._path = path.split(':') else: self._path = path return path @property def brew_path(self): return self._brew_path @brew_path.setter def brew_path(self, brew_path): if not self.valid_brew_path(brew_path): self._brew_path = None self.failed = True self.message = 'Invalid brew_path: {0}.'.format(brew_path) raise HomebrewException(self.message) else: self._brew_path = brew_path return brew_path @property def params(self): return self._params @params.setter def params(self, params): self._params = self.module.params return self._params @property def current_package(self): return self._current_package @current_package.setter def current_package(self, package): if not self.valid_package(package): self._current_package = None self.failed = True self.message = 'Invalid package: {0}.'.format(package) raise HomebrewException(self.message) else: self._current_package = package return package # /class properties -------------------------------------------- }}} def __init__(self, module, path=None, packages=None, state=None, update_homebrew=False, install_options=None): if not install_options: install_options = list() self._setup_status_vars() self._setup_instance_vars(module=module, path=path, packages=packages, state=state, update_homebrew=update_homebrew, install_options=install_options, ) self._prep() # prep --------------------------------------------------------- {{{ def _setup_status_vars(self): self.failed = False self.changed = False self.changed_count = 0 self.unchanged_count = 0 self.message = '' def _setup_instance_vars(self, **kwargs): for key, val in kwargs.iteritems(): setattr(self, key, val) def _prep(self): self._prep_path() self._prep_brew_path() def _prep_path(self): if not self.path: self.path = ['/usr/local/bin'] def _prep_brew_path(self): if not self.module: self.brew_path = None self.failed = True self.message = 'AnsibleModule not set.' raise HomebrewException(self.message) self.brew_path = self.module.get_bin_path( 'brew', required=True, opt_dirs=self.path, ) if not self.brew_path: self.brew_path = None self.failed = True self.message = 'Unable to locate homebrew executable.' raise HomebrewException('Unable to locate homebrew executable.') return self.brew_path def _status(self): return (self.failed, self.changed, self.message) # /prep -------------------------------------------------------- }}} def run(self): try: self._run() except HomebrewException: pass if not self.failed and (self.changed_count + self.unchanged_count > 1): self.message = "Changed: %d, Unchanged: %d" % ( self.changed_count, self.unchanged_count, ) (failed, changed, message) = self._status() return (failed, changed, message) # checks ------------------------------------------------------- {{{ def _current_package_is_installed(self): if not self.valid_package(self.current_package): self.failed = True self.message = 'Invalid package: {0}.'.format(self.current_package) raise HomebrewException(self.message) rc, out, err = self.module.run_command( "{brew_path} list -m1 | grep -q '^{package}$'".format( brew_path=self.brew_path, package=self.current_package, ) ) if rc == 0: return True else: return False def _outdated_packages(self): rc, out, err = self.module.run_command([ self.brew_path, 'outdated', ]) return [line.split(' ')[0].strip() for line in out.split('\n') if line] def _current_package_is_outdated(self): if not self.valid_package(self.current_package): return False return self.current_package in self._outdated_packages() def _current_package_is_installed_from_head(self): if not Homebrew.valid_package(self.current_package): return False elif not self._current_package_is_installed(): return False rc, out, err = self.module.run_command([ self.brew_path, 'info', self.current_package, ]) try: version_info = [line for line in out.split('\n') if line][0] except IndexError: return False return version_info.split(' ')[-1] == 'HEAD' # /checks ------------------------------------------------------ }}} # commands ----------------------------------------------------- {{{ def _run(self): if self.update_homebrew: self._update_homebrew() if self.packages: if self.state == 'installed': return self._install_packages() elif self.state == 'upgraded': return self._upgrade_packages() elif self.state == 'head': return self._install_packages() elif self.state == 'linked': return self._link_packages() elif self.state == 'unlinked': return self._unlink_packages() elif self.state == 'absent': return self._uninstall_packages() # updated -------------------------------- {{{ def _update_homebrew(self): rc, out, err = self.module.run_command([ self.brew_path, 'update', ]) if rc == 0: if out and isinstance(out, basestring): already_updated = any( re.search(r'Already up-to-date.', s.strip(), re.IGNORECASE) for s in out.split('\n') if s ) if not already_updated: self.changed = True self.message = 'Homebrew updated successfully.' else: self.message = 'Homebrew already up-to-date.' return True else: self.failed = True self.message = err.strip() raise HomebrewException(self.message) # /updated ------------------------------- }}} # installed ------------------------------ {{{ def _install_current_package(self): if not self.valid_package(self.current_package): self.failed = True self.message = 'Invalid package: {0}.'.format(self.current_package) raise HomebrewException(self.message) if self._current_package_is_installed(): self.unchanged_count += 1 self.message = 'Package already installed: {0}'.format( self.current_package, ) return True if self.module.check_mode: self.changed = True self.message = 'Package would be installed: {0}'.format( self.current_package ) raise HomebrewException(self.message) if self.state == 'head': head = '--HEAD' else: head = None opts = ( [self.brew_path, 'install'] + self.install_options + [self.current_package, head] ) cmd = [opt for opt in opts if opt] rc, out, err = self.module.run_command(cmd) if self._current_package_is_installed(): self.changed_count += 1 self.changed = True self.message = 'Package installed: {0}'.format(self.current_package) return True else: self.failed = True self.message = err.strip() raise HomebrewException(self.message) def _install_packages(self): for package in self.packages: self.current_package = package self._install_current_package() return True # /installed ----------------------------- }}} # upgraded ------------------------------- {{{ def _upgrade_current_package(self): command = 'upgrade' if not self.valid_package(self.current_package): self.failed = True self.message = 'Invalid package: {0}.'.format(self.current_package) raise HomebrewException(self.message) if not self._current_package_is_installed(): command = 'install' if self._current_package_is_installed() and not self._current_package_is_outdated(): self.message = 'Package is already upgraded: {0}'.format( self.current_package, ) self.unchanged_count += 1 return True if self.module.check_mode: self.changed = True self.message = 'Package would be upgraded: {0}'.format( self.current_package ) raise HomebrewException(self.message) opts = ( [self.brew_path, command] + self.install_options + [self.current_package] ) cmd = [opt for opt in opts if opt] rc, out, err = self.module.run_command(cmd) if not self._current_package_is_outdated(): self.changed_count += 1 self.changed = True self.message = 'Package upgraded: {0}'.format(self.current_package) return True else: self.failed = True self.message = err.strip() raise HomebrewException(self.message) def _upgrade_all_packages(self): opts = ( [self.brew_path, 'upgrade'] + self.install_options ) cmd = [opt for opt in opts if opt] rc, out, err = self.module.run_command(cmd) if rc == 0: self.changed = True self.message = 'All packages upgraded.' return True else: self.failed = True self.message = err.strip() raise HomebrewException(self.message) def _upgrade_packages(self): if not self.packages: self._upgrade_all_packages() else: for package in self.packages: self.current_package = package self._upgrade_current_package() return True # /upgraded ------------------------------ }}} # uninstalled ---------------------------- {{{ def _uninstall_current_package(self): if not self.valid_package(self.current_package): self.failed = True self.message = 'Invalid package: {0}.'.format(self.current_package) raise HomebrewException(self.message) if not self._current_package_is_installed(): self.unchanged_count += 1 self.message = 'Package already uninstalled: {0}'.format( self.current_package, ) return True if self.module.check_mode: self.changed = True self.message = 'Package would be uninstalled: {0}'.format( self.current_package ) raise HomebrewException(self.message) opts = ( [self.brew_path, 'uninstall'] + self.install_options + [self.current_package] ) cmd = [opt for opt in opts if opt] rc, out, err = self.module.run_command(cmd) if not self._current_package_is_installed(): self.changed_count += 1 self.changed = True self.message = 'Package uninstalled: {0}'.format(self.current_package) return True else: self.failed = True self.message = err.strip() raise HomebrewException(self.message) def _uninstall_packages(self): for package in self.packages: self.current_package = package self._uninstall_current_package() return True # /uninstalled ----------------------------- }}} # linked --------------------------------- {{{ def _link_current_package(self): if not self.valid_package(self.current_package): self.failed = True self.message = 'Invalid package: {0}.'.format(self.current_package) raise HomebrewException(self.message) if not self._current_package_is_installed(): self.failed = True self.message = 'Package not installed: {0}.'.format(self.current_package) raise HomebrewException(self.message) if self.module.check_mode: self.changed = True self.message = 'Package would be linked: {0}'.format( self.current_package ) raise HomebrewException(self.message) opts = ( [self.brew_path, 'link'] + self.install_options + [self.current_package] ) cmd = [opt for opt in opts if opt] rc, out, err = self.module.run_command(cmd) if rc == 0: self.changed_count += 1 self.changed = True self.message = 'Package linked: {0}'.format(self.current_package) return True else: self.failed = True self.message = 'Package could not be linked: {0}.'.format(self.current_package) raise HomebrewException(self.message) def _link_packages(self): for package in self.packages: self.current_package = package self._link_current_package() return True # /linked -------------------------------- }}} # unlinked ------------------------------- {{{ def _unlink_current_package(self): if not self.valid_package(self.current_package): self.failed = True self.message = 'Invalid package: {0}.'.format(self.current_package) raise HomebrewException(self.message) if not self._current_package_is_installed(): self.failed = True self.message = 'Package not installed: {0}.'.format(self.current_package) raise HomebrewException(self.message) if self.module.check_mode: self.changed = True self.message = 'Package would be unlinked: {0}'.format( self.current_package ) raise HomebrewException(self.message) opts = ( [self.brew_path, 'unlink'] + self.install_options + [self.current_package] ) cmd = [opt for opt in opts if opt] rc, out, err = self.module.run_command(cmd) if rc == 0: self.changed_count += 1 self.changed = True self.message = 'Package unlinked: {0}'.format(self.current_package) return True else: self.failed = True self.message = 'Package could not be unlinked: {0}.'.format(self.current_package) raise HomebrewException(self.message) def _unlink_packages(self): for package in self.packages: self.current_package = package self._unlink_current_package() return True # /unlinked ------------------------------ }}} # /commands ---------------------------------------------------- }}} def main(): module = AnsibleModule( argument_spec=dict( name=dict(aliases=["pkg"], required=False), path=dict(required=False), state=dict( default="present", choices=[ "present", "installed", "latest", "upgraded", "head", "linked", "unlinked", "absent", "removed", "uninstalled", ], ), update_homebrew=dict( default="no", aliases=["update-brew"], type='bool', ), install_options=dict( default=None, aliases=['options'], type='list', ) ), supports_check_mode=True, ) p = module.params if p['name']: packages = p['name'].split(',') else: packages = None path = p['path'] if path: path = path.split(':') else: path = ['/usr/local/bin'] state = p['state'] if state in ('present', 'installed', 'head'): state = 'installed' if state in ('latest', 'upgraded'): state = 'upgraded' if state == 'linked': state = 'linked' if state == 'unlinked': state = 'unlinked' if state in ('absent', 'removed', 'uninstalled'): state = 'absent' update_homebrew = p['update_homebrew'] p['install_options'] = p['install_options'] or [] install_options = ['--{0}'.format(install_option) for install_option in p['install_options']] brew = Homebrew(module=module, path=path, packages=packages, state=state, update_homebrew=update_homebrew, install_options=install_options) (failed, changed, message) = brew.run() if failed: module.fail_json(msg=message) else: module.exit_json(changed=changed, msg=message) # this is magic, see lib/ansible/module_common.py #<> main()