Lookup plugin for rabbitmq (#44070)
* Adding a basic get lookup for rabbitmq. * Always return a list * If content type is JSON, make accessible via dict. * Fixed incorrect json.loads variable and missing raise * Change to document returned data * Fixed pep8 issues * Adding integration testing * Moving lookup intgration tests to new target * New rabbitmq lookup plugin (#44070). * New rabbitmq lookup plugin (#44070). * PR review feedback updates * Testing pika is installed * Minor mods to tests * Check if connection is already closed or closing * Updated tests and connection testing * PR review feedback updates * PR review include ValueError in AnsibleError output * Suggesting to use set_fact when using returned variable more than once. * Cleaned up some tests, added some notes and handling connection closure on some exceptions. * Removed finally statement and added some additional error handling. * Added some additional error handling. * PR review updates. * Additional integration tests and removing return in finally * Updated version * Changing back to running tests on ubuntu. * Additional tests * Running tests on Ubuntu only * Fixing syntax error * Fixing ingtegration tests and a string/byte issue * Removed non-required test and fixed BOTMETA * Trying to fix integration test failure on ubuntu1404 * Some issues occured when handling messages from the queue with to_native. Switching to to_text resolved the issues. * Renaming channel to queue (thanks dch). Disabling trusty tests.
This commit is contained in:
parent
3f2f53681d
commit
c4cfa387ea
6 changed files with 341 additions and 0 deletions
3
.github/BOTMETA.yml
vendored
3
.github/BOTMETA.yml
vendored
|
@ -926,6 +926,9 @@ files:
|
|||
support: network
|
||||
maintainers: $team_networking
|
||||
labels: networking
|
||||
lib/ansible/plugins/lookup/rabbitmq.py:
|
||||
maintainers: Im0
|
||||
support: community
|
||||
lib/ansible/plugins/netconf/sros.py:
|
||||
maintainers: wisotzky $team_networking
|
||||
labels: networking
|
||||
|
|
188
lib/ansible/plugins/lookup/rabbitmq.py
Normal file
188
lib/ansible/plugins/lookup/rabbitmq.py
Normal file
|
@ -0,0 +1,188 @@
|
|||
# (c) 2018, John Imison <john+github@imison.net>
|
||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
from __future__ import (absolute_import, division, print_function)
|
||||
__metaclass__ = type
|
||||
|
||||
DOCUMENTATION = """
|
||||
lookup: rabbitmq
|
||||
author: John Imison <@Im0>
|
||||
version_added: "2.8"
|
||||
short_description: Retrieve messages from an AMQP/AMQPS RabbitMQ queue.
|
||||
description:
|
||||
- This lookup uses a basic get to retrieve all, or a limited number C(count), messages from a RabbitMQ queue.
|
||||
options:
|
||||
url:
|
||||
description:
|
||||
- An URI connection string to connect to the AMQP/AMQPS RabbitMQ server.
|
||||
- For more information refer to the URI spec U(https://www.rabbitmq.com/uri-spec.html).
|
||||
required: True
|
||||
queue:
|
||||
description:
|
||||
- The queue to get messages from.
|
||||
required: True
|
||||
count:
|
||||
description:
|
||||
- How many messages to collect from the queue.
|
||||
- If not set, defaults to retrieving all the messages from the queue.
|
||||
requirements:
|
||||
- The python pika package U(https://pypi.org/project/pika/).
|
||||
notes:
|
||||
- This lookup implements BlockingChannel.basic_get to get messages from a RabbitMQ server.
|
||||
- After retrieving a message from the server, receipt of the message is acknowledged and the message on the server is deleted.
|
||||
- Pika is a pure-Python implementation of the AMQP 0-9-1 protocol that tries to stay fairly independent of the underlying network support library.
|
||||
- More information about pika can be found at U(https://pika.readthedocs.io/en/stable/).
|
||||
- This plugin is tested against RabbitMQ. Other AMQP 0.9.1 protocol based servers may work but not tested/guaranteed.
|
||||
- Assigning the return messages to a variable under C(vars) may result in unexpected results as the lookup is evaluated every time the
|
||||
variable is referenced.
|
||||
- Currently this plugin only handles text based messages from a queue. Unexpected results may occur when retrieving binary data.
|
||||
"""
|
||||
|
||||
|
||||
EXAMPLES = """
|
||||
- name: Get all messages off a queue
|
||||
debug:
|
||||
msg: "{{ lookup('rabbitmq', url='amqp://guest:guest@192.168.0.10:5672/%2F', queue='hello') }}"
|
||||
|
||||
|
||||
# If you are intending on using the returned messages as a variable in more than
|
||||
# one task (eg. debug, template), it is recommended to set_fact.
|
||||
|
||||
- name: Get 2 messages off a queue and set a fact for re-use
|
||||
set_fact:
|
||||
messages: "{{ lookup('rabbitmq', url='amqp://guest:guest@192.168.0.10:5672/%2F', queue='hello', count=2) }}"
|
||||
|
||||
- name: Dump out contents of the messages
|
||||
debug:
|
||||
var: messages
|
||||
|
||||
"""
|
||||
|
||||
RETURN = """
|
||||
_list:
|
||||
description:
|
||||
- A list of dictionaries with keys and value from the queue.
|
||||
type: list
|
||||
contains:
|
||||
content_type:
|
||||
description: The content_type on the message in the queue.
|
||||
type: str
|
||||
delivery_mode:
|
||||
description: The delivery_mode on the message in the queue.
|
||||
type: str
|
||||
delivery_tag:
|
||||
description: The delivery_tag on the message in the queue.
|
||||
type: str
|
||||
exchange:
|
||||
description: The exchange the message came from.
|
||||
type: str
|
||||
message_count:
|
||||
description: The message_count for the message on the queue.
|
||||
type: str
|
||||
msg:
|
||||
description: The content of the message.
|
||||
type: str
|
||||
redelivered:
|
||||
description: The redelivered flag. True if the message has been delivered before.
|
||||
type: bool
|
||||
routing_key:
|
||||
description: The routing_key on the message in the queue.
|
||||
type: str
|
||||
json:
|
||||
description: If application/json is specified in content_type, json will be loaded into variables.
|
||||
type: dict
|
||||
|
||||
"""
|
||||
|
||||
from ansible.errors import AnsibleError, AnsibleParserError
|
||||
from ansible.plugins.lookup import LookupBase
|
||||
from ansible.module_utils._text import to_native, to_text
|
||||
import json
|
||||
|
||||
try:
|
||||
from __main__ import display
|
||||
except ImportError:
|
||||
from ansible.utils.display import Display
|
||||
display = Display()
|
||||
|
||||
try:
|
||||
import pika
|
||||
from pika import spec
|
||||
HAS_PIKA = True
|
||||
except ImportError:
|
||||
HAS_PIKA = False
|
||||
|
||||
|
||||
class LookupModule(LookupBase):
|
||||
|
||||
def run(self, terms, variables=None, url=None, queue=None, count=None):
|
||||
if not HAS_PIKA:
|
||||
raise AnsibleError('pika python package is required for rabbitmq lookup.')
|
||||
if not url:
|
||||
raise AnsibleError('URL is required for rabbitmq lookup.')
|
||||
if not queue:
|
||||
raise AnsibleError('Queue is required for rabbitmq lookup.')
|
||||
|
||||
display.vvv(u"terms:%s : variables:%s url:%s queue:%s count:%s" % (terms, variables, url, queue, count))
|
||||
|
||||
try:
|
||||
parameters = pika.URLParameters(url)
|
||||
except Exception as e:
|
||||
raise AnsibleError("URL malformed: %s" % to_native(e))
|
||||
|
||||
try:
|
||||
connection = pika.BlockingConnection(parameters)
|
||||
except Exception as e:
|
||||
raise AnsibleError("Connection issue: %s" % to_native(e))
|
||||
|
||||
try:
|
||||
conn_channel = connection.channel()
|
||||
except pika.exceptions.AMQPChannelError as e:
|
||||
try:
|
||||
connection.close()
|
||||
except pika.exceptions.AMQPConnectionError as ie:
|
||||
raise AnsibleError("Channel and connection closing issues: %s / %s" % to_native(e), to_native(ie))
|
||||
raise AnsibleError("Channel issue: %s" % to_native(e))
|
||||
|
||||
ret = []
|
||||
idx = 0
|
||||
|
||||
while True:
|
||||
method_frame, properties, body = conn_channel.basic_get(queue=queue)
|
||||
if method_frame:
|
||||
display.vvv(u"%s, %s, %s " % (method_frame, properties, to_text(body)))
|
||||
|
||||
# TODO: In the future consider checking content_type and handle text/binary data differently.
|
||||
msg_details = dict({
|
||||
'msg': to_text(body),
|
||||
'message_count': method_frame.message_count,
|
||||
'routing_key': method_frame.routing_key,
|
||||
'delivery_tag': method_frame.delivery_tag,
|
||||
'redelivered': method_frame.redelivered,
|
||||
'exchange': method_frame.exchange,
|
||||
'delivery_mode': properties.delivery_mode,
|
||||
'content_type': properties.content_type
|
||||
})
|
||||
if properties.content_type == 'application/json':
|
||||
try:
|
||||
msg_details['json'] = json.loads(msg_details['msg'])
|
||||
except ValueError as e:
|
||||
raise AnsibleError("Unable to decode JSON for message %s: %s" % (method_frame.delivery_tag, to_native(e)))
|
||||
|
||||
ret.append(msg_details)
|
||||
conn_channel.basic_ack(method_frame.delivery_tag)
|
||||
idx += 1
|
||||
if method_frame.message_count == 0 or idx == count:
|
||||
break
|
||||
# If we didn't get a method_frame, exit.
|
||||
else:
|
||||
break
|
||||
|
||||
if connection.is_closing or connection.is_closed:
|
||||
return [ret]
|
||||
else:
|
||||
try:
|
||||
connection.close()
|
||||
except pika.exceptions.AMQPConnectionError:
|
||||
pass
|
||||
return [ret]
|
5
test/integration/targets/rabbitmq_lookup/aliases
Normal file
5
test/integration/targets/rabbitmq_lookup/aliases
Normal file
|
@ -0,0 +1,5 @@
|
|||
destructive
|
||||
shippable/posix/group1
|
||||
skip/osx
|
||||
skip/freebsd
|
||||
skip/rhel
|
2
test/integration/targets/rabbitmq_lookup/meta/main.yml
Normal file
2
test/integration/targets/rabbitmq_lookup/meta/main.yml
Normal file
|
@ -0,0 +1,2 @@
|
|||
dependencies:
|
||||
- setup_rabbitmq
|
5
test/integration/targets/rabbitmq_lookup/tasks/main.yml
Normal file
5
test/integration/targets/rabbitmq_lookup/tasks/main.yml
Normal file
|
@ -0,0 +1,5 @@
|
|||
# Rabbitmq lookup
|
||||
- include: ubuntu.yml
|
||||
when:
|
||||
- ansible_distribution == 'Ubuntu'
|
||||
- ansible_distribution_release != 'trusty'
|
138
test/integration/targets/rabbitmq_lookup/tasks/ubuntu.yml
Normal file
138
test/integration/targets/rabbitmq_lookup/tasks/ubuntu.yml
Normal file
|
@ -0,0 +1,138 @@
|
|||
- name: Test failure without pika installed
|
||||
set_fact:
|
||||
rabbit_missing_pika: "{{ lookup('rabbitmq', url='amqp://guest:guest@192.168.250.1:5672/%2F', queue='hello', count=3) }}"
|
||||
ignore_errors: yes
|
||||
register: rabbitmq_missing_pika_error
|
||||
|
||||
- assert:
|
||||
that:
|
||||
- "'pika python package is required' in rabbitmq_missing_pika_error.msg"
|
||||
|
||||
- name: Install pika and requests
|
||||
pip:
|
||||
name: pika,requests
|
||||
state: latest
|
||||
|
||||
- name: Test that giving an incorrect amqp protocol in URL will error
|
||||
set_fact:
|
||||
rabbitmq_test_protocol: "{{ lookup('rabbitmq', url='zzzamqp://guest:guest@192.168.250.1:5672/%2F', queue='hello', count=3) }}"
|
||||
ignore_errors: yes
|
||||
register: rabbitmq_protocol_error
|
||||
|
||||
- assert:
|
||||
that:
|
||||
- "rabbitmq_protocol_error is failed"
|
||||
- "'URL malformed' in rabbitmq_protocol_error.msg"
|
||||
|
||||
- name: Test that giving an incorrect IP address in URL will error
|
||||
set_fact:
|
||||
rabbitmq_test_protocol: "{{ lookup('rabbitmq', url='amqp://guest:guest@xxxxx192.112312368.250.1:5672/%2F', queue='hello', count=3) }}"
|
||||
ignore_errors: yes
|
||||
register: rabbitmq_ip_error
|
||||
|
||||
- assert:
|
||||
that:
|
||||
- "rabbitmq_ip_error is failed"
|
||||
- "'Connection issue' in rabbitmq_ip_error.msg"
|
||||
|
||||
- name: Test missing parameters will error
|
||||
set_fact:
|
||||
rabbitmq_test_protocol: "{{ lookup('rabbitmq') }}"
|
||||
ignore_errors: yes
|
||||
register: rabbitmq_params_error
|
||||
|
||||
- assert:
|
||||
that:
|
||||
- "rabbitmq_params_error is failed"
|
||||
- "'URL is required for rabbitmq lookup.' in rabbitmq_params_error.msg"
|
||||
|
||||
- name: Test missing queue will error
|
||||
set_fact:
|
||||
rabbitmq_queue_protocol: "{{ lookup('rabbitmq', url='amqp://guest:guest@192.168.250.1:5672/%2F') }}"
|
||||
ignore_errors: yes
|
||||
register: rabbitmq_queue_error
|
||||
|
||||
- assert:
|
||||
that:
|
||||
- "rabbitmq_queue_error is failed"
|
||||
- "'Queue is required for rabbitmq lookup' in rabbitmq_queue_error.msg"
|
||||
|
||||
- name: Enables the rabbitmq_management plugin
|
||||
rabbitmq_plugin:
|
||||
names: rabbitmq_management
|
||||
state: enabled
|
||||
|
||||
- name: Setup test queue
|
||||
rabbitmq_queue:
|
||||
name: hello
|
||||
|
||||
- name: Post test message to the exchange (string)
|
||||
uri:
|
||||
url: http://localhost:15672/api/exchanges/%2f/amq.default/publish
|
||||
method: POST
|
||||
body: '{"properties":{},"routing_key":"hello","payload":"ansible-test","payload_encoding":"string"}'
|
||||
user: guest
|
||||
password: guest
|
||||
force_basic_auth: yes
|
||||
return_content: yes
|
||||
register: post_data
|
||||
headers:
|
||||
Content-Type: "application/json"
|
||||
|
||||
|
||||
- name: Post test message to the exchange (json)
|
||||
uri:
|
||||
url: http://localhost:15672/api/exchanges/%2f/amq.default/publish
|
||||
method: POST
|
||||
body: '{"properties":{"content_type": "application/json"},"routing_key":"hello","payload":"{\"key\": \"value\" }","payload_encoding":"string"}'
|
||||
user: guest
|
||||
password: guest
|
||||
force_basic_auth: yes
|
||||
return_content: yes
|
||||
register: post_data_json
|
||||
headers:
|
||||
Content-Type: "application/json"
|
||||
|
||||
- name: Test retrieve messages
|
||||
set_fact:
|
||||
rabbitmq_msg: "{{ lookup('rabbitmq', url='amqp://guest:guest@localhost:5672/%2f/hello', queue='hello') }}"
|
||||
ignore_errors: yes
|
||||
register: rabbitmq_msg_error
|
||||
|
||||
- name: Ensure two messages received
|
||||
assert:
|
||||
that:
|
||||
- "rabbitmq_msg_error is not failed"
|
||||
- rabbitmq_msg | length == 2
|
||||
|
||||
- name: Ensure first message is a string
|
||||
assert:
|
||||
that:
|
||||
- rabbitmq_msg[0].msg == "ansible-test"
|
||||
|
||||
- name: Ensure second message is json
|
||||
assert:
|
||||
that:
|
||||
- rabbitmq_msg[1].json.key == "value"
|
||||
|
||||
- name: Test missing vhost
|
||||
set_fact:
|
||||
rabbitmq_msg: "{{ lookup('rabbitmq', url='amqp://guest:guest@localhost:5672/missing/', queue='hello') }}"
|
||||
ignore_errors: yes
|
||||
register: rabbitmq_vhost_error
|
||||
|
||||
- assert:
|
||||
that:
|
||||
- "rabbitmq_vhost_error is failed"
|
||||
- "'NOT_ALLOWED' in rabbitmq_vhost_error.msg"
|
||||
|
||||
# Tidy up
|
||||
- name: Uninstall pika and requests
|
||||
pip:
|
||||
name: pika,requests
|
||||
state: absent
|
||||
|
||||
- name: Disable the rabbitmq_management plugin
|
||||
rabbitmq_plugin:
|
||||
names: rabbitmq_management
|
||||
state: disabled
|
Loading…
Reference in a new issue