diff --git a/zaza/openstack/charm_tests/rabbitmq_server/utils.py b/zaza/openstack/charm_tests/rabbitmq_server/utils.py index 450855a..fd08ee5 100644 --- a/zaza/openstack/charm_tests/rabbitmq_server/utils.py +++ b/zaza/openstack/charm_tests/rabbitmq_server/utils.py @@ -343,3 +343,40 @@ def connect_amqp_by_unit(unit, ssl=False, return None +def publish_amqp_message_by_unit(unit, message, + queue="test", ssl=False, + username="testuser1", + password="changeme", + port=None): + """Publish an amqp message to a rmq juju unit. + :param unit: unit pointer + :param message: amqp message string + :param queue: message queue, default to test + :param username: amqp user name, default to testuser1 + :param password: amqp user password + :param ssl: boolean, default to False + :param port: amqp port, use defaults if None + :returns: None. Raises exception if publish failed. + """ + logging.debug('Publishing message to {} queue:\n{}'.format(queue, + message)) + connection = connect_amqp_by_unit(unit, ssl=ssl, + port=port, + username=username, + password=password) + + # NOTE(beisner): extra debug here re: pika hang potential: + # https://github.com/pika/pika/issues/297 + # https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw + logging.debug('Defining channel...') + channel = connection.channel() + logging.debug('Declaring queue...') + channel.queue_declare(queue=queue, auto_delete=False, durable=True) + logging.debug('Publishing message...') + channel.basic_publish(exchange='', routing_key=queue, body=message) + logging.debug('Closing channel...') + channel.close() + logging.debug('Closing connection...') + connection.close() + +