Add a function for creating connection to Rmq instance on unit
This commit is contained in:
@@ -14,8 +14,11 @@
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
import pika
|
||||
import zaza.model
|
||||
|
||||
import ssl as libssl
|
||||
|
||||
def wait_for_cluster(model_name=None, timeout=1200):
|
||||
"""Wait for rmq units extended status to show cluster readiness,
|
||||
@@ -65,3 +68,55 @@ def get_cluster_running_nodes(unit):
|
||||
return []
|
||||
|
||||
|
||||
def connect_amqp_by_unit(unit, ssl=False,
|
||||
port=None, fatal=True,
|
||||
username="testuser1", password="changeme"):
|
||||
"""Establish and return a pika amqp connection to the rabbitmq service
|
||||
running on a rmq juju unit.
|
||||
:param unit: unit pointer
|
||||
:param ssl: boolean, default to False
|
||||
:param port: amqp port, use defaults if None
|
||||
:param fatal: boolean, default to True (raises on connect error)
|
||||
:param username: amqp user name, default to testuser1
|
||||
:param password: amqp user password
|
||||
:returns: pika amqp connection pointer or None if failed and non-fatal
|
||||
"""
|
||||
host = unit.public_address
|
||||
unit_name = unit.entity_id
|
||||
|
||||
if ssl:
|
||||
ssl_options = pika.SSLOptions(libssl.SSLContext())
|
||||
else:
|
||||
ssl_options = None
|
||||
|
||||
# Default port logic if port is not specified
|
||||
if ssl and not port:
|
||||
port = 5671
|
||||
elif not ssl and not port:
|
||||
port = 5672
|
||||
|
||||
logging.debug('Connecting to amqp on {}:{} ({}) as '
|
||||
'{}...'.format(host, port, unit_name, username))
|
||||
|
||||
try:
|
||||
credentials = pika.PlainCredentials(username, password)
|
||||
parameters = pika.ConnectionParameters(host=host, port=port,
|
||||
credentials=credentials,
|
||||
ssl_options=ssl_options,
|
||||
connection_attempts=3,
|
||||
retry_delay=5,
|
||||
socket_timeout=1)
|
||||
connection = pika.BlockingConnection(parameters)
|
||||
assert connection.is_open is True
|
||||
logging.debug('Connect OK')
|
||||
return connection
|
||||
except Exception as e:
|
||||
msg = ('amqp connection failed to {}:{} as '
|
||||
'{} ({})'.format(host, port, username, str(e)))
|
||||
if fatal:
|
||||
raise Exception(msg)
|
||||
else:
|
||||
logging.warn(msg)
|
||||
return None
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user