diff --git a/unit_tests/utilities/test_zaza_utilities_generic.py b/unit_tests/utilities/test_zaza_utilities_generic.py index 9e0f39e..4aa73f6 100644 --- a/unit_tests/utilities/test_zaza_utilities_generic.py +++ b/unit_tests/utilities/test_zaza_utilities_generic.py @@ -575,3 +575,87 @@ class TestGenericUtils(ut_utils.BaseTestCase): self.telnet.side_effect = generic_utils.socket.error self.assertFalse(generic_utils.is_port_open(_port, _addr)) + + def test_get_unit_hostnames(self): + self.patch( + "zaza.openstack.utilities.generic.model.run_on_unit", + new_callable=mock.MagicMock(), + name="_run" + ) + + _unit1 = mock.MagicMock() + _unit1.entity_id = "testunit/1" + _unit2 = mock.MagicMock() + _unit2.entity_id = "testunit/2" + + _hostname1 = "host1.domain" + _hostname2 = "host2.domain" + + expected = { + _unit1.entity_id: _hostname1, + _unit2.entity_id: _hostname2, + } + + _units = [_unit1, _unit2] + + self._run.side_effect = [{"Stdout": _hostname1}, + {"Stdout": _hostname2}] + + actual = generic_utils.get_unit_hostnames(_units) + + self.assertEqual(actual, expected) + + def test_port_knock_units(self): + self.patch( + "zaza.openstack.utilities.generic.is_port_open", + new_callable=mock.MagicMock(), + name="_is_port_open" + ) + + _units = [ + mock.MagicMock(), + mock.MagicMock(), + ] + + self._is_port_open.side_effect = [True, True] + self.assertIsNone(generic_utils.port_knock_units(_units)) + self.assertEqual(self._is_port_open.call_count, len(_units)) + + self._is_port_open.side_effect = [True, False] + self.assertIsNotNone(generic_utils.port_knock_units(_units)) + + # check when func is expecting failure, i.e. 
should succeed + self._is_port_open.reset_mock() + self._is_port_open.side_effect = [False, False] + self.assertIsNone(generic_utils.port_knock_units(_units, + expect_success=False)) + self.assertEqual(self._is_port_open.call_count, len(_units)) + + def test_check_commands_on_units(self): + self.patch( + "zaza.openstack.utilities.generic.model.run_on_unit", + new_callable=mock.MagicMock(), + name="_run" + ) + + num_units = 2 + _units = [mock.MagicMock() for i in range(num_units)] + + num_cmds = 3 + cmds = ["/usr/bin/fakecmd"] * num_cmds + + # Test success, all calls return 0 + # zero is a string to replicate run_on_unit return data type + _cmd_results = [{"Code": "0"}] * len(_units) * len(cmds) + self._run.side_effect = _cmd_results + + result = generic_utils.check_commands_on_units(cmds, _units) + self.assertIsNone(result) + self.assertEqual(self._run.call_count, len(_units) * len(cmds)) + + # Test failure, some call returns 1 + _cmd_results[2] = {"Code": "1"} + self._run.side_effect = _cmd_results + + result = generic_utils.check_commands_on_units(cmds, _units) + self.assertIsNotNone(result) diff --git a/zaza/openstack/charm_tests/rabbitmq_server/__init__.py b/zaza/openstack/charm_tests/rabbitmq_server/__init__.py new file mode 100644 index 0000000..cdac408 --- /dev/null +++ b/zaza/openstack/charm_tests/rabbitmq_server/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2019 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Collection of code for setting up and testing rabbitmq server.""" diff --git a/zaza/openstack/charm_tests/rabbitmq_server/tests.py b/zaza/openstack/charm_tests/rabbitmq_server/tests.py new file mode 100644 index 0000000..b114314 --- /dev/null +++ b/zaza/openstack/charm_tests/rabbitmq_server/tests.py @@ -0,0 +1,400 @@ +# Copyright 2019 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""RabbitMQ Testing.""" + +import json +import logging +import time +import uuid + +import juju +import tenacity +import zaza.model +import zaza.openstack.charm_tests.test_utils as test_utils +import zaza.openstack.utilities.generic as generic_utils + +from charmhelpers.core.host import CompareHostReleases +from zaza.openstack.utilities.generic import get_series + +from . import utils as rmq_utils +from .utils import RmqNoMessageException + + +class RmqTests(test_utils.OpenStackBaseTest): + """Zaza tests on a basic rabbitmq cluster deployment.""" + + @classmethod + def setUpClass(cls): + """Run class setup for running tests.""" + super(RmqTests, cls).setUpClass() + + def _get_uuid_epoch_stamp(self): + """Return a string based on uuid4 and epoch time. + + Useful in generating test messages which need to be unique-ish. 
+ """ + return '[{}-{}]'.format(uuid.uuid4(), time.time()) + + @tenacity.retry( + retry=tenacity.retry_if_exception_type(RmqNoMessageException), + wait=tenacity.wait_fixed(10), + stop=tenacity.stop_after_attempt(2)) + def _retry_get_amqp_message(self, check_unit, ssl=None, port=None): + return rmq_utils.get_amqp_message_by_unit(check_unit, + ssl=ssl, + port=port) + + def _test_rmq_amqp_messages_all_units(self, units, + ssl=False, port=None): + """Reusable test to send/check amqp messages to every listed rmq unit. + + Reusable test to send amqp messages to every listed rmq + unit. Checks every listed rmq unit for messages. + :param units: list of units + :returns: None if successful. Raise on error. + + """ + # Add test user if it does not already exist + rmq_utils.add_user(units) + + # Handle ssl (includes wait-for-cluster) + if ssl: + rmq_utils.configure_ssl_on(units, port=port) + else: + rmq_utils.configure_ssl_off(units) + + # Publish and get amqp messages in all possible unit combinations. 
+ # Qty of checks == (qty of units) ^ 2 + amqp_msg_counter = 1 + host_names = generic_utils.get_unit_hostnames(units) + + for dest_unit in units: + dest_unit_name = dest_unit.entity_id + dest_unit_host = dest_unit.public_address + dest_unit_host_name = host_names[dest_unit_name] + + for check_unit in units: + check_unit_name = check_unit.entity_id + check_unit_host = check_unit.public_address + check_unit_host_name = host_names[check_unit_name] + + amqp_msg_stamp = self._get_uuid_epoch_stamp() + amqp_msg = ('Message {}@{} {}'.format(amqp_msg_counter, + dest_unit_host, + amqp_msg_stamp)).upper() + # Publish amqp message + logging.debug('Publish message to: {} ' + '({} {})'.format(dest_unit_host, + dest_unit_name, + dest_unit_host_name)) + + rmq_utils.publish_amqp_message_by_unit(dest_unit, + amqp_msg, ssl=ssl, + port=port) + + # Get amqp message + logging.debug('Get message from: {} ' + '({} {})'.format(check_unit_host, + check_unit_name, + check_unit_host_name)) + + amqp_msg_rcvd = self._retry_get_amqp_message(check_unit, + ssl=ssl, + port=port) + + # Validate amqp message content + if amqp_msg == amqp_msg_rcvd: + logging.debug('Message {} received ' + 'OK.'.format(amqp_msg_counter)) + else: + logging.error('Expected: {}'.format(amqp_msg)) + logging.error('Actual: {}'.format(amqp_msg_rcvd)) + msg = 'Message {} mismatch.'.format(amqp_msg_counter) + raise Exception(msg) + + amqp_msg_counter += 1 + + # Delete the test user + rmq_utils.delete_user(units) + + def test_400_rmq_cluster_running_nodes(self): + """Verify cluster status shows every cluster node as running member.""" + logging.debug('Checking that all units are in cluster_status ' + 'running nodes...') + + units = zaza.model.get_units(self.application_name) + + ret = rmq_utils.validate_cluster_running_nodes(units) + self.assertIsNone(ret, msg=ret) + + logging.info('OK') + + def test_406_rmq_amqp_messages_all_units_ssl_off(self): + """Send (and check) amqp messages to every rmq unit. 
+ + Sends amqp messages to every rmq unit, and check every rmq + unit for messages. Uses Standard amqp tcp port, no ssl. + + """ + logging.debug('Checking amqp message publish/get on all units ' + '(ssl off)...') + + units = zaza.model.get_units(self.application_name) + self._test_rmq_amqp_messages_all_units(units, ssl=False) + logging.info('OK') + + def test_408_rmq_amqp_messages_all_units_ssl_on(self): + """Send (and check) amqp messages to every rmq unit (ssl enabled). + + Sends amqp messages to every rmq unit, and check every rmq + unit for messages. Uses Standard ssl tcp port. + + """ + units = zaza.model.get_units(self.application_name) + + # http://pad.lv/1625044 + if CompareHostReleases(get_series(units[0])) <= 'trusty': + logging.info('SKIP') + logging.info('Skipping SSL tests due to client' + ' compatibility issues') + return + logging.debug('Checking amqp message publish/get on all units ' + '(ssl on)...') + + self._test_rmq_amqp_messages_all_units(units, + ssl=True, port=5671) + logging.info('OK') + + def test_410_rmq_amqp_messages_all_units_ssl_alt_port(self): + """Send (and check) amqp messages to every rmq unit (alt ssl port). + + Send amqp messages with ssl on, to every rmq unit and check + every rmq unit for messages. Custom ssl tcp port. 
+ + """ + units = zaza.model.get_units(self.application_name) + + # http://pad.lv/1625044 + if CompareHostReleases(get_series(units[0])) <= 'trusty': + logging.info('SKIP') + logging.info('Skipping SSL tests due to client' + ' compatibility issues') + return + logging.debug('Checking amqp message publish/get on all units ' + '(ssl on)...') + + units = zaza.model.get_units(self.application_name) + self._test_rmq_amqp_messages_all_units(units, + ssl=True, port=5999) + logging.info('OK') + + @tenacity.retry( + retry=tenacity.retry_if_result(lambda ret: ret is not None), + wait=tenacity.wait_fixed(30), + stop=tenacity.stop_after_attempt(20), + after=rmq_utils._log_tenacity_retry) + def _retry_port_knock_units(self, units, port, expect_success=True): + return generic_utils.port_knock_units(units, port, + expect_success=expect_success) + + def test_412_rmq_management_plugin(self): + """Enable and check management plugin.""" + logging.debug('Checking tcp socket connect to management plugin ' + 'port on all rmq units...') + + units = zaza.model.get_units(self.application_name) + mgmt_port = 15672 + + # Enable management plugin + logging.debug('Enabling management_plugin charm config option...') + config = {'management_plugin': 'True'} + zaza.model.set_application_config('rabbitmq-server', config) + rmq_utils.wait_for_cluster() + + # Check tcp connect to management plugin port + ret = self._retry_port_knock_units(units, mgmt_port) + + self.assertIsNone(ret, msg=ret) + logging.debug('Connect to all units (OK)') + + # Disable management plugin + logging.debug('Disabling management_plugin charm config option...') + config = {'management_plugin': 'False'} + zaza.model.set_application_config('rabbitmq-server', config) + rmq_utils.wait_for_cluster() + + # Negative check - tcp connect to management plugin port + logging.info('Expect tcp connect fail since charm config ' + 'option is disabled.') + ret = self._retry_port_knock_units(units, + mgmt_port, + expect_success=False) + + 
self.assertIsNone(ret, msg=ret) + logging.info('Confirm mgmt port closed on all units (OK)') + + @tenacity.retry( + retry=tenacity.retry_if_result(lambda ret: ret is not None), + # sleep for 2mins to allow 1min cron job to run... + wait=tenacity.wait_fixed(120), + stop=tenacity.stop_after_attempt(2)) + def _retry_check_commands_on_units(self, cmds, units): + return generic_utils.check_commands_on_units(cmds, units) + + def test_414_rmq_nrpe_monitors(self): + """Check rabbitmq-server nrpe monitor basic functionality.""" + units = zaza.model.get_units(self.application_name) + host_names = generic_utils.get_unit_hostnames(units) + + # check_rabbitmq monitor + logging.debug('Checking nrpe check_rabbitmq on units...') + cmds = ['egrep -oh /usr/local.* /etc/nagios/nrpe.d/' + 'check_rabbitmq.cfg'] + ret = self._retry_check_commands_on_units(cmds, units) + self.assertIsNone(ret, msg=ret) + + # check_rabbitmq_queue monitor + logging.debug('Checking nrpe check_rabbitmq_queue on units...') + cmds = ['egrep -oh /usr/local.* /etc/nagios/nrpe.d/' + 'check_rabbitmq_queue.cfg'] + ret = self._retry_check_commands_on_units(cmds, units) + self.assertIsNone(ret, msg=ret) + + # check dat file existence + logging.debug('Checking nrpe dat file existence on units...') + for u in units: + unit_host_name = host_names[u.entity_id] + + cmds = [ + 'stat /var/lib/rabbitmq/data/{}_general_stats.dat'.format( + unit_host_name), + 'stat /var/lib/rabbitmq/data/{}_queue_stats.dat'.format( + unit_host_name) + ] + + ret = generic_utils.check_commands_on_units(cmds, [u]) + self.assertIsNone(ret, msg=ret) + + logging.info('OK') + + def test_910_pause_and_resume(self): + """The services can be paused and resumed.""" + logging.debug('Checking pause and resume actions...') + + unit = zaza.model.get_units(self.application_name)[0] + assert unit.workload_status == "active" + + zaza.model.run_action(unit.entity_id, "pause") + zaza.model.block_until_unit_wl_status(unit.entity_id, "maintenance") + unit = 
zaza.model.get_unit_from_name(unit.entity_id) + assert unit.workload_status == "maintenance" + + zaza.model.run_action(unit.entity_id, "resume") + zaza.model.block_until_unit_wl_status(unit.entity_id, "active") + unit = zaza.model.get_unit_from_name(unit.entity_id) + assert unit.workload_status == "active" + + rmq_utils.wait_for_cluster() + logging.debug('OK') + + def test_911_cluster_status(self): + """Test rabbitmqctl cluster_status action can be returned.""" + logging.debug('Checking cluster status action...') + + unit = zaza.model.get_units(self.application_name)[0] + action = zaza.model.run_action(unit.entity_id, "cluster-status") + self.assertIsInstance(action, juju.action.Action) + + logging.debug('OK') + + def test_912_check_queues(self): + """Test rabbitmqctl check_queues action can be returned.""" + logging.debug('Checking cluster status action...') + + unit = zaza.model.get_units(self.application_name)[0] + action = zaza.model.run_action(unit.entity_id, "check-queues") + self.assertIsInstance(action, juju.action.Action) + + def test_913_list_unconsumed_queues(self): + """Test rabbitmqctl list-unconsumed-queues action can be returned.""" + logging.debug('Checking list-unconsumed-queues action...') + + unit = zaza.model.get_units(self.application_name)[0] + self._test_rmq_amqp_messages_all_units([unit]) + action = zaza.model.run_action(unit.entity_id, + 'list-unconsumed-queues') + self.assertIsInstance(action, juju.action.Action) + + queue_count = int(action.results['unconsumed-queue-count']) + assert queue_count > 0, 'Did not find any unconsumed queues.' + + queue_name = 'test' # publish_amqp_message_by_unit default queue name + for i in range(queue_count): + queue_data = json.loads( + action.results['unconsumed-queues'][str(i)]) + if queue_data['name'] == queue_name: + break + else: + assert False, 'Did not find expected queue in result.' 
+ + # Since we just reused _test_rmq_amqp_messages_all_units, we should + # have created the queue if it didn't already exist, but all messages + # should have already been consumed. + assert queue_data['messages'] == 0, 'Found unexpected message count.' + + logging.debug('OK') + + @tenacity.retry( + retry=tenacity.retry_if_result(lambda errors: bool(errors)), + wait=tenacity.wait_fixed(10), + stop=tenacity.stop_after_attempt(2)) + def _retry_check_unit_cluster_nodes(self, u, unit_node_names): + return rmq_utils.check_unit_cluster_nodes(u, unit_node_names) + + def test_921_remove_unit(self): + """Test if unit cleans up when removed from Rmq cluster. + + Test if a unit correctly cleans up by removing itself from the + RabbitMQ cluster on removal + + """ + logging.debug('Checking that units correctly clean up after ' + 'themselves on unit removal...') + config = {'min-cluster-size': '2'} + zaza.model.set_application_config('rabbitmq-server', config) + rmq_utils.wait_for_cluster() + + units = zaza.model.get_units(self.application_name) + removed_unit = units[-1] + left_units = units[:-1] + + zaza.model.run_on_unit(removed_unit.entity_id, 'hooks/stop') + zaza.model.block_until_unit_wl_status(removed_unit.entity_id, + "waiting") + + unit_host_names = generic_utils.get_unit_hostnames(left_units) + unit_node_names = [] + for unit in unit_host_names: + unit_node_names.append('rabbit@{}'.format(unit_host_names[unit])) + errors = [] + + for u in left_units: + e = self._retry_check_unit_cluster_nodes(u, + unit_node_names) + if e: + errors.append(e) + + self.assertFalse(errors, msg=errors) + logging.debug('OK') diff --git a/zaza/openstack/charm_tests/rabbitmq_server/utils.py b/zaza/openstack/charm_tests/rabbitmq_server/utils.py new file mode 100644 index 0000000..dba2d92 --- /dev/null +++ b/zaza/openstack/charm_tests/rabbitmq_server/utils.py @@ -0,0 +1,476 @@ +# Copyright 2019 Canonical Ltd. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""RabbitMQ Testing utility functions.""" + +import json +import logging + +import pika +import tenacity +import zaza.model + +import ssl as libssl +import zaza.openstack.utilities.generic as generic_utils + + +class RmqNoMessageException(Exception): + """Message retrieval from Rmq resulted in no message.""" + + pass + + +def _log_tenacity_retry(retry_state): + logging.info('Attempt {}: {}'.format(retry_state.attempt_number, + retry_state.outcome.result())) + + +def wait_for_cluster(model_name=None, timeout=1200): + """Wait for Rmq cluster status to show cluster readiness. + + Wait for rmq units extended status to show cluster readiness, + after an optional initial sleep period. Initial sleep is likely + necessary to be effective following a config change, as status + message may not instantly update to non-ready. + """ + states = { + 'rabbitmq-server': { + 'workload-status-messages': 'Unit is ready and clustered' + } + } + + zaza.model.wait_for_application_states(model_name=model_name, + states=states, + timeout=timeout) + + +def add_user(units, username="testuser1", password="changeme"): + """Add a user to a RabbitMQ cluster. + + Add a user via the first rmq juju unit, check connection as + the new user against all units. + :param units: list of unit pointers + :param username: amqp user name, default to testuser1 + :param password: amqp user password + :returns: None if successful. Raise on error. 
+ """ + logging.debug('Adding rmq user ({})...'.format(username)) + + # Check that user does not already exist + cmd_user_list = 'rabbitmqctl list_users' + cmd_result = zaza.model.run_on_unit(units[0].entity_id, cmd_user_list) + output = cmd_result['Stdout'].strip() + if username in output: + logging.warning('User ({}) already exists, returning ' + 'gracefully.'.format(username)) + return + + perms = '".*" ".*" ".*"' + cmds = ['rabbitmqctl add_user {} {}'.format(username, password), + 'rabbitmqctl set_permissions {} {}'.format(username, perms)] + + # Add user via first unit + for cmd in cmds: + cmd_result = zaza.model.run_on_unit(units[0].entity_id, cmd) + output = cmd_result['Stdout'].strip() + + # Check connection against the other units + logging.debug('Checking user connect against units...') + for u in units: + connection = connect_amqp_by_unit(u, ssl=False, + username=username, + password=password) + connection.close() + + +def delete_user(units, username="testuser1"): + """Delete a user from a RabbitMQ cluster. + + Delete a rabbitmq user via the first rmq juju unit. + :param units: list of unit pointers + :param username: amqp user name, default to testuser1 + :param password: amqp user password + :returns: None if successful or no such user. + """ + logging.debug('Deleting rmq user ({})...'.format(username)) + + # Check that the user exists + cmd_user_list = 'rabbitmqctl list_users' + output = zaza.model.run_on_unit(units[0].entity_id, + cmd_user_list)['Stdout'].strip() + + if username not in output: + logging.warning('User ({}) does not exist, returning ' + 'gracefully.'.format(username)) + return + + # Delete the user + cmd_user_del = 'rabbitmqctl delete_user {}'.format(username) + output = zaza.model.run_on_unit(units[0].entity_id, cmd_user_del) + + +def get_cluster_status(unit): + """Get RabbitMQ cluster status output. + + Execute rabbitmq cluster status command on a unit and return + the full output. 
+ :param unit: unit + :returns: String containing console output of cluster status command + """ + cmd = 'rabbitmqctl cluster_status' + output = zaza.model.run_on_unit(unit.entity_id, cmd)['Stdout'].strip() + logging.debug('{} cluster_status:\n{}'.format( + unit.entity_id, output)) + return str(output) + + +def get_cluster_running_nodes(unit): + """Get a list of RabbitMQ cluster's running nodes. + + Parse rabbitmqctl cluster_status output string, return list of + running rabbitmq cluster nodes. + :param unit: unit pointer + :returns: List containing node names of running nodes + """ + # NOTE(beisner): rabbitmqctl cluster_status output is not + # json-parsable, do string chop foo, then json.loads that. + str_stat = get_cluster_status(unit) + if 'running_nodes' in str_stat: + pos_start = str_stat.find("{running_nodes,") + 15 + pos_end = str_stat.find("]},", pos_start) + 1 + str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"') + run_nodes = json.loads(str_run_nodes) + return run_nodes + else: + return [] + + +def validate_cluster_running_nodes(units): + """Check all rmq unit hostnames are represented in cluster_status. + + Check that all rmq unit hostnames are represented in the + cluster_status output of all units. + :param host_names: dict of juju unit names to host names + :param units: list of unit pointers (all rmq units) + :returns: None if successful, otherwise return error message + """ + host_names = generic_utils.get_unit_hostnames(units) + errors = [] + + # Query every unit for cluster_status running nodes + for query_unit in units: + query_unit_name = query_unit.entity_id + running_nodes = get_cluster_running_nodes(query_unit) + + # Confirm that every unit is represented in the queried unit's + # cluster_status running nodes output. 
+ for validate_unit in units: + val_host_name = host_names[validate_unit.entity_id] + val_node_name = 'rabbit@{}'.format(val_host_name) + + if val_node_name not in running_nodes: + errors.append('Cluster member check failed on {}: {} not ' + 'in {}\n'.format(query_unit_name, + val_node_name, + running_nodes)) + if errors: + return ''.join(errors) + + +def validate_ssl_enabled_units(units, port=None): + """Check that ssl is enabled on rmq juju units. + + :param units: list of all rmq units + :param port: optional ssl port override to validate + :returns: None if successful, otherwise return error message + """ + for u in units: + if not is_ssl_enabled_on_unit(u, port=port): + return ('Unexpected condition: ssl is disabled on unit ' + '({})'.format(u.info['unit_name'])) + return None + + +def validate_ssl_disabled_units(units): + """Check that ssl is disabled on listed rmq juju units. + + :param units: list of all rmq units + :returns: None if successful, otherwise return error message + """ + for u in units: + if is_ssl_enabled_on_unit(u): + return ('Unexpected condition: ssl is enabled on unit ' + '({})'.format(u.entity_id)) + return None + + +@tenacity.retry( + retry=tenacity.retry_if_result(lambda errors: bool(errors)), + wait=tenacity.wait_fixed(4), + stop=tenacity.stop_after_attempt(15), + after=_log_tenacity_retry) +def _retry_validate_ssl_enabled_units(units, port=None): + return validate_ssl_enabled_units(units, port=port) + + +def configure_ssl_on(units, model_name=None, port=None): + """Turn RabbitMQ charm SSL config option on. + + Turn ssl charm config option on, with optional non-default + ssl port specification. Confirm that it is enabled on every + unit. + :param units: list of units + :param port: amqp port, use defaults if None + :returns: None if successful. Raise on error. 
+ """ + logging.debug('Setting ssl charm config option: on') + + # Enable RMQ SSL + config = {'ssl': 'on'} + if port: + config['ssl_port'] = str(port) + + zaza.model.set_application_config('rabbitmq-server', + config, + model_name=model_name) + + # Wait for unit status + wait_for_cluster(model_name) + + ret = _retry_validate_ssl_enabled_units(units, port=port) + if ret: + raise Exception(ret) + + +@tenacity.retry( + retry=tenacity.retry_if_result(lambda errors: bool(errors)), + wait=tenacity.wait_fixed(4), + stop=tenacity.stop_after_attempt(15), + after=_log_tenacity_retry) +def _retry_validate_ssl_disabled_units(units): + return validate_ssl_disabled_units(units) + + +def configure_ssl_off(units, model_name=None, max_wait=60): + """Turn RabbitMQ charm SSL config option off. + + Turn ssl charm config option off, confirm that it is disabled + on every unit. + :param units: list of units + :param max_wait: maximum time to wait in seconds to confirm + :returns: None if successful. Raise on error. 
+ """ + logging.debug('Setting ssl charm config option: off') + + # Disable RMQ SSL + config = {'ssl': 'off'} + zaza.model.set_application_config('rabbitmq-server', + config, + model_name=model_name) + + # Wait for unit status + wait_for_cluster(model_name) + + ret = _retry_validate_ssl_disabled_units(units) + + if ret: + raise Exception(ret) + + +def is_ssl_enabled_on_unit(unit, port=None): + """Check a single juju rmq unit for ssl and port in the config file.""" + host = unit.public_address + unit_name = unit.entity_id + + conf_file = '/etc/rabbitmq/rabbitmq.config' + conf_contents = str(generic_utils.get_file_contents(unit, + conf_file)) + # Checks + conf_ssl = 'ssl' in conf_contents + conf_port = str(port) in conf_contents + + # Port explicitly checked in config + if port and conf_port and conf_ssl: + logging.debug('SSL is enabled @{}:{} ' + '({})'.format(host, port, unit_name)) + return True + elif port and not conf_port and conf_ssl: + logging.debug('SSL is enabled @{} but not on port {} ' + '({})'.format(host, port, unit_name)) + return False + # Port not checked (useful when checking that ssl is disabled) + elif not port and conf_ssl: + logging.debug('SSL is enabled @{}:{} ' + '({})'.format(host, port, unit_name)) + return True + elif not conf_ssl: + logging.debug('SSL not enabled @{}:{} ' + '({})'.format(host, port, unit_name)) + return False + else: + msg = ('Unknown condition when checking SSL status @{}:{} ' + '({})'.format(host, port, unit_name)) + raise ValueError(msg) + + +def connect_amqp_by_unit(unit, ssl=False, + port=None, fatal=True, + username="testuser1", password="changeme"): + """Establish and return a pika amqp connection to the rabbitmq service. + + Establish and return a pika amqp connection to the rabbitmq service + running on a rmq juju unit. 
+ :param unit: unit pointer + :param ssl: boolean, default to False + :param port: amqp port, use defaults if None + :param fatal: boolean, default to True (raises on connect error) + :param username: amqp user name, default to testuser1 + :param password: amqp user password + :returns: pika amqp connection pointer or None if failed and non-fatal + """ + host = unit.public_address + unit_name = unit.entity_id + + if ssl: + # TODO: when Python3.5 support is removed, investigate + # changing protocol to PROTOCOL_TLS + context = libssl.SSLContext(protocol=libssl.PROTOCOL_TLSv1_2) + ssl_options = pika.SSLOptions(context) + else: + ssl_options = None + + # Default port logic if port is not specified + if ssl and not port: + port = 5671 + elif not ssl and not port: + port = 5672 + + logging.debug('Connecting to amqp on {}:{} ({}) as ' + '{}...'.format(host, port, unit_name, username)) + + try: + credentials = pika.PlainCredentials(username, password) + parameters = pika.ConnectionParameters(host=host, port=port, + credentials=credentials, + ssl_options=ssl_options, + connection_attempts=3, + retry_delay=5, + socket_timeout=1) + connection = pika.BlockingConnection(parameters) + assert connection.is_open is True + logging.debug('Connect OK') + return connection + except Exception as e: + msg = ('amqp connection failed to {}:{} as ' + '{} ({})'.format(host, port, username, str(e))) + if fatal: + raise Exception(msg) + else: + logging.warn(msg) + return None + + +def publish_amqp_message_by_unit(unit, message, + queue="test", ssl=False, + username="testuser1", + password="changeme", + port=None): + """Publish an amqp message to a rmq juju unit. + + :param unit: unit pointer + :param message: amqp message string + :param queue: message queue, default to test + :param username: amqp user name, default to testuser1 + :param password: amqp user password + :param ssl: boolean, default to False + :param port: amqp port, use defaults if None + :returns: None. 
Raises exception if publish failed. + """ + logging.debug('Publishing message to {} queue:\n{}'.format(queue, + message)) + connection = connect_amqp_by_unit(unit, ssl=ssl, + port=port, + username=username, + password=password) + + # NOTE(beisner): extra debug here re: pika hang potential: + # https://github.com/pika/pika/issues/297 + # https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw + logging.debug('Defining channel...') + channel = connection.channel() + logging.debug('Declaring queue...') + channel.queue_declare(queue=queue, auto_delete=False, durable=True) + logging.debug('Publishing message...') + channel.basic_publish(exchange='', routing_key=queue, body=message) + logging.debug('Closing channel...') + channel.close() + logging.debug('Closing connection...') + connection.close() + + +def get_amqp_message_by_unit(unit, queue="test", + username="testuser1", + password="changeme", + ssl=False, port=None): + """Get an amqp message from a rmq juju unit. + + :param unit: unit pointer + :param queue: message queue, default to test + :param username: amqp user name, default to testuser1 + :param password: amqp user password + :param ssl: boolean, default to False + :param port: amqp port, use defaults if None + :returns: amqp message body as string. Raise if get fails. + """ + connection = connect_amqp_by_unit(unit, ssl=ssl, + port=port, + username=username, + password=password) + channel = connection.channel() + method_frame, _, body = channel.basic_get(queue) + body = body.decode() + + if method_frame: + logging.debug('Retreived message from {} queue:\n{}'.format(queue, + body)) + channel.basic_ack(method_frame.delivery_tag) + channel.close() + connection.close() + return body + else: + msg = 'No message retrieved.' 
+ raise RmqNoMessageException(msg) + + +def check_unit_cluster_nodes(unit, unit_node_names): + """Check if unit exists in list of Rmq cluster node names.""" + unit_name = unit.entity_id + nodes = [] + errors = [] + str_stat = get_cluster_status(unit) + # make the interesting part of rabbitmqctl cluster_status output + # json-parseable. + if 'nodes,[{disc,' in str_stat: + pos_start = str_stat.find('nodes,[{disc,') + 13 + pos_end = str_stat.find(']}]},', pos_start) + 1 + str_nodes = str_stat[pos_start:pos_end].replace("'", '"') + nodes = json.loads(str_nodes) + for node in nodes: + if node not in unit_node_names: + errors.append('Cluster registration check failed on {}: ' + '{} should not be registered with RabbitMQ ' + 'after unit removal.\n' + ''.format(unit_name, node)) + + return errors diff --git a/zaza/openstack/utilities/generic.py b/zaza/openstack/utilities/generic.py index f1bb621..856e036 100644 --- a/zaza/openstack/utilities/generic.py +++ b/zaza/openstack/utilities/generic.py @@ -75,6 +75,16 @@ def get_network_config(net_topology, ignore_env_vars=False, return net_info +def get_unit_hostnames(units): + """Return a dict of juju unit names to hostnames.""" + host_names = {} + for unit in units: + output = model.run_on_unit(unit.entity_id, 'hostname') + hostname = output['Stdout'].strip() + host_names[unit.entity_id] = hostname + return host_names + + def get_pkg_version(application, pkg): """Return package version. @@ -526,6 +536,31 @@ def dist_upgrade(unit_name): model.run_on_unit(unit_name, dist_upgrade_cmd) +def check_commands_on_units(commands, units): + """Check that all commands in a list exit zero on all units in a list. 
+ + :param commands: list of bash commands + :param units: list of unit pointers + :returns: None if successful; Failure message otherwise + """ + logging.debug('Checking exit codes for {} commands on {} ' + 'units...'.format(len(commands), + len(units))) + + for u in units: + for cmd in commands: + output = model.run_on_unit(u.entity_id, cmd) + if int(output['Code']) == 0: + logging.debug('{} `{}` returned {} ' + '(OK)'.format(u.entity_id, + cmd, output['Code'])) + else: + return ('{} `{}` returned {} ' + '{}'.format(u.entity_id, + cmd, output['Code'], output)) + return None + + def do_release_upgrade(unit_name): """Run do-release-upgrade noninteractive. @@ -729,6 +764,12 @@ def get_ubuntu_release(ubuntu_name): return index +def get_file_contents(unit, f): + """Get contents of a file on a remote unit.""" + return model.run_on_unit(unit.entity_id, + "cat {}".format(f))['Stdout'] + + def is_port_open(port, address): """Determine if TCP port is accessible. @@ -752,3 +793,29 @@ def is_port_open(port, address): logging.error("connection refused connecting" " to {}:{}".format(address, port)) return False + + +def port_knock_units(units, port=22, expect_success=True): + """Check if specific port is open on units. + + Open a TCP socket to check for a listening service on each listed unit. + :param units: list of unit pointers + :param port: TCP port number, default to 22 + :param timeout: Connect timeout, default to 15 seconds + :param expect_success: True by default, set False to invert logic + :returns: None if successful, Failure message otherwise + """ + for u in units: + host = u.public_address + connected = is_port_open(port, host) + if not connected and expect_success: + return 'Socket connect failed.' + elif connected and not expect_success: + return 'Socket connected unexpectedly.' + + +def get_series(unit): + """Ubuntu release name running on unit.""" + result = model.run_on_unit(unit.entity_id, + "lsb_release -cs") + return result['Stdout'].strip()