import pytest import unittest boto3 = pytest.importorskip("boto3") botocore = pytest.importorskip("botocore") from ansible.modules.cloud.amazon import kinesis_stream aws_region = 'us-west-2' class AnsibleKinesisStreamFunctions(unittest.TestCase): def test_convert_to_lower(self): example = { 'HasMoreShards': True, 'RetentionPeriodHours': 24, 'StreamName': 'test', 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', 'StreamStatus': 'ACTIVE' } converted_example = kinesis_stream.convert_to_lower(example) keys = list(converted_example.keys()) keys.sort() for i in range(len(keys)): if i == 0: self.assertEqual(keys[i], 'has_more_shards') if i == 1: self.assertEqual(keys[i], 'retention_period_hours') if i == 2: self.assertEqual(keys[i], 'stream_arn') if i == 3: self.assertEqual(keys[i], 'stream_name') if i == 4: self.assertEqual(keys[i], 'stream_status') def test_make_tags_in_aws_format(self): example = { 'env': 'development' } should_return = [ { 'Key': 'env', 'Value': 'development' } ] aws_tags = kinesis_stream.make_tags_in_aws_format(example) self.assertEqual(aws_tags, should_return) def test_make_tags_in_proper_format(self): example = [ { 'Key': 'env', 'Value': 'development' }, { 'Key': 'service', 'Value': 'web' } ] should_return = { 'env': 'development', 'service': 'web' } proper_tags = kinesis_stream.make_tags_in_proper_format(example) self.assertEqual(proper_tags, should_return) def test_recreate_tags_from_list(self): example = [('environment', 'development'), ('service', 'web')] should_return = [ { 'Key': 'environment', 'Value': 'development' }, { 'Key': 'service', 'Value': 'web' } ] aws_tags = kinesis_stream.recreate_tags_from_list(example) self.assertEqual(aws_tags, should_return) def test_get_tags(self): client = boto3.client('kinesis', region_name=aws_region) success, err_msg, tags = kinesis_stream.get_tags(client, 'test', check_mode=True) self.assertTrue(success) should_return = [ { 'Key': 'DryRunMode', 'Value': 'true' } ] self.assertEqual(tags, should_return) def test_find_stream(self): client = boto3.client('kinesis', region_name=aws_region) success, err_msg, stream = ( kinesis_stream.find_stream(client, 'test', check_mode=True) ) should_return = { 'OpenShardsCount': 5, 'ClosedShardsCount': 0, 'ShardsCount': 5, 'HasMoreShards': True, 'RetentionPeriodHours': 24, 'StreamName': 'test', 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', 'StreamStatus': 'ACTIVE', 'EncryptionType': 'NONE' } self.assertTrue(success) self.assertEqual(stream, should_return) def test_wait_for_status(self): client = boto3.client('kinesis', region_name=aws_region) success, err_msg, stream = ( kinesis_stream.wait_for_status( client, 'test', 'ACTIVE', check_mode=True ) ) should_return = { 'OpenShardsCount': 5, 'ClosedShardsCount': 0, 'ShardsCount': 5, 'HasMoreShards': True, 'RetentionPeriodHours': 24, 'StreamName': 'test', 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', 'StreamStatus': 'ACTIVE', 'EncryptionType': 'NONE' } self.assertTrue(success) self.assertEqual(stream, should_return) def test_tags_action_create(self): client = boto3.client('kinesis', region_name=aws_region) tags = { 'env': 'development', 'service': 'web' } success, err_msg = ( kinesis_stream.tags_action( client, 'test', tags, 'create', check_mode=True ) ) self.assertTrue(success) def test_tags_action_delete(self): client = boto3.client('kinesis', region_name=aws_region) tags = { 'env': 'development', 'service': 'web' } success, err_msg = ( kinesis_stream.tags_action( client, 'test', tags, 'delete', check_mode=True ) ) self.assertTrue(success) def test_tags_action_invalid(self): client = boto3.client('kinesis', region_name=aws_region) tags = { 'env': 'development', 'service': 'web' } success, err_msg = ( kinesis_stream.tags_action( client, 'test', tags, 'append', check_mode=True ) ) self.assertFalse(success) def test_update_tags(self): client = boto3.client('kinesis', region_name=aws_region) tags = { 'env': 'development', 'service': 'web' } success, changed, err_msg = ( kinesis_stream.update_tags( client, 'test', tags, check_mode=True ) ) self.assertTrue(success) def test_stream_action_create(self): client = boto3.client('kinesis', region_name=aws_region) success, err_msg = ( kinesis_stream.stream_action( client, 'test', 10, 'create', check_mode=True ) ) self.assertTrue(success) def test_stream_action_delete(self): client = boto3.client('kinesis', region_name=aws_region) success, err_msg = ( kinesis_stream.stream_action( client, 'test', 10, 'delete', check_mode=True ) ) self.assertTrue(success) def test_stream_action_invalid(self): client = boto3.client('kinesis', region_name=aws_region) success, err_msg = ( kinesis_stream.stream_action( client, 'test', 10, 'append', check_mode=True ) ) self.assertFalse(success) def test_retention_action_increase(self): client = boto3.client('kinesis', region_name=aws_region) success, err_msg = ( kinesis_stream.retention_action( client, 'test', 48, 'increase', check_mode=True ) ) self.assertTrue(success) def test_retention_action_decrease(self): client = boto3.client('kinesis', region_name=aws_region) success, err_msg = ( kinesis_stream.retention_action( client, 'test', 24, 'decrease', check_mode=True ) ) self.assertTrue(success) def test_retention_action_invalid(self): client = boto3.client('kinesis', region_name=aws_region) success, err_msg = ( kinesis_stream.retention_action( client, 'test', 24, 'create', check_mode=True ) ) self.assertFalse(success) def test_update_shard_count(self): client = boto3.client('kinesis', region_name=aws_region) success, err_msg = ( kinesis_stream.update_shard_count( client, 'test', 5, check_mode=True ) ) self.assertTrue(success) def test_update(self): client = boto3.client('kinesis', region_name=aws_region) current_stream = { 'OpenShardsCount': 5, 'ClosedShardsCount': 0, 'ShardsCount': 1, 'HasMoreShards': True, 'RetentionPeriodHours': 24, 'StreamName': 'test', 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', 'StreamStatus': 'ACTIVE', 'EncryptionType': 'NONE' } tags = { 'env': 'development', 'service': 'web' } success, changed, err_msg = ( kinesis_stream.update( client, current_stream, 'test', number_of_shards=2, retention_period=48, tags=tags, check_mode=True ) ) self.assertTrue(success) self.assertTrue(changed) self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.') def test_create_stream(self): client = boto3.client('kinesis', region_name=aws_region) tags = { 'env': 'development', 'service': 'web' } success, changed, err_msg, results = ( kinesis_stream.create_stream( client, 'test', number_of_shards=10, retention_period=48, tags=tags, check_mode=True ) ) should_return = { 'open_shards_count': 5, 'closed_shards_count': 0, 'shards_count': 5, 'has_more_shards': True, 'retention_period_hours': 24, 'stream_name': 'test', 'stream_arn': 'arn:aws:kinesis:east-side:123456789:stream/test', 'stream_status': 'ACTIVE', 'encryption_type': 'NONE', 'tags': tags, } self.assertTrue(success) self.assertTrue(changed) self.assertEqual(results, should_return) self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.') def test_enable_stream_encription(self): client = boto3.client('kinesis', region_name=aws_region) success, changed, err_msg, results = ( kinesis_stream.start_stream_encryption( client, 'test', encryption_type='KMS', key_id='', wait=True, wait_timeout=60, check_mode=True ) ) self.assertTrue(success) self.assertTrue(changed) self.assertEqual(err_msg, 'Kinesis Stream test encryption started successfully.') def test_dsbale_stream_encryption(self): client = boto3.client('kinesis', region_name=aws_region) success, changed, err_msg, results = ( kinesis_stream.stop_stream_encryption( client, 'test', encryption_type='KMS', key_id='', wait=True, wait_timeout=60, check_mode=True ) ) self.assertTrue(success) self.assertTrue(changed) self.assertEqual(err_msg, 'Kinesis Stream test encryption stopped successfully.')