Merge pull request #684 from gnuoy/bug/1949372
Search for message in rabbit tests
This commit is contained in:
@@ -49,6 +49,7 @@ class RmqTests(test_utils.OpenStackBaseTest):
|
||||
return '[{}-{}]'.format(uuid.uuid4(), time.time())
|
||||
|
||||
@tenacity.retry(
|
||||
reraise=True,
|
||||
retry=tenacity.retry_if_exception_type(RmqNoMessageException),
|
||||
wait=tenacity.wait_fixed(10),
|
||||
stop=tenacity.stop_after_attempt(2))
|
||||
@@ -57,6 +58,40 @@ class RmqTests(test_utils.OpenStackBaseTest):
|
||||
ssl=ssl,
|
||||
port=port)
|
||||
|
||||
def _search_for_message(self, amqp_msg, check_unit, ssl, port,
|
||||
amqp_msg_counter):
|
||||
"""Search for message in message queue.
|
||||
|
||||
WARNING: This will consume messages until it finds the target message.
|
||||
|
||||
:param amqp_msg: Message to search for
|
||||
:type amqp_msg: string
|
||||
:param check_unit: Unit to retrieve messages from
|
||||
:type check_unit: juju.unit.Unit
|
||||
:param ssl: Whether to use SSL when connecting to rabbit
|
||||
:type ssl: bool
|
||||
:param port: Port to use when connecting to rabbit
|
||||
:type port: Union[int, None]
|
||||
:param amqp_msg_counter: Number in test sequence of this message.
|
||||
:type amqp_msg: int
|
||||
:raises: RmqNoMessageException
|
||||
"""
|
||||
for i in range(100):
|
||||
amqp_msg_rcvd = self._retry_get_amqp_message(
|
||||
check_unit,
|
||||
ssl=ssl,
|
||||
port=port)
|
||||
if amqp_msg == amqp_msg_rcvd:
|
||||
logging.info(
|
||||
'Message {} received OK.'.format(amqp_msg_counter))
|
||||
break
|
||||
else:
|
||||
logging.info('Expected: {}'.format(amqp_msg))
|
||||
logging.info('Actual: {}'.format(amqp_msg_rcvd))
|
||||
else:
|
||||
msg = 'Message {} not found.'.format(amqp_msg_counter)
|
||||
raise RmqNoMessageException(msg)
|
||||
|
||||
def _test_rmq_amqp_messages_all_units(self, units,
|
||||
ssl=False, port=None):
|
||||
"""Reusable test to send/check amqp messages to every listed rmq unit.
|
||||
@@ -111,25 +146,22 @@ class RmqTests(test_utils.OpenStackBaseTest):
|
||||
port=port)
|
||||
|
||||
# Get amqp message
|
||||
logging.info('Get message from: {} '
|
||||
logging.info('Get messages from: {} '
|
||||
'({} {})'.format(check_unit_host,
|
||||
check_unit_name,
|
||||
check_unit_host_name))
|
||||
|
||||
amqp_msg_rcvd = self._retry_get_amqp_message(check_unit,
|
||||
ssl=ssl,
|
||||
port=port)
|
||||
|
||||
# Validate amqp message content
|
||||
if amqp_msg == amqp_msg_rcvd:
|
||||
logging.info('Message {} received '
|
||||
'OK.'.format(amqp_msg_counter))
|
||||
else:
|
||||
logging.error('Expected: {}'.format(amqp_msg))
|
||||
logging.error('Actual: {}'.format(amqp_msg_rcvd))
|
||||
msg = 'Message {} mismatch.'.format(amqp_msg_counter)
|
||||
try:
|
||||
self._search_for_message(
|
||||
amqp_msg,
|
||||
check_unit,
|
||||
ssl,
|
||||
port,
|
||||
amqp_msg_counter)
|
||||
except RmqNoMessageException:
|
||||
msg = 'Failed to retrieve message {}.'.format(
|
||||
amqp_msg_counter)
|
||||
raise Exception(msg)
|
||||
|
||||
amqp_msg_counter += 1
|
||||
|
||||
# Delete the test user
|
||||
|
||||
@@ -506,9 +506,9 @@ def get_amqp_message_by_unit(unit, queue="test",
|
||||
password=password)
|
||||
channel = connection.channel()
|
||||
method_frame, _, body = channel.basic_get(queue)
|
||||
body = body.decode()
|
||||
|
||||
if method_frame:
|
||||
body = body.decode()
|
||||
logging.debug('Retreived message from {} queue:\n{}'.format(queue,
|
||||
body))
|
||||
channel.basic_ack(method_frame.delivery_tag)
|
||||
|
||||
Reference in New Issue
Block a user