Now when number of shards is different than what is the stream currently, it will fail.\n\nShards can not be changed on an already created stream
This commit is contained in:
parent
11f4aa6538
commit
14476c0e8b
1 changed files with 48 additions and 31 deletions
|
@ -147,7 +147,6 @@ tags:
|
|||
'''
|
||||
|
||||
try:
|
||||
import boto
|
||||
import botocore
|
||||
import boto3
|
||||
HAS_BOTO3 = True
|
||||
|
@ -285,20 +284,18 @@ def get_tags(client, stream_name, check_mode=False):
|
|||
},
|
||||
]
|
||||
success = True
|
||||
except botocore.exceptions.ClientError, e:
|
||||
except botocore.exceptions.ClientError as e:
|
||||
err_msg = str(e)
|
||||
|
||||
return success, err_msg, results
|
||||
|
||||
def find_stream(client, stream_name, limit=1, check_mode=False):
|
||||
def find_stream(client, stream_name, check_mode=False):
|
||||
"""Retrieve a Kinesis Stream.
|
||||
Args:
|
||||
client (botocore.client.EC2): Boto3 client.
|
||||
stream_name (str): Name of the Kinesis stream.
|
||||
|
||||
Kwargs:
|
||||
limit (int): Limit the number of shards to return within a stream.
|
||||
default=1
|
||||
check_mode (bool): This will pass DryRun as one of the parameters to the aws api.
|
||||
default=False
|
||||
|
||||
|
@ -313,15 +310,20 @@ def find_stream(client, stream_name, limit=1, check_mode=False):
|
|||
success = False
|
||||
params = {
|
||||
'StreamName': stream_name,
|
||||
'Limit': limit
|
||||
}
|
||||
results = dict()
|
||||
has_more_shards = True
|
||||
shards = list()
|
||||
try:
|
||||
if not check_mode:
|
||||
while has_more_shards:
|
||||
results = (
|
||||
client.describe_stream(**params)['StreamDescription']
|
||||
)
|
||||
results.pop('Shards')
|
||||
shards.extend(results.pop('Shards'))
|
||||
has_more_shards = results['HasMoreShards']
|
||||
results['Shards'] = shards
|
||||
results['ShardsCount'] = len(shards)
|
||||
else:
|
||||
results = {
|
||||
'HasMoreShards': True,
|
||||
|
@ -331,7 +333,7 @@ def find_stream(client, stream_name, limit=1, check_mode=False):
|
|||
'StreamStatus': 'ACTIVE'
|
||||
}
|
||||
success = True
|
||||
except botocore.exceptions.ClientError, e:
|
||||
except botocore.exceptions.ClientError as e:
|
||||
err_msg = str(e)
|
||||
|
||||
return success, err_msg, results
|
||||
|
@ -391,6 +393,8 @@ def wait_for_status(client, stream_name, status, wait_timeout=300,
|
|||
|
||||
if not status_achieved:
|
||||
err_msg = "Wait time out reached, while waiting for results"
|
||||
else:
|
||||
err_msg = "Status {0} achieved successfully".format(status)
|
||||
|
||||
return status_achieved, err_msg, stream
|
||||
|
||||
|
@ -442,7 +446,7 @@ def tags_action(client, stream_name, tags, action='create', check_mode=False):
|
|||
else:
|
||||
err_msg = 'Invalid action {0}'.format(action)
|
||||
|
||||
except botocore.exceptions.ClientError, e:
|
||||
except botocore.exceptions.ClientError as e:
|
||||
err_msg = str(e)
|
||||
|
||||
return success, err_msg
|
||||
|
@ -500,6 +504,7 @@ def update_tags(client, stream_name, tags, check_mode=False):
|
|||
Tuple (bool, str)
|
||||
"""
|
||||
success = False
|
||||
changed = False
|
||||
err_msg = ''
|
||||
tag_success, tag_msg, current_tags = (
|
||||
get_tags(client, stream_name, check_mode=check_mode)
|
||||
|
@ -536,13 +541,13 @@ def update_tags(client, stream_name, tags, check_mode=False):
|
|||
)
|
||||
)
|
||||
if not delete_success:
|
||||
return delete_success, delete_msg
|
||||
return delete_success, changed, delete_msg
|
||||
if tags_to_update:
|
||||
tags = make_tags_in_proper_format(
|
||||
recreate_tags_from_list(tags_to_update)
|
||||
)
|
||||
else:
|
||||
return True, 'Tags do not need to be updated'
|
||||
return True, changed, 'Tags do not need to be updated'
|
||||
|
||||
if tags:
|
||||
create_success, create_msg = (
|
||||
|
@ -551,9 +556,11 @@ def update_tags(client, stream_name, tags, check_mode=False):
|
|||
check_mode=check_mode
|
||||
)
|
||||
)
|
||||
return create_success, create_msg
|
||||
if create_success:
|
||||
changed = True
|
||||
return create_success, changed, create_msg
|
||||
|
||||
return success, err_msg
|
||||
return success, changed, err_msg
|
||||
|
||||
def stream_action(client, stream_name, shard_count=1, action='create',
|
||||
timeout=300, check_mode=False):
|
||||
|
@ -603,7 +610,7 @@ def stream_action(client, stream_name, shard_count=1, action='create',
|
|||
else:
|
||||
err_msg = 'Invalid action {0}'.format(action)
|
||||
|
||||
except botocore.exceptions.ClientError, e:
|
||||
except botocore.exceptions.ClientError as e:
|
||||
err_msg = str(e)
|
||||
|
||||
return success, err_msg
|
||||
|
@ -644,10 +651,18 @@ def retention_action(client, stream_name, retention_period=24,
|
|||
params['RetentionPeriodHours'] = retention_period
|
||||
client.increase_stream_retention_period(**params)
|
||||
success = True
|
||||
err_msg = (
|
||||
'Retention Period increased successfully to {0}'
|
||||
.format(retention_period)
|
||||
)
|
||||
elif action == 'decrease':
|
||||
params['RetentionPeriodHours'] = retention_period
|
||||
client.decrease_stream_retention_period(**params)
|
||||
success = True
|
||||
err_msg = (
|
||||
'Retention Period decreased successfully to {0}'
|
||||
.format(retention_period)
|
||||
)
|
||||
else:
|
||||
err_msg = 'Invalid action {0}'.format(action)
|
||||
else:
|
||||
|
@ -658,7 +673,7 @@ def retention_action(client, stream_name, retention_period=24,
|
|||
else:
|
||||
err_msg = 'Invalid action {0}'.format(action)
|
||||
|
||||
except botocore.exceptions.ClientError, e:
|
||||
except botocore.exceptions.ClientError as e:
|
||||
err_msg = str(e)
|
||||
|
||||
return success, err_msg
|
||||
|
@ -698,7 +713,7 @@ def update(client, current_stream, stream_name, retention_period=None,
|
|||
Returns:
|
||||
Tuple (bool, bool, str)
|
||||
"""
|
||||
success = False
|
||||
success = True
|
||||
changed = False
|
||||
err_msg = ''
|
||||
if retention_period:
|
||||
|
@ -710,9 +725,10 @@ def update(client, current_stream, stream_name, retention_period=None,
|
|||
)
|
||||
)
|
||||
if not wait_success:
|
||||
return wait_success, True, wait_msg
|
||||
return wait_success, False, wait_msg
|
||||
|
||||
if current_stream['StreamStatus'] == 'ACTIVE':
|
||||
retention_changed = False
|
||||
if retention_period > current_stream['RetentionPeriodHours']:
|
||||
retention_changed, retention_msg = (
|
||||
retention_action(
|
||||
|
@ -720,8 +736,6 @@ def update(client, current_stream, stream_name, retention_period=None,
|
|||
check_mode=check_mode
|
||||
)
|
||||
)
|
||||
if retention_changed:
|
||||
success = True
|
||||
|
||||
elif retention_period < current_stream['RetentionPeriodHours']:
|
||||
retention_changed, retention_msg = (
|
||||
|
@ -730,11 +744,8 @@ def update(client, current_stream, stream_name, retention_period=None,
|
|||
check_mode=check_mode
|
||||
)
|
||||
)
|
||||
if retention_changed:
|
||||
success = True
|
||||
|
||||
elif retention_period == current_stream['RetentionPeriodHours']:
|
||||
retention_changed = False
|
||||
retention_msg = (
|
||||
'Retention {0} is the same as {1}'
|
||||
.format(
|
||||
|
@ -744,7 +755,10 @@ def update(client, current_stream, stream_name, retention_period=None,
|
|||
)
|
||||
success = True
|
||||
|
||||
changed = retention_changed
|
||||
if retention_changed:
|
||||
success = True
|
||||
changed = True
|
||||
|
||||
err_msg = retention_msg
|
||||
if changed and wait:
|
||||
wait_success, wait_msg, current_stream = (
|
||||
|
@ -754,7 +768,7 @@ def update(client, current_stream, stream_name, retention_period=None,
|
|||
)
|
||||
)
|
||||
if not wait_success:
|
||||
return wait_success, True, wait_msg
|
||||
return wait_success, False, wait_msg
|
||||
elif changed and not wait:
|
||||
stream_found, stream_msg, current_stream = (
|
||||
find_stream(client, stream_name, check_mode=check_mode)
|
||||
|
@ -774,11 +788,9 @@ def update(client, current_stream, stream_name, retention_period=None,
|
|||
return success, changed, err_msg
|
||||
|
||||
if tags:
|
||||
changed, err_msg = (
|
||||
_, _, err_msg = (
|
||||
update_tags(client, stream_name, tags, check_mode=check_mode)
|
||||
)
|
||||
if changed:
|
||||
success = True
|
||||
if wait:
|
||||
success, err_msg, _ = (
|
||||
wait_for_status(
|
||||
|
@ -832,6 +844,11 @@ def create_stream(client, stream_name, number_of_shards=1, retention_period=None
|
|||
stream_found, stream_msg, current_stream = (
|
||||
find_stream(client, stream_name, check_mode=check_mode)
|
||||
)
|
||||
if stream_found:
|
||||
if current_stream['ShardsCount'] != number_of_shards:
|
||||
err_msg = 'Can not change the number of shards in a Kinesis Stream'
|
||||
return success, changed, err_msg, results
|
||||
|
||||
if stream_found and current_stream['StreamStatus'] == 'DELETING' and wait:
|
||||
wait_success, wait_msg, current_stream = (
|
||||
wait_for_status(
|
||||
|
@ -1031,7 +1048,7 @@ def main():
|
|||
region=region, endpoint=ec2_url, **aws_connect_kwargs
|
||||
)
|
||||
)
|
||||
except botocore.exceptions.ClientError, e:
|
||||
except botocore.exceptions.ClientError as e:
|
||||
err_msg = 'Boto3 Client Error - {0}'.format(str(e.msg))
|
||||
module.fail_json(
|
||||
success=False, changed=False, result={}, msg=err_msg
|
||||
|
|
Loading…
Reference in a new issue