From 1acbfd874163112fbd7a681f152b348d1f08fc41 Mon Sep 17 00:00:00 2001 From: Edin Sarajlic Date: Thu, 19 Sep 2019 15:29:56 +1000 Subject: [PATCH] Add a function for retrieving a message to an Rmq unit --- .../charm_tests/rabbitmq_server/utils.py | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/zaza/openstack/charm_tests/rabbitmq_server/utils.py b/zaza/openstack/charm_tests/rabbitmq_server/utils.py index fd08ee5..2447f8f 100644 --- a/zaza/openstack/charm_tests/rabbitmq_server/utils.py +++ b/zaza/openstack/charm_tests/rabbitmq_server/utils.py @@ -380,3 +380,34 @@ def publish_amqp_message_by_unit(unit, message, connection.close() +def get_amqp_message_by_unit(unit, queue="test", + username="testuser1", + password="changeme", + ssl=False, port=None): + """Get an amqp message from a rmq juju unit. + :param unit: unit pointer + :param queue: message queue, default to test + :param username: amqp user name, default to testuser1 + :param password: amqp user password + :param ssl: boolean, default to False + :param port: amqp port, use defaults if None + :returns: amqp message body as string. Raise if get fails. + """ + connection = connect_amqp_by_unit(unit, ssl=ssl, + port=port, + username=username, + password=password) + channel = connection.channel() + method_frame, _, body = channel.basic_get(queue) + body = body.decode() + + if method_frame: + logging.debug('Retreived message from {} queue:\n{}'.format(queue, + body)) + channel.basic_ack(method_frame.delivery_tag) + channel.close() + connection.close() + return body + else: + msg = 'No message retrieved.' + raise Exception(msg)