fixup sns topic subscriptions (#2232)
* fixup sns topic subscriptions * return docs
This commit is contained in:
parent
4afae893e1
commit
0d8eefe197
1 changed files with 238 additions and 178 deletions
416
cloud/amazon/sns_topic.py
Executable file → Normal file
416
cloud/amazon/sns_topic.py
Executable file → Normal file
|
@ -97,35 +97,29 @@ EXAMPLES = """
|
|||
"""
|
||||
|
||||
RETURN = '''
|
||||
topic_created:
|
||||
description: Whether the topic was newly created
|
||||
type: bool
|
||||
returned: changed and state == present
|
||||
sample: True
|
||||
|
||||
attributes_set:
|
||||
description: The attributes which were changed
|
||||
type: list
|
||||
returned: state == "present"
|
||||
sample: ["policy", "delivery_policy"]
|
||||
|
||||
subscriptions_added:
|
||||
description: The subscriptions added to the topic
|
||||
type: list
|
||||
returned: state == "present"
|
||||
sample: [["sms", "my_mobile_number"], ["sms", "my_mobile_2"]]
|
||||
|
||||
subscriptions_deleted:
|
||||
description: The subscriptions deleted from the topic
|
||||
type: list
|
||||
returned: state == "present"
|
||||
sample: [["sms", "my_mobile_number"], ["sms", "my_mobile_2"]]
|
||||
|
||||
sns_arn:
|
||||
description: The ARN of the topic you are modifying
|
||||
type: string
|
||||
returned: state == "present"
|
||||
sample: "arn:aws:sns:us-east-1:123456789012:my_topic_name"
|
||||
|
||||
sns_topic:
|
||||
description: Dict of sns topic details
|
||||
type: dict
|
||||
sample:
|
||||
name: sns-topic-name
|
||||
state: present
|
||||
display_name: default
|
||||
policy: {}
|
||||
delivery_policy: {}
|
||||
subscriptions_new: []
|
||||
subscriptions_existing: []
|
||||
subscriptions_deleted: []
|
||||
subscriptions_added: []
|
||||
subscriptions_purge': false
|
||||
check_mode: false
|
||||
topic_created: false
|
||||
topic_deleted: false
|
||||
attributes_set: []
|
||||
'''
|
||||
|
||||
import sys
|
||||
|
@ -141,38 +135,210 @@ except ImportError:
|
|||
HAS_BOTO = False
|
||||
|
||||
|
||||
def canonicalize_endpoint(protocol, endpoint):
|
||||
if protocol == 'sms':
|
||||
import re
|
||||
return re.sub('[^0-9]*', '', endpoint)
|
||||
return endpoint
|
||||
class SnsTopicManager(object):
|
||||
""" Handles SNS Topic creation and destruction """
|
||||
|
||||
def __init__(self,
|
||||
module,
|
||||
name,
|
||||
state,
|
||||
display_name,
|
||||
policy,
|
||||
delivery_policy,
|
||||
subscriptions,
|
||||
purge_subscriptions,
|
||||
check_mode,
|
||||
region,
|
||||
**aws_connect_params):
|
||||
|
||||
def get_all_topics(connection, module):
|
||||
next_token = None
|
||||
topics = []
|
||||
while True:
|
||||
self.region = region
|
||||
self.aws_connect_params = aws_connect_params
|
||||
self.connection = self._get_boto_connection()
|
||||
self.changed = False
|
||||
self.module = module
|
||||
self.name = name
|
||||
self.state = state
|
||||
self.display_name = display_name
|
||||
self.policy = policy
|
||||
self.delivery_policy = delivery_policy
|
||||
self.subscriptions = subscriptions
|
||||
self.subscriptions_existing = []
|
||||
self.subscriptions_deleted = []
|
||||
self.subscriptions_added = []
|
||||
self.purge_subscriptions = purge_subscriptions
|
||||
self.check_mode = check_mode
|
||||
self.topic_created = False
|
||||
self.topic_deleted = False
|
||||
self.arn_topic = None
|
||||
self.attributes_set = []
|
||||
|
||||
def _get_boto_connection(self):
|
||||
try:
|
||||
response = connection.get_all_topics(next_token)
|
||||
except BotoServerError, e:
|
||||
module.fail_json(msg=e.message)
|
||||
return connect_to_aws(boto.sns, self.region,
|
||||
**self.aws_connect_params)
|
||||
except BotoServerError, err:
|
||||
self.module.fail_json(msg=err.message)
|
||||
|
||||
topics.extend(response['ListTopicsResponse']['ListTopicsResult']['Topics'])
|
||||
next_token = \
|
||||
response['ListTopicsResponse']['ListTopicsResult']['NextToken']
|
||||
if not next_token:
|
||||
break
|
||||
return [t['TopicArn'] for t in topics]
|
||||
def _get_all_topics(self):
|
||||
next_token = None
|
||||
topics = []
|
||||
while True:
|
||||
try:
|
||||
response = self.connection.get_all_topics(next_token)
|
||||
except BotoServerError, err:
|
||||
module.fail_json(msg=err.message)
|
||||
topics.extend(response['ListTopicsResponse']['ListTopicsResult']['Topics'])
|
||||
next_token = response['ListTopicsResponse']['ListTopicsResult']['NextToken']
|
||||
if not next_token:
|
||||
break
|
||||
return [t['TopicArn'] for t in topics]
|
||||
|
||||
|
||||
def arn_topic_lookup(connection, short_topic, module):
|
||||
# topic names cannot have colons, so this captures the full topic name
|
||||
all_topics = get_all_topics(connection, module)
|
||||
lookup_topic = ':%s' % short_topic
|
||||
for topic in all_topics:
|
||||
if topic.endswith(lookup_topic):
|
||||
return topic
|
||||
return None
|
||||
def _arn_topic_lookup(self):
|
||||
# topic names cannot have colons, so this captures the full topic name
|
||||
all_topics = self._get_all_topics()
|
||||
lookup_topic = ':%s' % self.name
|
||||
for topic in all_topics:
|
||||
if topic.endswith(lookup_topic):
|
||||
return topic
|
||||
|
||||
|
||||
def _create_topic(self):
|
||||
self.changed = True
|
||||
self.topic_created = True
|
||||
if not self.check_mode:
|
||||
self.connection.create_topic(self.name)
|
||||
self.arn_topic = self._arn_topic_lookup()
|
||||
while not self.arn_topic:
|
||||
time.sleep(3)
|
||||
self.arn_topic = self._arn_topic_lookup()
|
||||
|
||||
|
||||
def _set_topic_attrs(self):
|
||||
topic_attributes = self.connection.get_topic_attributes(self.arn_topic) \
|
||||
['GetTopicAttributesResponse'] ['GetTopicAttributesResult'] \
|
||||
['Attributes']
|
||||
|
||||
if self.display_name and self.display_name != topic_attributes['DisplayName']:
|
||||
self.changed = True
|
||||
self.attributes_set.append('display_name')
|
||||
if not self.check_mode:
|
||||
self.connection.set_topic_attributes(self.arn_topic, 'DisplayName',
|
||||
self.display_name)
|
||||
|
||||
if self.policy and self.policy != json.loads(topic_attributes['Policy']):
|
||||
self.changed = True
|
||||
self.attributes_set.append('policy')
|
||||
if not self.check_mode:
|
||||
self.connection.set_topic_attributes(self.arn_topic, 'Policy',
|
||||
json.dumps(self.policy))
|
||||
|
||||
if self.delivery_policy and ('DeliveryPolicy' not in topic_attributes or \
|
||||
self.delivery_policy != json.loads(topic_attributes['DeliveryPolicy'])):
|
||||
self.changed = True
|
||||
self.attributes_set.append('delivery_policy')
|
||||
if not self.check_mode:
|
||||
self.connection.set_topic_attributes(self.arn_topic, 'DeliveryPolicy',
|
||||
json.dumps(self.delivery_policy))
|
||||
|
||||
|
||||
def _canonicalize_endpoint(self, protocol, endpoint):
|
||||
if protocol == 'sms':
|
||||
return re.sub('[^0-9]*', '', endpoint)
|
||||
return endpoint
|
||||
|
||||
|
||||
def _get_topic_subs(self):
|
||||
next_token = None
|
||||
while True:
|
||||
response = self.connection.get_all_subscriptions_by_topic(self.arn_topic, next_token)
|
||||
self.subscriptions_existing.extend(response['ListSubscriptionsByTopicResponse'] \
|
||||
['ListSubscriptionsByTopicResult']['Subscriptions'])
|
||||
next_token = response['ListSubscriptionsByTopicResponse'] \
|
||||
['ListSubscriptionsByTopicResult']['NextToken']
|
||||
if not next_token:
|
||||
break
|
||||
|
||||
def _set_topic_subs(self):
|
||||
subscriptions_existing_list = []
|
||||
desired_subscriptions = [(sub['protocol'],
|
||||
self._canonicalize_endpoint(sub['protocol'], sub['endpoint'])) for sub in
|
||||
self.subscriptions]
|
||||
|
||||
if self.subscriptions_existing:
|
||||
for sub in self.subscriptions_existing:
|
||||
sub_key = (sub['Protocol'], sub['Endpoint'])
|
||||
subscriptions_existing_list.append(sub_key)
|
||||
if self.purge_subscriptions and sub_key not in desired_subscriptions and \
|
||||
sub['SubscriptionArn'] != 'PendingConfirmation':
|
||||
self.changed = True
|
||||
self.subscriptions_deleted.append(sub_key)
|
||||
if not self.check_mode:
|
||||
self.connection.unsubscribe(sub['SubscriptionArn'])
|
||||
|
||||
for (protocol, endpoint) in desired_subscriptions:
|
||||
if (protocol, endpoint) not in subscriptions_existing_list:
|
||||
self.changed = True
|
||||
self.subscriptions_added.append(sub)
|
||||
if not self.check_mode:
|
||||
self.connection.subscribe(self.arn_topic, protocol, endpoint)
|
||||
|
||||
|
||||
def _delete_subscriptions(self):
|
||||
# NOTE: subscriptions in 'PendingConfirmation' timeout in 3 days
|
||||
# https://forums.aws.amazon.com/thread.jspa?threadID=85993
|
||||
for sub in self.subscriptions_existing:
|
||||
if sub['SubscriptionArn'] != 'PendingConfirmation':
|
||||
self.subscriptions_deleted.append(sub['SubscriptionArn'])
|
||||
self.changed = True
|
||||
if not self.check_mode:
|
||||
self.connection.unsubscribe(sub['SubscriptionArn'])
|
||||
|
||||
|
||||
def _delete_topic(self):
|
||||
self.topic_deleted = True
|
||||
self.changed = True
|
||||
if not self.check_mode:
|
||||
self.connection.delete_topic(self.arn_topic)
|
||||
|
||||
|
||||
def ensure_ok(self):
|
||||
self.arn_topic = self._arn_topic_lookup()
|
||||
if not self.arn_topic:
|
||||
self._create_topic()
|
||||
self._set_topic_attrs()
|
||||
self._get_topic_subs()
|
||||
self._set_topic_subs()
|
||||
|
||||
def ensure_gone(self):
|
||||
self.arn_topic = self._arn_topic_lookup()
|
||||
if self.arn_topic:
|
||||
self._get_topic_subs()
|
||||
if self.subscriptions_existing:
|
||||
self._delete_subscriptions()
|
||||
self._delete_topic()
|
||||
|
||||
|
||||
def get_info(self):
|
||||
info = {
|
||||
'name': self.name,
|
||||
'state': self.state,
|
||||
'display_name': self.display_name,
|
||||
'policy': self.policy,
|
||||
'delivery_policy': self.delivery_policy,
|
||||
'subscriptions_new': self.subscriptions,
|
||||
'subscriptions_existing': self.subscriptions_existing,
|
||||
'subscriptions_deleted': self.subscriptions_deleted,
|
||||
'subscriptions_added': self.subscriptions_added,
|
||||
'subscriptions_purge': self.purge_subscriptions,
|
||||
'check_mode': self.check_mode,
|
||||
'topic_created': self.topic_created,
|
||||
'topic_deleted': self.topic_deleted,
|
||||
'attributes_set': self.attributes_set
|
||||
}
|
||||
|
||||
return info
|
||||
|
||||
|
||||
|
||||
def main():
|
||||
|
@ -190,7 +356,8 @@ def main():
|
|||
)
|
||||
)
|
||||
|
||||
module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=True)
|
||||
module = AnsibleModule(argument_spec=argument_spec,
|
||||
supports_check_mode=True)
|
||||
|
||||
if not HAS_BOTO:
|
||||
module.fail_json(msg='boto required for this module')
|
||||
|
@ -203,142 +370,35 @@ def main():
|
|||
subscriptions = module.params.get('subscriptions')
|
||||
purge_subscriptions = module.params.get('purge_subscriptions')
|
||||
check_mode = module.check_mode
|
||||
changed = False
|
||||
|
||||
topic_created = False
|
||||
attributes_set = []
|
||||
subscriptions_added = []
|
||||
subscriptions_deleted = []
|
||||
|
||||
region, ec2_url, aws_connect_params = get_aws_connection_info(module)
|
||||
if not region:
|
||||
module.fail_json(msg="region must be specified")
|
||||
try:
|
||||
connection = connect_to_aws(boto.sns, region, **aws_connect_params)
|
||||
except boto.exception.NoAuthHandlerFound, e:
|
||||
module.fail_json(msg=str(e))
|
||||
|
||||
# topics cannot contain ':', so thats the decider
|
||||
if ':' in name:
|
||||
all_topics = get_all_topics(connection, module)
|
||||
if name in all_topics:
|
||||
arn_topic = name
|
||||
elif state == 'absent':
|
||||
module.exit_json(changed=False)
|
||||
else:
|
||||
module.fail_json(msg="specified an ARN for a topic but it doesn't"
|
||||
" exist")
|
||||
else:
|
||||
arn_topic = arn_topic_lookup(connection, name, module)
|
||||
if not arn_topic:
|
||||
if state == 'absent':
|
||||
module.exit_json(changed=False)
|
||||
elif check_mode:
|
||||
module.exit_json(changed=True, topic_created=True,
|
||||
subscriptions_added=subscriptions,
|
||||
subscriptions_deleted=[])
|
||||
sns_topic = SnsTopicManager(module,
|
||||
name,
|
||||
state,
|
||||
display_name,
|
||||
policy,
|
||||
delivery_policy,
|
||||
subscriptions,
|
||||
purge_subscriptions,
|
||||
check_mode,
|
||||
region,
|
||||
**aws_connect_params)
|
||||
|
||||
changed=True
|
||||
topic_created = True
|
||||
try:
|
||||
connection.create_topic(name)
|
||||
except BotoServerError, e:
|
||||
module.fail_json(msg=e.message)
|
||||
arn_topic = arn_topic_lookup(connection, name, module)
|
||||
while not arn_topic:
|
||||
time.sleep(3)
|
||||
arn_topic = arn_topic_lookup(connection, name, module)
|
||||
if state == 'present':
|
||||
sns_topic.ensure_ok()
|
||||
|
||||
if arn_topic and state == "absent":
|
||||
if not check_mode:
|
||||
try:
|
||||
connection.delete_topic(arn_topic)
|
||||
except BotoServerError, e:
|
||||
module.fail_json(msg=e.message)
|
||||
elif state == 'absent':
|
||||
sns_topic.ensure_gone()
|
||||
|
||||
module.exit_json(changed=True)
|
||||
sns_facts = dict(changed=sns_topic.changed,
|
||||
sns_arn=sns_topic.arn_topic,
|
||||
sns_topic=sns_topic.get_info())
|
||||
|
||||
topic_attributes = connection.get_topic_attributes(arn_topic) \
|
||||
['GetTopicAttributesResponse'] ['GetTopicAttributesResult'] \
|
||||
['Attributes']
|
||||
if display_name and display_name != topic_attributes['DisplayName']:
|
||||
changed = True
|
||||
attributes_set.append('display_name')
|
||||
if not check_mode:
|
||||
try:
|
||||
connection.set_topic_attributes(arn_topic, 'DisplayName', display_name)
|
||||
except BotoServerError, e:
|
||||
module.fail_json(msg=e.message)
|
||||
module.exit_json(**sns_facts)
|
||||
|
||||
if policy and policy != json.loads(topic_attributes['Policy']):
|
||||
changed = True
|
||||
attributes_set.append('policy')
|
||||
if not check_mode:
|
||||
try:
|
||||
connection.set_topic_attributes(arn_topic, 'Policy', json.dumps(policy))
|
||||
except BotoServerError, e:
|
||||
module.fail_json(msg=e.message)
|
||||
|
||||
if delivery_policy and ('DeliveryPolicy' not in topic_attributes or \
|
||||
delivery_policy != json.loads(topic_attributes['DeliveryPolicy'])):
|
||||
changed = True
|
||||
attributes_set.append('delivery_policy')
|
||||
if not check_mode:
|
||||
try:
|
||||
connection.set_topic_attributes(arn_topic, 'DeliveryPolicy',json.dumps(delivery_policy))
|
||||
except BotoServerError, e:
|
||||
module.fail_json(msg=e.message)
|
||||
|
||||
|
||||
next_token = None
|
||||
aws_subscriptions = []
|
||||
while True:
|
||||
try:
|
||||
response = connection.get_all_subscriptions_by_topic(arn_topic,
|
||||
next_token)
|
||||
except BotoServerError, e:
|
||||
module.fail_json(msg=e.message)
|
||||
|
||||
aws_subscriptions.extend(response['ListSubscriptionsByTopicResponse'] \
|
||||
['ListSubscriptionsByTopicResult']['Subscriptions'])
|
||||
next_token = response['ListSubscriptionsByTopicResponse'] \
|
||||
['ListSubscriptionsByTopicResult']['NextToken']
|
||||
if not next_token:
|
||||
break
|
||||
|
||||
desired_subscriptions = [(sub['protocol'],
|
||||
canonicalize_endpoint(sub['protocol'], sub['endpoint'])) for sub in
|
||||
subscriptions]
|
||||
|
||||
aws_subscriptions_list = []
|
||||
|
||||
for sub in aws_subscriptions:
|
||||
sub_key = (sub['Protocol'], sub['Endpoint'])
|
||||
aws_subscriptions_list.append(sub_key)
|
||||
if purge_subscriptions and sub_key not in desired_subscriptions and \
|
||||
sub['SubscriptionArn'] != 'PendingConfirmation':
|
||||
changed = True
|
||||
subscriptions_deleted.append(sub_key)
|
||||
if not check_mode:
|
||||
try:
|
||||
connection.unsubscribe(sub['SubscriptionArn'])
|
||||
except BotoServerError, e:
|
||||
module.fail_json(msg=e.message)
|
||||
|
||||
for (protocol, endpoint) in desired_subscriptions:
|
||||
if (protocol, endpoint) not in aws_subscriptions_list:
|
||||
changed = True
|
||||
subscriptions_added.append(sub)
|
||||
if not check_mode:
|
||||
try:
|
||||
connection.subscribe(arn_topic, protocol, endpoint)
|
||||
except BotoServerError, e:
|
||||
module.fail_json(msg=e.message)
|
||||
|
||||
module.exit_json(changed=changed, topic_created=topic_created,
|
||||
attributes_set=attributes_set,
|
||||
subscriptions_added=subscriptions_added,
|
||||
subscriptions_deleted=subscriptions_deleted, sns_arn=arn_topic)
|
||||
|
||||
from ansible.module_utils.basic import *
|
||||
from ansible.module_utils.ec2 import *
|
||||
|
|
Loading…
Add table
Reference in a new issue