Search for message in rabbit tests
The current rabbit tests post a message on one unit and then consumes the first message on another unit. If the two do not match then the test fails. This means that a single pre-existing message can break all these tests as the consumer always gets the wrong message. This change updates the tests to search for the target message rather than assuming that the first message is the target message. Messages that are not the target message are not reposted so any pre-existing messages are consumed and in effect thrown away but given these are all test messages in a test queue and the tests do not run in parallel this should be fine.
This commit is contained in:
@@ -57,6 +57,41 @@ class RmqTests(test_utils.OpenStackBaseTest):
|
||||
ssl=ssl,
|
||||
port=port)
|
||||
|
||||
@tenacity.retry(reraise=True, stop=tenacity.stop_after_attempt(30),
|
||||
retry=tenacity.retry_if_exception_type(AssertionError))
|
||||
def _search_for_message(self, amqp_msg, check_unit, ssl, port,
|
||||
amqp_msg_counter):
|
||||
"""Search for message in message queue.
|
||||
|
||||
WARNING: This will consume messages until it finds the target message.
|
||||
|
||||
:param amqp_msg: Message to search for
|
||||
:type amqp_msg: string
|
||||
:param check_unit: Unit to retrieve messages from
|
||||
:type check_unit: juju.unit.Unit
|
||||
:param ssl: Whether to use SSL when connecting to rabbit
|
||||
:type ssl: bool
|
||||
:param port: Port to use when connecting to rabbit
|
||||
:type port: Union[int, None]
|
||||
:param amqp_msg_counter: Number in test sequence of this message.
|
||||
:type amqp_msg: int
|
||||
:raises: tenacity.RetryError
|
||||
"""
|
||||
amqp_msg_rcvd = self._retry_get_amqp_message(
|
||||
check_unit,
|
||||
ssl=ssl,
|
||||
port=port)
|
||||
|
||||
try:
|
||||
logging.info('Looking for message {}'.format(amqp_msg))
|
||||
# Validate amqp message content
|
||||
assert amqp_msg == amqp_msg_rcvd
|
||||
logging.info('Message {} received OK.'.format(amqp_msg_counter))
|
||||
except AssertionError as err:
|
||||
logging.info('Expected: {}'.format(amqp_msg))
|
||||
logging.info('Actual: {}'.format(amqp_msg_rcvd))
|
||||
raise err
|
||||
|
||||
def _test_rmq_amqp_messages_all_units(self, units,
|
||||
ssl=False, port=None):
|
||||
"""Reusable test to send/check amqp messages to every listed rmq unit.
|
||||
@@ -111,25 +146,21 @@ class RmqTests(test_utils.OpenStackBaseTest):
|
||||
port=port)
|
||||
|
||||
# Get amqp message
|
||||
logging.info('Get message from: {} '
|
||||
logging.info('Get messages from: {} '
|
||||
'({} {})'.format(check_unit_host,
|
||||
check_unit_name,
|
||||
check_unit_host_name))
|
||||
|
||||
amqp_msg_rcvd = self._retry_get_amqp_message(check_unit,
|
||||
ssl=ssl,
|
||||
port=port)
|
||||
|
||||
# Validate amqp message content
|
||||
if amqp_msg == amqp_msg_rcvd:
|
||||
logging.info('Message {} received '
|
||||
'OK.'.format(amqp_msg_counter))
|
||||
else:
|
||||
logging.error('Expected: {}'.format(amqp_msg))
|
||||
logging.error('Actual: {}'.format(amqp_msg_rcvd))
|
||||
msg = 'Message {} mismatch.'.format(amqp_msg_counter)
|
||||
try:
|
||||
self._search_for_message(
|
||||
amqp_msg,
|
||||
check_unit,
|
||||
ssl,
|
||||
port,
|
||||
amqp_msg_counter)
|
||||
except tenacity.RetryError:
|
||||
msg = 'Message {} not found.'.format(amqp_msg_counter)
|
||||
raise Exception(msg)
|
||||
|
||||
amqp_msg_counter += 1
|
||||
|
||||
# Delete the test user
|
||||
|
||||
Reference in New Issue
Block a user