From 80aca4b9369073d7b58ff168cf4eef8bb126c2d0 Mon Sep 17 00:00:00 2001 From: Ash Wilson Date: Tue, 17 Feb 2015 16:27:00 -0500 Subject: [PATCH] Rework docker module states. Organize each state into a distinct function for readability and composability. Rework `present` to create but not start containers. Add a `restarted` state to unconditionally restart a container and a `reloaded` state to restart a container if and only if its configuration is incorrect. Store our most recent knowledge about container states in a ContainerSet object. Improve the value registered by this task to include not only the inspect data from any changed containers, but also action counters in their native form, a summary message for all actions taken, and a `reload_reasons` key to store a human-readable diagnostic to determine why each container was reloaded. --- cloud/docker/docker.py | 630 +++++++++++++++++++++++++++++++++-------- 1 file changed, 507 insertions(+), 123 deletions(-) diff --git a/cloud/docker/docker.py b/cloud/docker/docker.py index 0cf7296f370..8efa9a448bb 100644 --- a/cloud/docker/docker.py +++ b/cloud/docker/docker.py @@ -164,12 +164,13 @@ options: - '"killed" stop and kill all matching containers. "absent" stops and then' - removes any matching containers. required: false - default: present + default: started choices: - present - started - reloaded - restarted + - stopped - killed - absent privileged: @@ -324,6 +325,8 @@ HAS_DOCKER_PY = True import sys import json import re +import os +import shlex from urlparse import urlparse try: import docker.client @@ -356,9 +359,11 @@ def _human_to_bytes(number): print "failed=True msg='Could not convert %s to integer'" % (number) sys.exit(1) + def _ansible_facts(container_list): return {"docker_containers": container_list} + def _docker_id_quirk(inspect): # XXX: some quirk in docker if 'ID' in inspect: @@ -385,6 +390,13 @@ def get_split_image_tag(image): return resource, tag + +def is_running(container): + '''Return True if an inspected container is in a state we consider "running."''' + + return container['State']['Running'] == True and not container['State'].get('Ghost', False) + + def get_docker_py_versioninfo(): if hasattr(docker, '__version__'): # a '__version__' attribute was added to the module but not until @@ -414,6 +426,7 @@ def get_docker_py_versioninfo(): return tuple(version) + def check_dependencies(module): """ Ensure `docker-py` >= 0.3.0 is installed, and call module.fail_json with a @@ -429,8 +442,12 @@ def check_dependencies(module): class DockerManager(object): - counters = {'created':0, 'started':0, 'stopped':0, 'killed':0, 'removed':0, 'restarted':0, 'pull':0} + counters = dict( + created=0, started=0, stopped=0, killed=0, removed=0, restarted=0, pulled=0 + ) + reload_reasons = [] _capabilities = set() + # Map optional parameters to minimum (docker-py version, server APIVersion) # docker-py version is a tuple of ints because we have to compare them # server APIVersion is passed to a docker-py function that takes strings @@ -553,7 +570,6 @@ class DockerManager(object): return processed_links - def get_exposed_ports(self, expose_list): """ Parse the ports and protocols (TCP/UDP) to expose in the docker-py `create_container` call from the docker CLI-style syntax. @@ -572,7 +588,6 @@ class DockerManager(object): else: return None - def get_port_bindings(self, ports): """ Parse the `ports` string into a port bindings dict for the `start_container` call. @@ -615,6 +630,37 @@ class DockerManager(object): return binds + def get_summary_message(self): + ''' + Generate a message that briefly describes the actions taken by this + task, in English. + ''' + + parts = [] + for k, v in self.counters.iteritems(): + if v == 0: + continue + + if v == 1: + plural = "" + else: + plural = "s" + parts.append("%s %d container%s" % (k, v, plural)) + + if parts: + return ", ".join(parts) + "." + else: + return "No action taken." + + def get_reload_reason_message(self): + ''' + Generate a message describing why any reloaded containers were reloaded. + ''' + + if self.reload_reasons: + return ", ".join(self.reload_reasons) + else: + return None def get_summary_counters_msg(self): msg = "" @@ -654,9 +700,281 @@ class DockerManager(object): return inspect + def get_differing_containers(self): + """ + Inspect all matching, running containers, and return those that were + started with parameters that differ from the ones that are provided + during this module run. A list containing the differing + containers will be returned, and a short string describing the specific + difference encountered in each container will be appended to + reload_reasons. + + This generates the set of containers that need to be stopped and + started with new parameters with state=reloaded. + """ + + running = self.get_running_containers() + current = self.get_inspect_containers(running) + + image = self.get_inspect_image() + if image is None: + # The image isn't present. Assume that we're about to pull a new + # tag and *everything* will be restarted. + # + # This will give false positives if you untag an image on the host + # and there's nothing more to pull. + return current + + differing = [] + + for container in current: + + # IMAGE + # Compare the image by ID rather than name, so that containers + # will be restarted when new versions of an existing image are + # pulled. + if container['Image'] != image['Id']: + self.reload_reasons.append('image ({} => {})'.format(container['Image'], image['Id'])) + differing.append(container) + continue + + # COMMAND + + expected_command = self.module.params.get('command') + if expected_command: + expected_command = shlex.split(expected_command) + actual_command = container["Config"]["Cmd"] + + if actual_command != expected_command: + self.reload_reasons.append('command ({} => {})'.format(actual_command, expected_command)) + differing.append(container) + continue + + # EXPOSED PORTS + # Note that ports that are bound at container run are also exposed + # implicitly. + expected_exposed_ports = set() + for p in (self.exposed_ports or []): + expected_exposed_ports.add("/".join(p)) + + actually_exposed_ports = set((container["Config"]["ExposedPorts"] or {}).keys()) + + if actually_exposed_ports != expected_exposed_ports: + self.reload_reasons.append('exposed_ports ({} => {})'.format(actually_exposed_ports, expected_exposed_ports)) + differing.append(container) + continue + + # VOLUMES + # not including bind modes. + + expected_volume_keys = set() + if self.volumes: + for key, config in self.volumes.iteritems(): + if not config and key not in self.binds: + expected_volume_keys.add(key) + actual_volume_keys = set((container['Config']['Volumes'] or {}).keys()) + + if actual_volume_keys != expected_volume_keys: + self.reload_reasons.append('volumes ({} => {})'.format(actual_volume_keys, expected_volume_keys)) + differing.append(container) + continue + + # MEM_LIMIT + + expected_mem = _human_to_bytes(self.module.params.get('memory_limit')) + actual_mem = container['Config']['Memory'] + + if expected_mem and actual_mem != expected_mem: + self.reload_reasons.append('memory ({} => {})'.format(actual_mem, expected_mem)) + differing.append(container) + continue + + # ENVIRONMENT + # actual_env is likely to include environment variables injected by + # the Dockerfile. + + expected_env = set() + if self.env: + for name, value in self.env.iteritems(): + expected_env.add("{}={}".format(name, value)) + actual_env = set(container['Config']['Env'] or []) + + if not actual_env.issuperset(expected_env): + # Don't include the environment difference in the output. + self.reload_reasons.append('environment') + differing.append(container) + continue + + # HOSTNAME + + expected_hostname = self.module.params.get('hostname') + actual_hostname = container['Config']['Hostname'] + if expected_hostname and actual_hostname != expected_hostname: + self.reload_reasons.append('hostname ({} => {})'.format(actual_hostname, expected_hostname)) + differing.append(container) + continue + + # DOMAINNAME + + expected_domainname = self.module.params.get('domainname') + actual_domainname = container['Config']['Domainname'] + if expected_domainname and actual_domainname != expected_domainname: + self.reload_reasons.append('domainname ({} => {})'.format(actual_domainname, expected_domainname)) + differing.append(container) + continue + + # DETACH + + # We don't have to check for undetached containers. If it wasn't + # detached, it would have stopped before the playbook continued! + + # NAME + + # We also don't have to check name, because this is one of the + # criteria that's used to determine which container(s) match in + # the first place. + + # STDIN_OPEN + + expected_stdin_open = self.module.params.get('stdin_open') + actual_stdin_open = container['Config']['AttachStdin'] + if actual_stdin_open != expected_stdin_open: + self.reload_reasons.append('stdin_open ({} => {})'.format(actual_stdin_open, expected_stdin_open)) + differing.append(container) + continue + + # TTY + + expected_tty = self.module.params.get('tty') + actual_tty = container['Config']['Tty'] + if actual_tty != expected_tty: + self.reload_reasons.append('tty ({} => {})'.format(actual_tty, expected_tty)) + differing.append(container) + continue + + # -- "start" call differences -- + + # LXC_CONF + + if self.lxc_conf: + expected_lxc = set(self.lxc_conf) + actual_lxc = set(container['HostConfig']['LxcConf'] or []) + if actual_lxc != expected_lxc: + self.reload_reasons.append('lxc_conf ({} => {})'.format(actual_lxc, expected_lxc)) + differing.append(container) + continue + + # BINDS + + expected_binds = set() + if self.binds: + for host_path, config in self.binds.iteritems(): + if isinstance(config, dict): + container_path = config['bind'] + if config['ro']: + mode = 'ro' + else: + mode = 'rw' + else: + container_path = config + mode = 'rw' + expected_binds.add("{}:{}:{}".format(host_path, container_path, mode)) + + actual_binds = set() + for bind in (container['HostConfig']['Binds'] or []): + if len(bind.split(':')) == 2: + actual_binds.add(bind + ":rw") + else: + actual_binds.add(bind) + + if actual_binds != expected_binds: + self.reload_reasons.append('binds ({} => {})'.format(actual_binds, expected_binds)) + differing.append(container) + continue + + # PORT BINDINGS + + expected_bound_ports = {} + if self.port_bindings: + for container_port, config in self.port_bindings.iteritems(): + if isinstance(container_port, int): + container_port = "{}/tcp".format(container_port) + bind = {} + if len(config) == 1: + bind['HostIp'] = "0.0.0.0" + bind['HostPort'] = "" + else: + bind['HostIp'] = config[0] + bind['HostPort'] = str(config[1]) + + expected_bound_ports[container_port] = [bind] + + actual_bound_ports = container['HostConfig']['PortBindings'] or {} + + if actual_bound_ports != expected_bound_ports: + self.reload_reasons.append('port bindings ({} => {})'.format(actual_bound_ports, expected_bound_ports)) + differing.append(container) + continue + + # PUBLISHING ALL PORTS + + # What we really care about is the set of ports that is actually + # published. That should be caught above. + + # PRIVILEGED + + expected_privileged = self.module.params.get('privileged') + actual_privileged = container['HostConfig']['Privileged'] + if actual_privileged != expected_privileged: + self.reload_reasons.append('privileged ({} => {})'.format(actual_privileged, expected_privileged)) + differing.append(container) + continue + + # LINKS + + expected_links = set() + for link, alias in (self.links or {}).iteritems(): + expected_links.add("/{}:/running/{}".format(link, alias)) + + actual_links = set(container['HostConfig']['Links'] or []) + if actual_links != expected_links: + self.reload_reasons.append('links ({} => {})'.format(actual_links, expected_links)) + differing.append(container) + continue + + # NETWORK MODE + + expected_netmode = self.module.params.get('net') or '' + actual_netmode = container['HostConfig']['NetworkMode'] + if actual_netmode != expected_netmode: + self.reload_reasons.append('net ({} => {})'.format(actual_netmode, expected_netmode)) + differing.append(container) + continue + + # DNS + + expected_dns = set(self.module.params.get('dns') or []) + actual_dns = set(container['HostConfig']['Dns'] or []) + if actual_dns != expected_dns: + self.reload_reasons.append('dns ({} => {})'.format(actual_dns, expected_dns)) + differing.append(container) + continue + + # VOLUMES_FROM + + expected_volumes_from = set(self.module.params.get('volumes_from') or []) + actual_volumes_from = set(container['HostConfig']['VolumesFrom'] or []) + if actual_volumes_from != expected_volumes_from: + self.reload_reasons.append('volumes_from ({} => {})'.format(actual_volumes_from, expected_volumes_from)) + differing.append(container) + + return differing + def get_deployed_containers(self): - """determine which images/commands are running already""" - image = self.module.params.get('image') + """ + Return any matching containers that are already present. + """ + command = self.module.params.get('command') if command: command = command.strip() @@ -665,37 +983,41 @@ class DockerManager(object): name = '/' + name deployed = [] - # if we weren't given a tag with the image, we need to only compare on the image name, as that - # docker will give us back the full image name including a tag in the container list if one exists. - image, tag = get_split_image_tag(image) + # "images" will be a collection of equivalent "name:tag" image names + # that map to the same Docker image. + inspected = self.get_inspect_image() + if inspected: + images = inspected.get('RepoTags', []) + else: + image, tag = get_split_image_tag(self.module.params.get('image')) + images = [':'.join([image, tag])] for i in self.client.containers(all=True): - running_image, running_tag = get_split_image_tag(i['Image']) + running_image = i['Image'] running_command = i['Command'].strip() + match = False - name_matches = False - if i["Names"]: - name_matches = (name and name in i['Names']) - image_matches = (running_image == image) - tag_matches = (not tag or running_tag == tag) - # if a container has an entrypoint, `command` will actually equal - # '{} {}'.format(entrypoint, command) - command_matches = (not command or running_command.endswith(command)) + if name: + matches = name in i.get('Names', []) + else: + image_matches = running_image in images - if name_matches or (name is None and image_matches and tag_matches and command_matches): + # if a container has an entrypoint, `command` will actually equal + # '{} {}'.format(entrypoint, command) + command_matches = (not command or running_command.endswith(command)) + + matches = image_matches and command_matches + + if matches: details = self.client.inspect_container(i['Id']) details = _docker_id_quirk(details) + deployed.append(details) return deployed def get_running_containers(self): - running = [] - for i in self.get_deployed_containers(): - if i['State']['Running'] == True and i['State'].get('Ghost', False) == False: - running.append(i) - - return running + return [c for c in self.get_deployed_containers() if is_running(c)] def pull_image(self): extra_params = {} @@ -713,8 +1035,8 @@ class DockerManager(object): email=self.module.params.get('email'), registry=self.module.params.get('registry') ) - except: - self.module.fail_json(msg="failed to login to the remote registry, check your username/password.") + except e: + self.module.fail_json(msg="failed to login to the remote registry, check your username/password.", error=repr(e)) try: last = None for line in self.client.pull(image, tag=tag, stream=True, **extra_params): @@ -725,12 +1047,12 @@ class DockerManager(object): pass elif status.startswith('Status: Downloaded newer image for'): # Image was updated. Increment the pull counter. - self.increment_counter('pull') + self.increment_counter('pulled') else: # Unrecognized status string. - self.module.fail_json(msg="Unrecognized status from pull", status=status) - except: - self.module.fail_json(msg="failed to pull the specified image: %s" % resource) + self.module.fail_json(msg="Unrecognized status from pull.", status=status) + except e: + self.module.fail_json(msg="Failed to pull the specified image: %s" % resource, error=repr(e)) def create_containers(self, count=1): params = {'image': self.module.params.get('image'), @@ -776,7 +1098,7 @@ class DockerManager(object): 'binds': self.binds, 'port_bindings': self.port_bindings, 'publish_all_ports': self.module.params.get('publish_all_ports'), - 'privileged': self.module.params.get('privileged'), + 'privileged': self.module.params.get('privileged'), 'links': self.links, 'network_mode': self.module.params.get('net'), } @@ -826,6 +1148,129 @@ class DockerManager(object): self.increment_counter('restarted') +class ContainerSet: + + def __init__(self, manager): + self.manager = manager + self.running = [] + self.deployed = [] + self.changed = [] + + def refresh(self): + ''' + Update our view of the matching containers from the Docker daemon. + ''' + + + self.deployed = self.manager.get_deployed_containers() + self.running = [c for c in self.deployed if is_running(c)] + + def notice_changed(self, containers): + ''' + Record a collection of containers as "changed". + ''' + + self.changed.extend(containers) + + +def present(manager, containers, count, name): + '''Ensure that exactly `count` matching containers exist in any state.''' + + containers.refresh() + delta = count - len(containers.deployed) + + if delta > 0: + containers.notice_changed(manager.create_containers(delta)) + + if delta < 0: + # If both running and stopped containers exist, remove + # stopped containers first. + containers.deployed.sort(lambda cx, cy: cmp(is_running(cx), is_running(cy))) + + to_stop = [] + to_remove = [] + for c in containers.deployed[0:-delta]: + if is_running(c): + to_stop.append(c) + to_remove.append(c) + + manager.stop_containers(to_stop) + manager.remove_containers(to_remove) + containers.notice_changed(to_remove) + +def started(manager, containers, count, name): + '''Ensure that exactly `count` matching containers exist and are running.''' + + containers.refresh() + delta = count - len(containers.running) + + if delta > 0: + if name and containers.deployed: + # A stopped container exists with the requested name. + # Clean it up before attempting to start a new one. + manager.remove_containers(containers.deployed) + + created = manager.create_containers(delta) + manager.start_containers(created) + containers.notice_changed(created) + + if delta < 0: + excess = containers.running[0:-delta] + manager.stop_containers(excess) + manager.remove_containers(excess) + containers.notice_changed(excess) + +def reloaded(manager, containers, count, name): + ''' + Ensure that exactly `count` matching containers exist and are + running. If any associated settings have been changed (volumes, + ports or so on), restart those containers. + ''' + + containers.refresh() + + for container in manager.get_differing_containers(): + manager.stop_containers([container]) + manager.remove_containers([container]) + + started(manager, containers, count, name) + +def restarted(manager, containers, count, name): + ''' + Ensure that exactly `count` matching containers exist and are + running. Unconditionally restart any that were already running. + ''' + + containers.refresh() + + manager.restart_containers(containers.running) + started(manager, containers, count, name) + +def stopped(manager, containers, count, name): + '''Stop any matching containers that are running.''' + + containers.refresh() + + manager.stop_containers(containers.running) + containers.notice_changed(containers.running) + +def killed(manager, containers, count, name): + '''Kill any matching containers that are running.''' + + containers.refresh() + + manager.kill_containers(containers.running) + containers.notice_changed(containers.running) + +def absent(manager, containers, count, name): + '''Stop and remove any matching containers.''' + + containers.refresh() + + manager.stop_containers(containers.running) + manager.remove_containers(containers.deployed) + containers.notice_changed(containers.deployed) + def main(): module = AnsibleModule( argument_spec = dict( @@ -852,7 +1297,7 @@ def main(): env = dict(type='dict'), dns = dict(), detach = dict(default=True, type='bool'), - state = dict(default='running', choices=['absent', 'present', 'running', 'stopped', 'killed', 'restarted']), + state = dict(default='started', choices=['present', 'started', 'reloaded', 'restarted', 'stopped', 'killed', 'absent']), restart_policy = dict(default=None, choices=['always', 'on-failure', 'no']), restart_policy_retry = dict(default=0, type='int'), debug = dict(default=False, type='bool'), @@ -878,6 +1323,7 @@ def main(): if count < 0: module.fail_json(msg="Count must be greater than zero") + if count > 1 and name: module.fail_json(msg="Count and name must not be used together") @@ -887,101 +1333,39 @@ def main(): if pull == "always": manager.pull_image() - # Find the ID of the requested image and tag, if available. - image_id = None - inspected_image = manager.get_inspect_image() - if inspected_image: - image_id = inspected_image.get('Id') - - running_containers = manager.get_running_containers() - running_count = len(running_containers) - delta = count - running_count - deployed_containers = manager.get_deployed_containers() - facts = None + containers = ContainerSet(manager) failed = False - changed = False - # start/stop containers - if state in [ "running", "present" ]: + if state == 'present': + present(manager, containers, count, name) + elif state == 'started': + started(manager, containers, count, name) + elif state == 'reloaded': + reloaded(manager, containers, count, name) + elif state == 'restarted': + restarted(manager, containers, count, name) + elif state == 'stopped': + stopped(manager, containers, count, name) + elif state == 'killed': + killed(manager, containers, count, name) + elif state == 'absent': + absent(manager, containers, count, name) + else: + module.fail_json(msg='Unrecognized state %s. Must be one of: ' + 'present; started; reloaded; restarted; ' + 'stopped; killed; absent.' % state) - # make sure a container with `name` exists, if not create and start it - if name: - # first determine if a container with this name exists - existing_container = None - for deployed_container in deployed_containers: - if deployed_container.get('Name') == '/%s' % name: - existing_container = deployed_container - break + module.exit_json(changed=manager.has_changed(), + msg=manager.get_summary_message(), + summary=manager.counters, + containers=containers.changed, + reload_reasons=manager.get_reload_reason_message()) - # the named container is running, but with a - # different image or tag, so we stop it first - if existing_container and (image_id is None or existing_container.get('Image') != image_id): - manager.stop_containers([existing_container]) - manager.remove_containers([existing_container]) - running_containers = manager.get_running_containers() - deployed_containers = manager.get_deployed_containers() - existing_container = None + except DockerAPIError as e: + module.fail_json(changed=manager.has_changed(), msg="Docker API Error: %s" % e.explanation) - # if the container isn't running (or if we stopped the - # old version above), create and (maybe) start it up now - if not existing_container: - containers = manager.create_containers(1) - if state == "present": # otherwise it get (re)started later anyways.. - manager.start_containers(containers) - running_containers = manager.get_running_containers() - deployed_containers = manager.get_deployed_containers() - - if state == "running": - # make sure a container with `name` is running - if name and "/" + name not in map(lambda x: x.get('Name'), running_containers): - manager.start_containers(deployed_containers) - - # start more containers if we don't have enough - elif delta > 0: - containers = manager.create_containers(delta) - manager.start_containers(containers) - - # stop containers if we have too many - elif delta < 0: - containers_to_stop = running_containers[0:abs(delta)] - containers = manager.stop_containers(containers_to_stop) - manager.remove_containers(containers_to_stop) - - facts = manager.get_running_containers() - else: - facts = manager.get_deployed_containers() - - # stop and remove containers - elif state == "absent": - facts = manager.stop_containers(deployed_containers) - manager.remove_containers(deployed_containers) - - # stop containers - elif state == "stopped": - facts = manager.stop_containers(running_containers) - - # kill containers - elif state == "killed": - manager.kill_containers(running_containers) - - # restart containers - elif state == "restarted": - manager.restart_containers(running_containers) - facts = manager.get_inspect_containers(running_containers) - - msg = "%s container(s) running image %s with command %s" % \ - (manager.get_summary_counters_msg(), module.params.get('image'), module.params.get('command')) - changed = manager.has_changed() - - module.exit_json(failed=failed, changed=changed, msg=msg, ansible_facts=_ansible_facts(facts)) - - except DockerAPIError, e: - changed = manager.has_changed() - module.exit_json(failed=True, changed=changed, msg="Docker API error: " + e.explanation) - - except RequestException, e: - changed = manager.has_changed() - module.exit_json(failed=True, changed=changed, msg=repr(e)) + except RequestException as e: + module.fail_json(changed=manager.has_changed(), msg=repr(e)) # import module snippets from ansible.module_utils.basic import *