[cloud] Add the ability to modify shard count to kinesis_stream module (#24805)
* Add the ability to modify shard count to kinesis_stream module * Fixed an issue in kinesis_stream where update() reports not changed when it is changed * Remove unreachable message and make the try and catch block shorter
This commit is contained in:
parent
d9fc3def94
commit
75998d3ca3
2 changed files with 109 additions and 11 deletions
|
@ -36,8 +36,7 @@ options:
|
||||||
required: true
|
required: true
|
||||||
shards:
|
shards:
|
||||||
description:
|
description:
|
||||||
- "The number of shards you want to have with this stream. This can not
|
- "The number of shards you want to have with this stream."
|
||||||
be modified after being created."
|
|
||||||
- "This is required when state == present"
|
- "This is required when state == present"
|
||||||
required: false
|
required: false
|
||||||
default: None
|
default: None
|
||||||
|
@ -334,9 +333,15 @@ def find_stream(client, stream_name, check_mode=False):
|
||||||
shards.extend(results.pop('Shards'))
|
shards.extend(results.pop('Shards'))
|
||||||
has_more_shards = results['HasMoreShards']
|
has_more_shards = results['HasMoreShards']
|
||||||
results['Shards'] = shards
|
results['Shards'] = shards
|
||||||
|
num_closed_shards = len([s for s in shards if 'EndingSequenceNumber' in s['SequenceNumberRange']])
|
||||||
|
results['OpenShardsCount'] = len(shards) - num_closed_shards
|
||||||
|
results['ClosedShardsCount'] = num_closed_shards
|
||||||
results['ShardsCount'] = len(shards)
|
results['ShardsCount'] = len(shards)
|
||||||
else:
|
else:
|
||||||
results = {
|
results = {
|
||||||
|
'OpenShardsCount': 5,
|
||||||
|
'ClosedShardsCount': 0,
|
||||||
|
'ShardsCount': 5,
|
||||||
'HasMoreShards': True,
|
'HasMoreShards': True,
|
||||||
'RetentionPeriodHours': 24,
|
'RetentionPeriodHours': 24,
|
||||||
'StreamName': stream_name,
|
'StreamName': stream_name,
|
||||||
|
@ -634,10 +639,10 @@ def stream_action(client, stream_name, shard_count=1, action='create',
|
||||||
|
|
||||||
def retention_action(client, stream_name, retention_period=24,
|
def retention_action(client, stream_name, retention_period=24,
|
||||||
action='increase', check_mode=False):
|
action='increase', check_mode=False):
|
||||||
"""Increase or Decreaste the retention of messages in the Kinesis stream.
|
"""Increase or Decrease the retention of messages in the Kinesis stream.
|
||||||
Args:
|
Args:
|
||||||
client (botocore.client.EC2): Boto3 client.
|
client (botocore.client.EC2): Boto3 client.
|
||||||
stream_name (str): The
|
stream_name (str): The name of the kinesis stream.
|
||||||
|
|
||||||
Kwargs:
|
Kwargs:
|
||||||
retention_period (int): This is how long messages will be kept before
|
retention_period (int): This is how long messages will be kept before
|
||||||
|
@ -652,7 +657,7 @@ def retention_action(client, stream_name, retention_period=24,
|
||||||
>>> client = boto3.client('kinesis')
|
>>> client = boto3.client('kinesis')
|
||||||
>>> stream_name = 'test-stream'
|
>>> stream_name = 'test-stream'
|
||||||
>>> retention_period = 48
|
>>> retention_period = 48
|
||||||
>>> stream_action(client, stream_name, retention_period, action='create')
|
>>> retention_action(client, stream_name, retention_period, action='increase')
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Tuple (bool, str)
|
Tuple (bool, str)
|
||||||
|
@ -696,7 +701,44 @@ def retention_action(client, stream_name, retention_period=24,
|
||||||
return success, err_msg
|
return success, err_msg
|
||||||
|
|
||||||
|
|
||||||
def update(client, current_stream, stream_name, retention_period=None,
|
def update_shard_count(client, stream_name, number_of_shards=1, check_mode=False):
|
||||||
|
"""Increase or Decrease the number of shards in the Kinesis stream.
|
||||||
|
Args:
|
||||||
|
client (botocore.client.EC2): Boto3 client.
|
||||||
|
stream_name (str): The name of the kinesis stream.
|
||||||
|
|
||||||
|
Kwargs:
|
||||||
|
number_of_shards (int): Number of shards this stream will use.
|
||||||
|
default=1
|
||||||
|
check_mode (bool): This will pass DryRun as one of the parameters to the aws api.
|
||||||
|
default=False
|
||||||
|
|
||||||
|
Basic Usage:
|
||||||
|
>>> client = boto3.client('kinesis')
|
||||||
|
>>> stream_name = 'test-stream'
|
||||||
|
>>> number_of_shards = 3
|
||||||
|
>>> update_shard_count(client, stream_name, number_of_shards)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple (bool, str)
|
||||||
|
"""
|
||||||
|
success = True
|
||||||
|
err_msg = ''
|
||||||
|
params = {
|
||||||
|
'StreamName': stream_name,
|
||||||
|
'ScalingType': 'UNIFORM_SCALING'
|
||||||
|
}
|
||||||
|
if not check_mode:
|
||||||
|
params['TargetShardCount'] = number_of_shards
|
||||||
|
try:
|
||||||
|
client.update_shard_count(**params)
|
||||||
|
except botocore.exceptions.ClientError as e:
|
||||||
|
return False, str(e)
|
||||||
|
|
||||||
|
return success, err_msg
|
||||||
|
|
||||||
|
|
||||||
|
def update(client, current_stream, stream_name, number_of_shards=1, retention_period=None,
|
||||||
tags=None, wait=False, wait_timeout=300, check_mode=False):
|
tags=None, wait=False, wait_timeout=300, check_mode=False):
|
||||||
"""Update an Amazon Kinesis Stream.
|
"""Update an Amazon Kinesis Stream.
|
||||||
Args:
|
Args:
|
||||||
|
@ -704,6 +746,8 @@ def update(client, current_stream, stream_name, retention_period=None,
|
||||||
stream_name (str): The name of the kinesis stream.
|
stream_name (str): The name of the kinesis stream.
|
||||||
|
|
||||||
Kwargs:
|
Kwargs:
|
||||||
|
number_of_shards (int): Number of shards this stream will use.
|
||||||
|
default=1
|
||||||
retention_period (int): This is how long messages will be kept before
|
retention_period (int): This is how long messages will be kept before
|
||||||
they are discarded. This can not be less than 24 hours.
|
they are discarded. This can not be less than 24 hours.
|
||||||
tags (dict): The tags you want applied.
|
tags (dict): The tags you want applied.
|
||||||
|
@ -717,6 +761,7 @@ def update(client, current_stream, stream_name, retention_period=None,
|
||||||
Basic Usage:
|
Basic Usage:
|
||||||
>>> client = boto3.client('kinesis')
|
>>> client = boto3.client('kinesis')
|
||||||
>>> current_stream = {
|
>>> current_stream = {
|
||||||
|
'ShardCount': 3,
|
||||||
'HasMoreShards': True,
|
'HasMoreShards': True,
|
||||||
'RetentionPeriodHours': 24,
|
'RetentionPeriodHours': 24,
|
||||||
'StreamName': 'test-stream',
|
'StreamName': 'test-stream',
|
||||||
|
@ -725,8 +770,9 @@ def update(client, current_stream, stream_name, retention_period=None,
|
||||||
}
|
}
|
||||||
>>> stream_name = 'test-stream'
|
>>> stream_name = 'test-stream'
|
||||||
>>> retention_period = 48
|
>>> retention_period = 48
|
||||||
>>> stream_action(client, current_stream, stream_name,
|
>>> number_of_shards = 10
|
||||||
retention_period, action='create' )
|
>>> update(client, current_stream, stream_name,
|
||||||
|
number_of_shards, retention_period )
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Tuple (bool, bool, str)
|
Tuple (bool, bool, str)
|
||||||
|
@ -805,6 +851,36 @@ def update(client, current_stream, stream_name, retention_period=None,
|
||||||
)
|
)
|
||||||
return success, changed, err_msg
|
return success, changed, err_msg
|
||||||
|
|
||||||
|
if current_stream['OpenShardsCount'] != number_of_shards:
|
||||||
|
success, err_msg = (
|
||||||
|
update_shard_count(client, stream_name, number_of_shards, check_mode=check_mode)
|
||||||
|
)
|
||||||
|
|
||||||
|
if not success:
|
||||||
|
return success, changed, err_msg
|
||||||
|
|
||||||
|
changed = True
|
||||||
|
|
||||||
|
if wait:
|
||||||
|
wait_success, wait_msg, current_stream = (
|
||||||
|
wait_for_status(
|
||||||
|
client, stream_name, 'ACTIVE', wait_timeout,
|
||||||
|
check_mode=check_mode
|
||||||
|
)
|
||||||
|
)
|
||||||
|
if not wait_success:
|
||||||
|
return wait_success, changed, wait_msg
|
||||||
|
else:
|
||||||
|
stream_found, stream_msg, current_stream = (
|
||||||
|
find_stream(client, stream_name, check_mode=check_mode)
|
||||||
|
)
|
||||||
|
if stream_found and current_stream['StreamStatus'] != 'ACTIVE':
|
||||||
|
err_msg = (
|
||||||
|
'Number of shards for {0} is in the process of updating'
|
||||||
|
.format(stream_name)
|
||||||
|
)
|
||||||
|
return success, changed, err_msg
|
||||||
|
|
||||||
if tags:
|
if tags:
|
||||||
_, _, err_msg = (
|
_, _, err_msg = (
|
||||||
update_tags(client, stream_name, tags, check_mode=check_mode)
|
update_tags(client, stream_name, tags, check_mode=check_mode)
|
||||||
|
@ -863,6 +939,7 @@ def create_stream(client, stream_name, number_of_shards=1, retention_period=None
|
||||||
stream_found, stream_msg, current_stream = (
|
stream_found, stream_msg, current_stream = (
|
||||||
find_stream(client, stream_name, check_mode=check_mode)
|
find_stream(client, stream_name, check_mode=check_mode)
|
||||||
)
|
)
|
||||||
|
|
||||||
if stream_found and current_stream.get('StreamStatus') == 'DELETING' and wait:
|
if stream_found and current_stream.get('StreamStatus') == 'DELETING' and wait:
|
||||||
wait_success, wait_msg, current_stream = (
|
wait_success, wait_msg, current_stream = (
|
||||||
wait_for_status(
|
wait_for_status(
|
||||||
|
@ -878,8 +955,8 @@ def create_stream(client, stream_name, number_of_shards=1, retention_period=None
|
||||||
|
|
||||||
if stream_found and current_stream.get('StreamStatus') != 'DELETING':
|
if stream_found and current_stream.get('StreamStatus') != 'DELETING':
|
||||||
success, changed, err_msg = update(
|
success, changed, err_msg = update(
|
||||||
client, current_stream, stream_name, retention_period, tags,
|
client, current_stream, stream_name, number_of_shards,
|
||||||
wait, wait_timeout, check_mode=check_mode
|
retention_period, tags, wait, wait_timeout, check_mode=check_mode
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
create_success, create_msg = (
|
create_success, create_msg = (
|
||||||
|
|
|
@ -98,6 +98,9 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase):
|
||||||
kinesis_stream.find_stream(client, 'test', check_mode=True)
|
kinesis_stream.find_stream(client, 'test', check_mode=True)
|
||||||
)
|
)
|
||||||
should_return = {
|
should_return = {
|
||||||
|
'OpenShardsCount': 5,
|
||||||
|
'ClosedShardsCount': 0,
|
||||||
|
'ShardsCount': 5,
|
||||||
'HasMoreShards': True,
|
'HasMoreShards': True,
|
||||||
'RetentionPeriodHours': 24,
|
'RetentionPeriodHours': 24,
|
||||||
'StreamName': 'test',
|
'StreamName': 'test',
|
||||||
|
@ -115,6 +118,9 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
should_return = {
|
should_return = {
|
||||||
|
'OpenShardsCount': 5,
|
||||||
|
'ClosedShardsCount': 0,
|
||||||
|
'ShardsCount': 5,
|
||||||
'HasMoreShards': True,
|
'HasMoreShards': True,
|
||||||
'RetentionPeriodHours': 24,
|
'RetentionPeriodHours': 24,
|
||||||
'StreamName': 'test',
|
'StreamName': 'test',
|
||||||
|
@ -230,9 +236,21 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase):
|
||||||
)
|
)
|
||||||
self.assertFalse(success)
|
self.assertFalse(success)
|
||||||
|
|
||||||
|
def test_update_shard_count(self):
|
||||||
|
client = boto3.client('kinesis', region_name=aws_region)
|
||||||
|
success, err_msg = (
|
||||||
|
kinesis_stream.update_shard_count(
|
||||||
|
client, 'test', 5, check_mode=True
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.assertTrue(success)
|
||||||
|
|
||||||
def test_update(self):
|
def test_update(self):
|
||||||
client = boto3.client('kinesis', region_name=aws_region)
|
client = boto3.client('kinesis', region_name=aws_region)
|
||||||
current_stream = {
|
current_stream = {
|
||||||
|
'OpenShardsCount': 5,
|
||||||
|
'ClosedShardsCount': 0,
|
||||||
|
'ShardsCount': 1,
|
||||||
'HasMoreShards': True,
|
'HasMoreShards': True,
|
||||||
'RetentionPeriodHours': 24,
|
'RetentionPeriodHours': 24,
|
||||||
'StreamName': 'test',
|
'StreamName': 'test',
|
||||||
|
@ -245,7 +263,7 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase):
|
||||||
}
|
}
|
||||||
success, changed, err_msg = (
|
success, changed, err_msg = (
|
||||||
kinesis_stream.update(
|
kinesis_stream.update(
|
||||||
client, current_stream, 'test', retention_period=48,
|
client, current_stream, 'test', number_of_shards=2, retention_period=48,
|
||||||
tags=tags, check_mode=True
|
tags=tags, check_mode=True
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
@ -266,6 +284,9 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
should_return = {
|
should_return = {
|
||||||
|
'open_shards_count': 5,
|
||||||
|
'closed_shards_count': 0,
|
||||||
|
'shards_count': 5,
|
||||||
'has_more_shards': True,
|
'has_more_shards': True,
|
||||||
'retention_period_hours': 24,
|
'retention_period_hours': 24,
|
||||||
'stream_name': 'test',
|
'stream_name': 'test',
|
||||||
|
|
Loading…
Reference in a new issue