Add a function for retrieving a message to an Rmq unit
This commit is contained in:
@@ -380,3 +380,34 @@ def publish_amqp_message_by_unit(unit, message,
|
||||
connection.close()
|
||||
|
||||
|
||||
def get_amqp_message_by_unit(unit, queue="test",
|
||||
username="testuser1",
|
||||
password="changeme",
|
||||
ssl=False, port=None):
|
||||
"""Get an amqp message from a rmq juju unit.
|
||||
:param unit: unit pointer
|
||||
:param queue: message queue, default to test
|
||||
:param username: amqp user name, default to testuser1
|
||||
:param password: amqp user password
|
||||
:param ssl: boolean, default to False
|
||||
:param port: amqp port, use defaults if None
|
||||
:returns: amqp message body as string. Raise if get fails.
|
||||
"""
|
||||
connection = connect_amqp_by_unit(unit, ssl=ssl,
|
||||
port=port,
|
||||
username=username,
|
||||
password=password)
|
||||
channel = connection.channel()
|
||||
method_frame, _, body = channel.basic_get(queue)
|
||||
body = body.decode()
|
||||
|
||||
if method_frame:
|
||||
logging.debug('Retreived message from {} queue:\n{}'.format(queue,
|
||||
body))
|
||||
channel.basic_ack(method_frame.delivery_tag)
|
||||
channel.close()
|
||||
connection.close()
|
||||
return body
|
||||
else:
|
||||
msg = 'No message retrieved.'
|
||||
raise Exception(msg)
|
||||
|
||||
Reference in New Issue
Block a user