Merge pull request #1030 from jsmartin/asg_rolling_optimizations
ec2_asg: Streamlined rolling update algorithm.
This commit is contained in:
commit
85e9ccd17b
1 changed files with 227 additions and 61 deletions
|
@ -190,9 +190,13 @@ to "replace_instances":
|
|||
'''
|
||||
|
||||
import time
|
||||
import logging as log
|
||||
|
||||
from ansible.module_utils.basic import *
|
||||
from ansible.module_utils.ec2 import *
|
||||
log.getLogger('boto').setLevel(log.CRITICAL)
|
||||
#log.basicConfig(filename='/tmp/ansible_ec2_asg.log',level=log.DEBUG, format='%(asctime)s: %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
|
||||
|
||||
|
||||
try:
|
||||
import boto.ec2.autoscale
|
||||
|
@ -265,8 +269,71 @@ def get_properties(autoscaling_group):
|
|||
if getattr(autoscaling_group, "tags", None):
|
||||
properties['tags'] = dict((t.key, t.value) for t in autoscaling_group.tags)
|
||||
|
||||
|
||||
return properties
|
||||
|
||||
def elb_dreg(asg_connection, module, group_name, instance_id):
    """De-register an instance from the group's ELBs and wait for it to drain.

    Nothing is done unless the auto scaling group has load balancers attached
    and uses the 'ELB' health check type.  Otherwise the instance is
    de-registered from every load balancer of the group, and the load
    balancers are polled every 10 seconds until none of them reports the
    instance as "InService".

    asg_connection -- autoscale connection used to look up the group
    module         -- module object; supplies the 'wait_timeout' parameter
                      (seconds) and fail_json() for error reporting
    group_name     -- name of the auto scaling group
    instance_id    -- id of the instance to de-register

    Calls module.fail_json() when the ELB connection cannot be authenticated
    or when the instance is still in service once the timeout has expired.
    """
    as_group = asg_connection.get_all_groups(names=[group_name])[0]
    if not (as_group.load_balancers and as_group.health_check_type == 'ELB'):
        # No ELB is involved, so there is nothing to de-register from.
        return

    # Connection details are only needed once we know an ELB must be contacted.
    region, ec2_url, aws_connect_params = get_aws_connection_info(module)
    try:
        elb_connection = connect_to_aws(boto.ec2.elb, region, **aws_connect_params)
    except boto.exception.NoAuthHandlerFound as e:
        module.fail_json(msg=str(e))

    for lb in as_group.load_balancers:
        elb_connection.deregister_instances(lb, instance_id)
        log.debug("De-registering {0} from ELB {1}".format(instance_id, lb))

    deadline = time.time() + module.params.get('wait_timeout')
    in_service = 1
    while in_service > 0 and deadline > time.time():
        in_service = 0
        for lb in as_group.load_balancers:
            for i in elb_connection.describe_instance_health(lb):
                if i.instance_id == instance_id and i.state == "InService":
                    in_service += 1
                    log.debug("{0}: {1}, {2}".format(i.instance_id, i.state, i.description))
        if in_service > 0:
            # Only pause when another poll is actually required.
            time.sleep(10)

    if in_service > 0:
        # The deadline passed while the instance was still in service.
        module.fail_json(msg = "Waited too long for instance to deregister. {0}".format(time.asctime()))
|
||||
|
||||
|
||||
|
||||
|
||||
def elb_healthy(asg_connection, elb_connection, module, group_name):
    """Count the instances of a group that its ELBs report as "InService".

    Only instances that the auto scaling group itself considers InService and
    Healthy are submitted to the load balancers.

    asg_connection -- autoscale connection used to look up the group
    elb_connection -- ELB connection used to query instance health
    module         -- module object (not used here; kept for the call signature)
    group_name     -- name of the auto scaling group

    Returns the number of "InService" reports summed over all load balancers
    attached to the group.
    """
    as_group = asg_connection.get_all_groups(names=[group_name])[0]
    props = get_properties(as_group)

    # get healthy, inservice instances from ASG
    instances = [
        instance
        for instance, settings in props['instance_facts'].items()
        if settings['lifecycle_state'] == 'InService' and settings['health_status'] == 'Healthy'
    ]
    log.debug("ASG considers the following instances InService and Healthy: {0}".format(instances))
    log.debug("ELB instance status:")

    healthy_instances = []
    for lb in as_group.load_balancers:
        # we catch a race condition that sometimes happens if the instance exists in the ASG
        # but has not yet shown up in the ELB
        try:
            lb_instances = elb_connection.describe_instance_health(lb, instances=instances)
        except boto.exception.InvalidInstance:
            # Nothing usable came back for this load balancer; skip it so an
            # unbound or stale result from a previous iteration is never counted.
            continue
        for i in lb_instances:
            if i.state == "InService":
                healthy_instances.append(i.instance_id)
            log.debug("{0}: {1}".format(i.instance_id, i.state))
    return len(healthy_instances)
|
||||
|
||||
|
||||
|
||||
def wait_for_elb(asg_connection, module, group_name):
|
||||
region, ec2_url, aws_connect_params = get_aws_connection_info(module)
|
||||
|
@ -277,36 +344,23 @@ def wait_for_elb(asg_connection, module, group_name):
|
|||
as_group = asg_connection.get_all_groups(names=[group_name])[0]
|
||||
|
||||
if as_group.load_balancers and as_group.health_check_type == 'ELB':
|
||||
log.debug("Waiting for ELB to consider intances healthy.")
|
||||
try:
|
||||
elb_connection = connect_to_aws(boto.ec2.elb, region, **aws_connect_params)
|
||||
except boto.exception.NoAuthHandlerFound, e:
|
||||
module.fail_json(msg=str(e))
|
||||
|
||||
wait_timeout = time.time() + wait_timeout
|
||||
healthy_instances = {}
|
||||
healthy_instances = elb_healthy(asg_connection, elb_connection, module, group_name)
|
||||
|
||||
while len(healthy_instances.keys()) < as_group.min_size and wait_timeout > time.time():
|
||||
as_group = asg_connection.get_all_groups(names=[group_name])[0]
|
||||
props = get_properties(as_group)
|
||||
# get healthy, inservice instances from ASG
|
||||
instances = []
|
||||
for instance, settings in props['instance_facts'].items():
|
||||
if settings['lifecycle_state'] == 'InService' and settings['health_status'] == 'Healthy':
|
||||
instances.append(instance)
|
||||
for lb in as_group.load_balancers:
|
||||
# we catch a race condition that sometimes happens if the instance exists in the ASG
|
||||
# but has not yet show up in the ELB
|
||||
try:
|
||||
lb_instances = elb_connection.describe_instance_health(lb, instances=instances)
|
||||
except boto.exception.InvalidInstance, e:
|
||||
pass
|
||||
for i in lb_instances:
|
||||
if i.state == "InService":
|
||||
healthy_instances[i.instance_id] = i.state
|
||||
while healthy_instances < as_group.min_size and wait_timeout > time.time():
|
||||
healthy_instances = elb_healthy(asg_connection, elb_connection, module, group_name)
|
||||
log.debug("ELB thinks {0} instances are healthy.".format(healthy_instances))
|
||||
time.sleep(10)
|
||||
if wait_timeout <= time.time():
|
||||
# waiting took too long
|
||||
module.fail_json(msg = "Waited too long for ELB instances to be healthy. %s" % time.asctime())
|
||||
log.debug("Waiting complete. ELB thinks {0} instances are healthy.".format(healthy_instances))
|
||||
|
||||
def create_autoscaling_group(connection, module):
|
||||
group_name = module.params.get('name')
|
||||
|
@ -364,7 +418,7 @@ def create_autoscaling_group(connection, module):
|
|||
try:
|
||||
connection.create_auto_scaling_group(ag)
|
||||
if wait_for_instances == True:
|
||||
wait_for_new_instances(module, connection, group_name, wait_timeout, desired_capacity, 'viable_instances')
|
||||
wait_for_new_inst(module, connection, group_name, wait_timeout, desired_capacity, 'viable_instances')
|
||||
wait_for_elb(connection, module, group_name)
|
||||
as_group = connection.get_all_groups(names=[group_name])[0]
|
||||
asg_properties = get_properties(as_group)
|
||||
|
@ -430,7 +484,7 @@ def create_autoscaling_group(connection, module):
|
|||
module.fail_json(msg=str(e))
|
||||
|
||||
if wait_for_instances == True:
|
||||
wait_for_new_instances(module, connection, group_name, wait_timeout, desired_capacity, 'viable_instances')
|
||||
wait_for_new_inst(module, connection, group_name, wait_timeout, desired_capacity, 'viable_instances')
|
||||
wait_for_elb(connection, module, group_name)
|
||||
try:
|
||||
as_group = connection.get_all_groups(names=[group_name])[0]
|
||||
|
@ -471,6 +525,15 @@ def get_chunks(l, n):
|
|||
for i in xrange(0, len(l), n):
|
||||
yield l[i:i+n]
|
||||
|
||||
def update_size(group, max_size, min_size, dc):
    """Apply new size limits to an auto scaling group and push the change.

    group    -- auto scaling group object exposing max_size, min_size,
                desired_capacity and update()
    max_size -- new maximum size
    min_size -- new minimum size
    dc       -- new desired capacity
    """
    log.debug("setting ASG sizes")
    log.debug("minimum size: {0}, desired_capacity: {1}, max size: {2}".format(min_size, dc, max_size ))

    # Assign in the same order as before: max, min, then desired capacity.
    new_settings = (
        ('max_size', max_size),
        ('min_size', min_size),
        ('desired_capacity', dc),
    )
    for attribute, value in new_settings:
        setattr(group, attribute, value)
    group.update()
|
||||
|
||||
def replace(connection, module):
|
||||
batch_size = module.params.get('replace_batch_size')
|
||||
wait_timeout = module.params.get('wait_timeout')
|
||||
|
@ -478,91 +541,191 @@ def replace(connection, module):
|
|||
max_size = module.params.get('max_size')
|
||||
min_size = module.params.get('min_size')
|
||||
desired_capacity = module.params.get('desired_capacity')
|
||||
|
||||
# FIXME: we need some more docs about this feature
|
||||
lc_check = module.params.get('lc_check')
|
||||
replace_instances = module.params.get('replace_instances')
|
||||
|
||||
as_group = connection.get_all_groups(names=[group_name])[0]
|
||||
wait_for_new_instances(module, connection, group_name, wait_timeout, as_group.min_size, 'viable_instances')
|
||||
wait_for_new_inst(module, connection, group_name, wait_timeout, as_group.min_size, 'viable_instances')
|
||||
props = get_properties(as_group)
|
||||
instances = props['instances']
|
||||
replaceable = 0
|
||||
if replace_instances:
|
||||
instances = replace_instances
|
||||
for k in props['instance_facts'].keys():
|
||||
if k in instances:
|
||||
if props['instance_facts'][k]['launch_config_name'] != props['launch_config_name']:
|
||||
replaceable += 1
|
||||
if replaceable == 0:
|
||||
# check to see if instances are replaceable if checking launch configs
|
||||
|
||||
new_instances, old_instances = get_instances_by_lc(props, lc_check, instances)
|
||||
num_new_inst_needed = desired_capacity - len(new_instances)
|
||||
|
||||
if lc_check:
|
||||
if num_new_inst_needed == 0 and old_instances:
|
||||
log.debug("No new instances needed, but old instances are present. Removing old instances")
|
||||
terminate_batch(connection, module, old_instances, instances, True)
|
||||
as_group = connection.get_all_groups(names=[group_name])[0]
|
||||
props = get_properties(as_group)
|
||||
changed = True
|
||||
return(changed, props)
|
||||
|
||||
# we don't want to spin up extra instances if not necessary
|
||||
if num_new_inst_needed < batch_size:
|
||||
log.debug("Overriding batch size to {0}".format(num_new_inst_needed))
|
||||
batch_size = num_new_inst_needed
|
||||
|
||||
if not old_instances:
|
||||
changed = False
|
||||
return(changed, props)
|
||||
|
||||
# set temporary settings and wait for them to be reached
|
||||
# This should get overriden if the number of instances left is less than the batch size.
|
||||
|
||||
as_group = connection.get_all_groups(names=[group_name])[0]
|
||||
as_group.max_size = max_size + batch_size
|
||||
as_group.min_size = min_size + batch_size
|
||||
as_group.desired_capacity = desired_capacity + batch_size
|
||||
as_group.update()
|
||||
wait_for_new_instances(module, connection, group_name, wait_timeout, as_group.min_size, 'viable_instances')
|
||||
update_size(as_group, max_size + batch_size, min_size + batch_size, desired_capacity + batch_size)
|
||||
wait_for_new_inst(module, connection, group_name, wait_timeout, as_group.min_size, 'viable_instances')
|
||||
wait_for_elb(connection, module, group_name)
|
||||
as_group = connection.get_all_groups(names=[group_name])[0]
|
||||
props = get_properties(as_group)
|
||||
instances = props['instances']
|
||||
if replace_instances:
|
||||
instances = replace_instances
|
||||
log.debug("beginning main loop")
|
||||
for i in get_chunks(instances, batch_size):
|
||||
terminate_batch(connection, module, i)
|
||||
wait_for_new_instances(module, connection, group_name, wait_timeout, as_group.min_size, 'viable_instances')
|
||||
# break out of this loop if we have enough new instances
|
||||
break_early, desired_size, term_instances = terminate_batch(connection, module, i, instances, False)
|
||||
wait_for_term_inst(connection, module, term_instances)
|
||||
wait_for_new_inst(module, connection, group_name, wait_timeout, desired_size, 'viable_instances')
|
||||
wait_for_elb(connection, module, group_name)
|
||||
as_group = connection.get_all_groups(names=[group_name])[0]
|
||||
# return settings to normal
|
||||
as_group.max_size = max_size
|
||||
as_group.min_size = min_size
|
||||
as_group.desired_capacity = desired_capacity
|
||||
as_group.update()
|
||||
if break_early:
|
||||
log.debug("breaking loop")
|
||||
break
|
||||
update_size(as_group, max_size, min_size, desired_capacity)
|
||||
as_group = connection.get_all_groups(names=[group_name])[0]
|
||||
asg_properties = get_properties(as_group)
|
||||
log.debug("Rolling update complete.")
|
||||
changed=True
|
||||
return(changed, asg_properties)
|
||||
|
||||
def terminate_batch(connection, module, replace_instances):
|
||||
group_name = module.params.get('name')
|
||||
wait_timeout = int(module.params.get('wait_timeout'))
|
||||
lc_check = module.params.get('lc_check')
|
||||
def get_instances_by_lc(props, lc_check, initial_instances):
    """Split the group's current instances into new and old ones.

    props             -- group properties; reads 'instances',
                         'instance_facts' and 'launch_config_name'
    lc_check          -- when true, an instance is "new" if its launch
                         configuration matches the group's current one
    initial_instances -- instance ids present before the update began; when
                         lc_check is false, any instance not in this
                         collection is "new"

    Returns a (new_instances, old_instances) tuple of lists, each in the
    order of props['instances'].
    """
    new_instances = []
    old_instances = []

    if lc_check:
        # old instances are those that have the old launch config
        current_lc = props['launch_config_name']
        facts = props['instance_facts']
        for i in props['instances']:
            if facts[i]['launch_config_name'] == current_lc:
                new_instances.append(i)
            else:
                old_instances.append(i)
    else:
        log.debug("Comparing initial instances with current: {0}".format(initial_instances))
        for i in props['instances']:
            if i not in initial_instances:
                new_instances.append(i)
            else:
                old_instances.append(i)

    log.debug("New instances: {0}, {1}".format(len(new_instances), new_instances))
    log.debug("Old instances: {0}, {1}".format(len(old_instances), old_instances))

    return new_instances, old_instances
|
||||
|
||||
|
||||
def list_purgeable_instances(props, lc_check, replace_instances, initial_instances):
    """Return the instances from replace_instances that may be terminated.

    props             -- group properties; reads 'instances',
                         'instance_facts' and 'launch_config_name'
    lc_check          -- when true, keep only instances whose launch
                         configuration differs from the group's current one
    replace_instances -- candidate instance ids
    initial_instances -- instance ids present before the update began; when
                         lc_check is false, only candidates found in this
                         collection are kept

    Candidates that are not members of the group are always dropped.  The
    result is a list in the order of replace_instances.
    """
    # check to make sure instances given are actually in the given ASG
    candidates = [inst_id for inst_id in replace_instances if inst_id in props['instances']]

    if lc_check:
        # ... and that they have a non-current launch config
        current_lc = props['launch_config_name']
        facts = props['instance_facts']
        return [i for i in candidates if facts[i]['launch_config_name'] != current_lc]

    return [i for i in candidates if i in initial_instances]
|
||||
|
||||
# set all instances given to unhealthy
|
||||
for instance_id in old_instances:
|
||||
connection.set_instance_health(instance_id,'Unhealthy')
|
||||
def terminate_batch(connection, module, replace_instances, initial_instances, leftovers=False):
    """Terminate one batch of instances during a rolling replacement.

    connection        -- autoscale connection
    module            -- module object; reads the 'replace_batch_size',
                         'min_size', 'desired_capacity', 'name',
                         'wait_timeout' and 'lc_check' parameters
    replace_instances -- instance ids proposed for termination in this batch
    initial_instances -- instance ids that were in the group before the
                         update began
    leftovers         -- when true and no new instances are needed, capacity
                         is not decremented while the old instances are
                         terminated

    Returns a (break_loop, desired_size, instances_to_terminate) tuple.

    NOTE(review): the block nesting below was reconstructed from a flattened
    diff view; confirm it against the repository before relying on it.
    """
    batch_size = module.params.get('replace_batch_size')
    min_size = module.params.get('min_size')
    desired_capacity = module.params.get('desired_capacity')
    group_name = module.params.get('name')
    wait_timeout = int(module.params.get('wait_timeout'))
    lc_check = module.params.get('lc_check')
    decrement_capacity = False
    break_loop = False

    as_group = connection.get_all_groups(names=[group_name])[0]
    props = get_properties(as_group)
    desired_size = as_group.min_size

    new_instances, old_instances = get_instances_by_lc(props, lc_check, initial_instances)
    num_new_inst_needed = desired_capacity - len(new_instances)

    # check to make sure instances given are actually in the given ASG
    # and they have a non-current launch config
    instances_to_terminate = list_purgeable_instances(props, lc_check, replace_instances, initial_instances)

    log.debug("new instances needed: {0}".format(num_new_inst_needed))
    log.debug("new instances: {0}".format(new_instances))
    log.debug("old instances: {0}".format(old_instances))
    log.debug("batch instances: {0}".format(",".join(instances_to_terminate)))

    # Enough new instances already exist: terminate every remaining old
    # instance and tell the caller to stop iterating over batches.
    if num_new_inst_needed == 0:
        decrement_capacity = True
        if as_group.min_size != min_size:
            as_group.min_size = min_size
            as_group.update()
            log.debug("Updating minimum size back to original of {0}".format(min_size))
        # if there are some leftover old instances, but we are already at capacity with new ones
        # we don't want to decrement capacity
        if leftovers:
            decrement_capacity = False
        break_loop = True
        instances_to_terminate = old_instances
        desired_size = min_size
        log.debug("No new instances needed")

    # Fewer new instances are needed than a full batch: trim the batch so no
    # more instances are terminated than still have to be replaced.
    if num_new_inst_needed < batch_size and num_new_inst_needed !=0 :
        instances_to_terminate = instances_to_terminate[:num_new_inst_needed]
        decrement_capacity = False
        break_loop = False
        log.debug("{0} new instances needed".format(num_new_inst_needed))

    log.debug("decrementing capacity: {0}".format(decrement_capacity))

    for instance_id in instances_to_terminate:
        # Take the instance out of the load balancers before terminating it.
        elb_dreg(connection, module, group_name, instance_id)
        log.debug("terminating instance: {0}".format(instance_id))
        connection.terminate_instance(instance_id, decrement_capacity=decrement_capacity)

    # we wait to make sure the machines we marked as Unhealthy are
    # no longer in the list

    return break_loop, desired_size, instances_to_terminate
|
||||
|
||||
|
||||
def wait_for_term_inst(connection, module, term_instances):
    """Wait until none of the given instances is still terminating.

    The group is re-read every 10 seconds.  An instance keeps the wait going
    while it is still listed in the group with a lifecycle state of
    'Terminating' or a health status of 'Unhealthy'.

    connection     -- autoscale connection
    module         -- module object; reads the 'wait_timeout' (seconds) and
                      'name' parameters and provides fail_json()
    term_instances -- ids of the instances that were asked to terminate

    Calls module.fail_json() if instances are still pending when the timeout
    expires.
    """
    group_name = module.params.get('name')
    deadline = time.time() + module.params.get('wait_timeout')

    pending = 1
    while pending > 0 and deadline > time.time():
        log.debug("waiting for instances to terminate")
        pending = 0
        as_group = connection.get_all_groups(names=[group_name])[0]
        instance_facts = get_properties(as_group)['instance_facts']
        for i in instance_facts:
            if i not in term_instances:
                continue
            lifecycle = instance_facts[i]['lifecycle_state']
            health = instance_facts[i]['health_status']
            log.debug("Instance {0} has state of {1},{2}".format(i,lifecycle,health ))
            if lifecycle == 'Terminating' or health == 'Unhealthy':
                pending += 1
        if pending > 0:
            # Only pause when another poll is actually required.
            time.sleep(10)

    if pending > 0:
        # waiting took too long
        module.fail_json(msg = "Waited too long for old instances to terminate. %s" % time.asctime())
|
||||
|
||||
def wait_for_new_instances(module, connection, group_name, wait_timeout, desired_size, prop):
|
||||
|
||||
def wait_for_new_inst(module, connection, group_name, wait_timeout, desired_size, prop):
    """Poll the group until props[prop] reaches desired_size.

    module       -- module object providing fail_json()
    connection   -- autoscale connection
    group_name   -- name of the auto scaling group
    wait_timeout -- maximum number of seconds to wait
    desired_size -- value that props[prop] has to reach
    prop         -- key of the group property to watch

    Returns the most recently fetched group properties.  Calls
    module.fail_json() once the timeout has elapsed.
    """
    def fetch_props():
        # make sure we have the latest stats for the group
        group = connection.get_all_groups(names=[group_name])[0]
        return get_properties(group)

    props = fetch_props()
    log.debug("Waiting for {0} = {1}, currently {2}".format(prop, desired_size, props[prop]))

    # now we make sure that we have enough instances in a viable state
    deadline = time.time() + wait_timeout
    while deadline > time.time() and desired_size > props[prop]:
        log.debug("Waiting for {0} = {1}, currently {2}".format(prop, desired_size, props[prop]))
        time.sleep(10)
        props = fetch_props()

    if deadline <= time.time():
        # waiting took too long
        module.fail_json(msg = "Waited too long for new instances to become viable. %s" % time.asctime())

    log.debug("Reached {0}: {1}".format(prop, desired_size))
    return props
|
||||
|
||||
def main():
|
||||
|
|
Loading…
Reference in a new issue