diff --git a/zaza/openstack/charm_tests/rabbitmq_server/tests.py b/zaza/openstack/charm_tests/rabbitmq_server/tests.py index 6ee156e..9901270 100644 --- a/zaza/openstack/charm_tests/rabbitmq_server/tests.py +++ b/zaza/openstack/charm_tests/rabbitmq_server/tests.py @@ -57,6 +57,41 @@ class RmqTests(test_utils.OpenStackBaseTest): ssl=ssl, port=port) + @tenacity.retry(reraise=True, stop=tenacity.stop_after_attempt(30), + retry=tenacity.retry_if_exception_type(AssertionError)) + def _search_for_message(self, amqp_msg, check_unit, ssl, port, + amqp_msg_counter): + """Search for message in message queue. + + WARNING: This will consume messages until it finds the target message. + + :param amqp_msg: Message to search for + :type amqp_msg: string + :param check_unit: Unit to retrieve messages from + :type check_unit: juju.unit.Unit + :param ssl: Whether to use SSL when connecting to rabbit + :type ssl: bool + :param port: Port to use when connecting to rabbit + :type port: Union[int, None] + :param amqp_msg_counter: Number in test sequence of this message. + :type amqp_msg: int + :raises: tenacity.RetryError + """ + amqp_msg_rcvd = self._retry_get_amqp_message( + check_unit, + ssl=ssl, + port=port) + + try: + logging.info('Looking for message {}'.format(amqp_msg)) + # Validate amqp message content + assert amqp_msg == amqp_msg_rcvd + logging.info('Message {} received OK.'.format(amqp_msg_counter)) + except AssertionError as err: + logging.info('Expected: {}'.format(amqp_msg)) + logging.info('Actual: {}'.format(amqp_msg_rcvd)) + raise err + def _test_rmq_amqp_messages_all_units(self, units, ssl=False, port=None): """Reusable test to send/check amqp messages to every listed rmq unit. @@ -111,25 +146,21 @@ class RmqTests(test_utils.OpenStackBaseTest): port=port) # Get amqp message - logging.info('Get message from: {} ' + logging.info('Get messages from: {} ' '({} {})'.format(check_unit_host, check_unit_name, check_unit_host_name)) - amqp_msg_rcvd = self._retry_get_amqp_message(check_unit, - ssl=ssl, - port=port) - - # Validate amqp message content - if amqp_msg == amqp_msg_rcvd: - logging.info('Message {} received ' - 'OK.'.format(amqp_msg_counter)) - else: - logging.error('Expected: {}'.format(amqp_msg)) - logging.error('Actual: {}'.format(amqp_msg_rcvd)) - msg = 'Message {} mismatch.'.format(amqp_msg_counter) + try: + self._search_for_message( + amqp_msg, + check_unit, + ssl, + port, + amqp_msg_counter) + except tenacity.RetryError: + msg = 'Message {} not found.'.format(amqp_msg_counter) raise Exception(msg) - amqp_msg_counter += 1 # Delete the test user