From e9f7fb092754e6a2aa1c540bc39d963594b141dd Mon Sep 17 00:00:00 2001 From: Allen Sanabria Date: Thu, 14 Jul 2016 16:37:06 -0700 Subject: [PATCH] Now when number of shards is different than what is the stream currently, it will fail.\n\nShards can not be changed on an already created stream --- cloud/amazon/kinesis_stream.py | 79 +++++++++++++++++++++------------- 1 file changed, 48 insertions(+), 31 deletions(-) diff --git a/cloud/amazon/kinesis_stream.py b/cloud/amazon/kinesis_stream.py index a9139053894..1ba25e69860 100644 --- a/cloud/amazon/kinesis_stream.py +++ b/cloud/amazon/kinesis_stream.py @@ -147,7 +147,6 @@ tags: ''' try: - import boto import botocore import boto3 HAS_BOTO3 = True @@ -285,20 +284,18 @@ def get_tags(client, stream_name, check_mode=False): }, ] success = True - except botocore.exceptions.ClientError, e: + except botocore.exceptions.ClientError as e: err_msg = str(e) return success, err_msg, results -def find_stream(client, stream_name, limit=1, check_mode=False): +def find_stream(client, stream_name, check_mode=False): """Retrieve a Kinesis Stream. Args: client (botocore.client.EC2): Boto3 client. stream_name (str): Name of the Kinesis stream. Kwargs: - limit (int): Limit the number of shards to return within a stream. - default=1 check_mode (bool): This will pass DryRun as one of the parameters to the aws api. default=False @@ -313,15 +310,20 @@ def find_stream(client, stream_name, limit=1, check_mode=False): success = False params = { 'StreamName': stream_name, - 'Limit': limit } results = dict() + has_more_shards = True + shards = list() try: if not check_mode: - results = ( - client.describe_stream(**params)['StreamDescription'] - ) - results.pop('Shards') + while has_more_shards: + results = ( + client.describe_stream(**params)['StreamDescription'] + ) + shards.extend(results.pop('Shards')) + has_more_shards = results['HasMoreShards'] + results['Shards'] = shards + results['ShardsCount'] = len(shards) else: results = { 'HasMoreShards': True, @@ -331,7 +333,7 @@ def find_stream(client, stream_name, limit=1, check_mode=False): 'StreamStatus': 'ACTIVE' } success = True - except botocore.exceptions.ClientError, e: + except botocore.exceptions.ClientError as e: err_msg = str(e) return success, err_msg, results @@ -391,6 +393,8 @@ def wait_for_status(client, stream_name, status, wait_timeout=300, if not status_achieved: err_msg = "Wait time out reached, while waiting for results" + else: + err_msg = "Status {0} achieved successfully".format(status) return status_achieved, err_msg, stream @@ -442,7 +446,7 @@ def tags_action(client, stream_name, tags, action='create', check_mode=False): else: err_msg = 'Invalid action {0}'.format(action) - except botocore.exceptions.ClientError, e: + except botocore.exceptions.ClientError as e: err_msg = str(e) return success, err_msg @@ -500,6 +504,7 @@ def update_tags(client, stream_name, tags, check_mode=False): Tuple (bool, str) """ success = False + changed = False err_msg = '' tag_success, tag_msg, current_tags = ( get_tags(client, stream_name, check_mode=check_mode) @@ -536,13 +541,13 @@ def update_tags(client, stream_name, tags, check_mode=False): ) ) if not delete_success: - return delete_success, delete_msg + return delete_success, changed, delete_msg if tags_to_update: tags = make_tags_in_proper_format( recreate_tags_from_list(tags_to_update) ) else: - return True, 'Tags do not need to be updated' + return True, changed, 'Tags do not need to be updated' if tags: create_success, create_msg = ( @@ -551,9 +556,11 @@ def update_tags(client, stream_name, tags, check_mode=False): check_mode=check_mode ) ) - return create_success, create_msg + if create_success: + changed = True + return create_success, changed, create_msg - return success, err_msg + return success, changed, err_msg def stream_action(client, stream_name, shard_count=1, action='create', timeout=300, check_mode=False): @@ -603,7 +610,7 @@ def stream_action(client, stream_name, shard_count=1, action='create', else: err_msg = 'Invalid action {0}'.format(action) - except botocore.exceptions.ClientError, e: + except botocore.exceptions.ClientError as e: err_msg = str(e) return success, err_msg @@ -644,10 +651,18 @@ def retention_action(client, stream_name, retention_period=24, params['RetentionPeriodHours'] = retention_period client.increase_stream_retention_period(**params) success = True + err_msg = ( + 'Retention Period increased successfully to {0}' + .format(retention_period) + ) elif action == 'decrease': params['RetentionPeriodHours'] = retention_period client.decrease_stream_retention_period(**params) success = True + err_msg = ( + 'Retention Period decreased successfully to {0}' + .format(retention_period) + ) else: err_msg = 'Invalid action {0}'.format(action) else: @@ -658,7 +673,7 @@ def retention_action(client, stream_name, retention_period=24, else: err_msg = 'Invalid action {0}'.format(action) - except botocore.exceptions.ClientError, e: + except botocore.exceptions.ClientError as e: err_msg = str(e) return success, err_msg @@ -698,7 +713,7 @@ def update(client, current_stream, stream_name, retention_period=None, Returns: Tuple (bool, bool, str) """ - success = False + success = True changed = False err_msg = '' if retention_period: @@ -710,9 +725,10 @@ def update(client, current_stream, stream_name, retention_period=None, ) ) if not wait_success: - return wait_success, True, wait_msg + return wait_success, False, wait_msg if current_stream['StreamStatus'] == 'ACTIVE': + retention_changed = False if retention_period > current_stream['RetentionPeriodHours']: retention_changed, retention_msg = ( retention_action( @@ -720,8 +736,6 @@ def update(client, current_stream, stream_name, retention_period=None, check_mode=check_mode ) ) - if retention_changed: - success = True elif retention_period < current_stream['RetentionPeriodHours']: retention_changed, retention_msg = ( @@ -730,11 +744,8 @@ def update(client, current_stream, stream_name, retention_period=None, check_mode=check_mode ) ) - if retention_changed: - success = True elif retention_period == current_stream['RetentionPeriodHours']: - retention_changed = False retention_msg = ( 'Retention {0} is the same as {1}' .format( @@ -744,7 +755,10 @@ def update(client, current_stream, stream_name, retention_period=None, ) success = True - changed = retention_changed + if retention_changed: + success = True + changed = True + err_msg = retention_msg if changed and wait: wait_success, wait_msg, current_stream = ( @@ -754,7 +768,7 @@ def update(client, current_stream, stream_name, retention_period=None, ) ) if not wait_success: - return wait_success, True, wait_msg + return wait_success, False, wait_msg elif changed and not wait: stream_found, stream_msg, current_stream = ( find_stream(client, stream_name, check_mode=check_mode) @@ -774,11 +788,9 @@ def update(client, current_stream, stream_name, retention_period=None, return success, changed, err_msg if tags: - changed, err_msg = ( + _, _, err_msg = ( update_tags(client, stream_name, tags, check_mode=check_mode) ) - if changed: - success = True if wait: success, err_msg, _ = ( wait_for_status( @@ -832,6 +844,11 @@ def create_stream(client, stream_name, number_of_shards=1, retention_period=None stream_found, stream_msg, current_stream = ( find_stream(client, stream_name, check_mode=check_mode) ) + if stream_found: + if current_stream['ShardsCount'] != number_of_shards: + err_msg = 'Can not change the number of shards in a Kinesis Stream' + return success, changed, err_msg, results + if stream_found and current_stream['StreamStatus'] == 'DELETING' and wait: wait_success, wait_msg, current_stream = ( wait_for_status( @@ -1031,7 +1048,7 @@ def main(): region=region, endpoint=ec2_url, **aws_connect_kwargs ) ) - except botocore.exceptions.ClientError, e: + except botocore.exceptions.ClientError as e: err_msg = 'Boto3 Client Error - {0}'.format(str(e.msg)) module.fail_json( success=False, changed=False, result={}, msg=err_msg