Now when number of shards is different than what is the stream currently, it will fail.\n\nShards can not be changed on an already created stream
This commit is contained in:
parent
8a17506058
commit
e9f7fb0927
1 changed files with 48 additions and 31 deletions
|
@ -147,7 +147,6 @@ tags:
|
||||||
'''
|
'''
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import boto
|
|
||||||
import botocore
|
import botocore
|
||||||
import boto3
|
import boto3
|
||||||
HAS_BOTO3 = True
|
HAS_BOTO3 = True
|
||||||
|
@ -285,20 +284,18 @@ def get_tags(client, stream_name, check_mode=False):
|
||||||
},
|
},
|
||||||
]
|
]
|
||||||
success = True
|
success = True
|
||||||
except botocore.exceptions.ClientError, e:
|
except botocore.exceptions.ClientError as e:
|
||||||
err_msg = str(e)
|
err_msg = str(e)
|
||||||
|
|
||||||
return success, err_msg, results
|
return success, err_msg, results
|
||||||
|
|
||||||
def find_stream(client, stream_name, limit=1, check_mode=False):
|
def find_stream(client, stream_name, check_mode=False):
|
||||||
"""Retrieve a Kinesis Stream.
|
"""Retrieve a Kinesis Stream.
|
||||||
Args:
|
Args:
|
||||||
client (botocore.client.EC2): Boto3 client.
|
client (botocore.client.EC2): Boto3 client.
|
||||||
stream_name (str): Name of the Kinesis stream.
|
stream_name (str): Name of the Kinesis stream.
|
||||||
|
|
||||||
Kwargs:
|
Kwargs:
|
||||||
limit (int): Limit the number of shards to return within a stream.
|
|
||||||
default=1
|
|
||||||
check_mode (bool): This will pass DryRun as one of the parameters to the aws api.
|
check_mode (bool): This will pass DryRun as one of the parameters to the aws api.
|
||||||
default=False
|
default=False
|
||||||
|
|
||||||
|
@ -313,15 +310,20 @@ def find_stream(client, stream_name, limit=1, check_mode=False):
|
||||||
success = False
|
success = False
|
||||||
params = {
|
params = {
|
||||||
'StreamName': stream_name,
|
'StreamName': stream_name,
|
||||||
'Limit': limit
|
|
||||||
}
|
}
|
||||||
results = dict()
|
results = dict()
|
||||||
|
has_more_shards = True
|
||||||
|
shards = list()
|
||||||
try:
|
try:
|
||||||
if not check_mode:
|
if not check_mode:
|
||||||
|
while has_more_shards:
|
||||||
results = (
|
results = (
|
||||||
client.describe_stream(**params)['StreamDescription']
|
client.describe_stream(**params)['StreamDescription']
|
||||||
)
|
)
|
||||||
results.pop('Shards')
|
shards.extend(results.pop('Shards'))
|
||||||
|
has_more_shards = results['HasMoreShards']
|
||||||
|
results['Shards'] = shards
|
||||||
|
results['ShardsCount'] = len(shards)
|
||||||
else:
|
else:
|
||||||
results = {
|
results = {
|
||||||
'HasMoreShards': True,
|
'HasMoreShards': True,
|
||||||
|
@ -331,7 +333,7 @@ def find_stream(client, stream_name, limit=1, check_mode=False):
|
||||||
'StreamStatus': 'ACTIVE'
|
'StreamStatus': 'ACTIVE'
|
||||||
}
|
}
|
||||||
success = True
|
success = True
|
||||||
except botocore.exceptions.ClientError, e:
|
except botocore.exceptions.ClientError as e:
|
||||||
err_msg = str(e)
|
err_msg = str(e)
|
||||||
|
|
||||||
return success, err_msg, results
|
return success, err_msg, results
|
||||||
|
@ -391,6 +393,8 @@ def wait_for_status(client, stream_name, status, wait_timeout=300,
|
||||||
|
|
||||||
if not status_achieved:
|
if not status_achieved:
|
||||||
err_msg = "Wait time out reached, while waiting for results"
|
err_msg = "Wait time out reached, while waiting for results"
|
||||||
|
else:
|
||||||
|
err_msg = "Status {0} achieved successfully".format(status)
|
||||||
|
|
||||||
return status_achieved, err_msg, stream
|
return status_achieved, err_msg, stream
|
||||||
|
|
||||||
|
@ -442,7 +446,7 @@ def tags_action(client, stream_name, tags, action='create', check_mode=False):
|
||||||
else:
|
else:
|
||||||
err_msg = 'Invalid action {0}'.format(action)
|
err_msg = 'Invalid action {0}'.format(action)
|
||||||
|
|
||||||
except botocore.exceptions.ClientError, e:
|
except botocore.exceptions.ClientError as e:
|
||||||
err_msg = str(e)
|
err_msg = str(e)
|
||||||
|
|
||||||
return success, err_msg
|
return success, err_msg
|
||||||
|
@ -500,6 +504,7 @@ def update_tags(client, stream_name, tags, check_mode=False):
|
||||||
Tuple (bool, str)
|
Tuple (bool, str)
|
||||||
"""
|
"""
|
||||||
success = False
|
success = False
|
||||||
|
changed = False
|
||||||
err_msg = ''
|
err_msg = ''
|
||||||
tag_success, tag_msg, current_tags = (
|
tag_success, tag_msg, current_tags = (
|
||||||
get_tags(client, stream_name, check_mode=check_mode)
|
get_tags(client, stream_name, check_mode=check_mode)
|
||||||
|
@ -536,13 +541,13 @@ def update_tags(client, stream_name, tags, check_mode=False):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
if not delete_success:
|
if not delete_success:
|
||||||
return delete_success, delete_msg
|
return delete_success, changed, delete_msg
|
||||||
if tags_to_update:
|
if tags_to_update:
|
||||||
tags = make_tags_in_proper_format(
|
tags = make_tags_in_proper_format(
|
||||||
recreate_tags_from_list(tags_to_update)
|
recreate_tags_from_list(tags_to_update)
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
return True, 'Tags do not need to be updated'
|
return True, changed, 'Tags do not need to be updated'
|
||||||
|
|
||||||
if tags:
|
if tags:
|
||||||
create_success, create_msg = (
|
create_success, create_msg = (
|
||||||
|
@ -551,9 +556,11 @@ def update_tags(client, stream_name, tags, check_mode=False):
|
||||||
check_mode=check_mode
|
check_mode=check_mode
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
return create_success, create_msg
|
if create_success:
|
||||||
|
changed = True
|
||||||
|
return create_success, changed, create_msg
|
||||||
|
|
||||||
return success, err_msg
|
return success, changed, err_msg
|
||||||
|
|
||||||
def stream_action(client, stream_name, shard_count=1, action='create',
|
def stream_action(client, stream_name, shard_count=1, action='create',
|
||||||
timeout=300, check_mode=False):
|
timeout=300, check_mode=False):
|
||||||
|
@ -603,7 +610,7 @@ def stream_action(client, stream_name, shard_count=1, action='create',
|
||||||
else:
|
else:
|
||||||
err_msg = 'Invalid action {0}'.format(action)
|
err_msg = 'Invalid action {0}'.format(action)
|
||||||
|
|
||||||
except botocore.exceptions.ClientError, e:
|
except botocore.exceptions.ClientError as e:
|
||||||
err_msg = str(e)
|
err_msg = str(e)
|
||||||
|
|
||||||
return success, err_msg
|
return success, err_msg
|
||||||
|
@ -644,10 +651,18 @@ def retention_action(client, stream_name, retention_period=24,
|
||||||
params['RetentionPeriodHours'] = retention_period
|
params['RetentionPeriodHours'] = retention_period
|
||||||
client.increase_stream_retention_period(**params)
|
client.increase_stream_retention_period(**params)
|
||||||
success = True
|
success = True
|
||||||
|
err_msg = (
|
||||||
|
'Retention Period increased successfully to {0}'
|
||||||
|
.format(retention_period)
|
||||||
|
)
|
||||||
elif action == 'decrease':
|
elif action == 'decrease':
|
||||||
params['RetentionPeriodHours'] = retention_period
|
params['RetentionPeriodHours'] = retention_period
|
||||||
client.decrease_stream_retention_period(**params)
|
client.decrease_stream_retention_period(**params)
|
||||||
success = True
|
success = True
|
||||||
|
err_msg = (
|
||||||
|
'Retention Period decreased successfully to {0}'
|
||||||
|
.format(retention_period)
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
err_msg = 'Invalid action {0}'.format(action)
|
err_msg = 'Invalid action {0}'.format(action)
|
||||||
else:
|
else:
|
||||||
|
@ -658,7 +673,7 @@ def retention_action(client, stream_name, retention_period=24,
|
||||||
else:
|
else:
|
||||||
err_msg = 'Invalid action {0}'.format(action)
|
err_msg = 'Invalid action {0}'.format(action)
|
||||||
|
|
||||||
except botocore.exceptions.ClientError, e:
|
except botocore.exceptions.ClientError as e:
|
||||||
err_msg = str(e)
|
err_msg = str(e)
|
||||||
|
|
||||||
return success, err_msg
|
return success, err_msg
|
||||||
|
@ -698,7 +713,7 @@ def update(client, current_stream, stream_name, retention_period=None,
|
||||||
Returns:
|
Returns:
|
||||||
Tuple (bool, bool, str)
|
Tuple (bool, bool, str)
|
||||||
"""
|
"""
|
||||||
success = False
|
success = True
|
||||||
changed = False
|
changed = False
|
||||||
err_msg = ''
|
err_msg = ''
|
||||||
if retention_period:
|
if retention_period:
|
||||||
|
@ -710,9 +725,10 @@ def update(client, current_stream, stream_name, retention_period=None,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
if not wait_success:
|
if not wait_success:
|
||||||
return wait_success, True, wait_msg
|
return wait_success, False, wait_msg
|
||||||
|
|
||||||
if current_stream['StreamStatus'] == 'ACTIVE':
|
if current_stream['StreamStatus'] == 'ACTIVE':
|
||||||
|
retention_changed = False
|
||||||
if retention_period > current_stream['RetentionPeriodHours']:
|
if retention_period > current_stream['RetentionPeriodHours']:
|
||||||
retention_changed, retention_msg = (
|
retention_changed, retention_msg = (
|
||||||
retention_action(
|
retention_action(
|
||||||
|
@ -720,8 +736,6 @@ def update(client, current_stream, stream_name, retention_period=None,
|
||||||
check_mode=check_mode
|
check_mode=check_mode
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
if retention_changed:
|
|
||||||
success = True
|
|
||||||
|
|
||||||
elif retention_period < current_stream['RetentionPeriodHours']:
|
elif retention_period < current_stream['RetentionPeriodHours']:
|
||||||
retention_changed, retention_msg = (
|
retention_changed, retention_msg = (
|
||||||
|
@ -730,11 +744,8 @@ def update(client, current_stream, stream_name, retention_period=None,
|
||||||
check_mode=check_mode
|
check_mode=check_mode
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
if retention_changed:
|
|
||||||
success = True
|
|
||||||
|
|
||||||
elif retention_period == current_stream['RetentionPeriodHours']:
|
elif retention_period == current_stream['RetentionPeriodHours']:
|
||||||
retention_changed = False
|
|
||||||
retention_msg = (
|
retention_msg = (
|
||||||
'Retention {0} is the same as {1}'
|
'Retention {0} is the same as {1}'
|
||||||
.format(
|
.format(
|
||||||
|
@ -744,7 +755,10 @@ def update(client, current_stream, stream_name, retention_period=None,
|
||||||
)
|
)
|
||||||
success = True
|
success = True
|
||||||
|
|
||||||
changed = retention_changed
|
if retention_changed:
|
||||||
|
success = True
|
||||||
|
changed = True
|
||||||
|
|
||||||
err_msg = retention_msg
|
err_msg = retention_msg
|
||||||
if changed and wait:
|
if changed and wait:
|
||||||
wait_success, wait_msg, current_stream = (
|
wait_success, wait_msg, current_stream = (
|
||||||
|
@ -754,7 +768,7 @@ def update(client, current_stream, stream_name, retention_period=None,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
if not wait_success:
|
if not wait_success:
|
||||||
return wait_success, True, wait_msg
|
return wait_success, False, wait_msg
|
||||||
elif changed and not wait:
|
elif changed and not wait:
|
||||||
stream_found, stream_msg, current_stream = (
|
stream_found, stream_msg, current_stream = (
|
||||||
find_stream(client, stream_name, check_mode=check_mode)
|
find_stream(client, stream_name, check_mode=check_mode)
|
||||||
|
@ -774,11 +788,9 @@ def update(client, current_stream, stream_name, retention_period=None,
|
||||||
return success, changed, err_msg
|
return success, changed, err_msg
|
||||||
|
|
||||||
if tags:
|
if tags:
|
||||||
changed, err_msg = (
|
_, _, err_msg = (
|
||||||
update_tags(client, stream_name, tags, check_mode=check_mode)
|
update_tags(client, stream_name, tags, check_mode=check_mode)
|
||||||
)
|
)
|
||||||
if changed:
|
|
||||||
success = True
|
|
||||||
if wait:
|
if wait:
|
||||||
success, err_msg, _ = (
|
success, err_msg, _ = (
|
||||||
wait_for_status(
|
wait_for_status(
|
||||||
|
@ -832,6 +844,11 @@ def create_stream(client, stream_name, number_of_shards=1, retention_period=None
|
||||||
stream_found, stream_msg, current_stream = (
|
stream_found, stream_msg, current_stream = (
|
||||||
find_stream(client, stream_name, check_mode=check_mode)
|
find_stream(client, stream_name, check_mode=check_mode)
|
||||||
)
|
)
|
||||||
|
if stream_found:
|
||||||
|
if current_stream['ShardsCount'] != number_of_shards:
|
||||||
|
err_msg = 'Can not change the number of shards in a Kinesis Stream'
|
||||||
|
return success, changed, err_msg, results
|
||||||
|
|
||||||
if stream_found and current_stream['StreamStatus'] == 'DELETING' and wait:
|
if stream_found and current_stream['StreamStatus'] == 'DELETING' and wait:
|
||||||
wait_success, wait_msg, current_stream = (
|
wait_success, wait_msg, current_stream = (
|
||||||
wait_for_status(
|
wait_for_status(
|
||||||
|
@ -1031,7 +1048,7 @@ def main():
|
||||||
region=region, endpoint=ec2_url, **aws_connect_kwargs
|
region=region, endpoint=ec2_url, **aws_connect_kwargs
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
except botocore.exceptions.ClientError, e:
|
except botocore.exceptions.ClientError as e:
|
||||||
err_msg = 'Boto3 Client Error - {0}'.format(str(e.msg))
|
err_msg = 'Boto3 Client Error - {0}'.format(str(e.msg))
|
||||||
module.fail_json(
|
module.fail_json(
|
||||||
success=False, changed=False, result={}, msg=err_msg
|
success=False, changed=False, result={}, msg=err_msg
|
||||||
|
|
Loading…
Reference in a new issue