Merge pull request #60 from exsdev0/rabbitmq-zaza-tests
Rabbitmq zaza tests
This commit is contained in:
@@ -575,3 +575,87 @@ class TestGenericUtils(ut_utils.BaseTestCase):
|
||||
|
||||
self.telnet.side_effect = generic_utils.socket.error
|
||||
self.assertFalse(generic_utils.is_port_open(_port, _addr))
|
||||
|
||||
def test_get_unit_hostnames(self):
|
||||
self.patch(
|
||||
"zaza.openstack.utilities.generic.model.run_on_unit",
|
||||
new_callable=mock.MagicMock(),
|
||||
name="_run"
|
||||
)
|
||||
|
||||
_unit1 = mock.MagicMock()
|
||||
_unit1.entity_id = "testunit/1"
|
||||
_unit2 = mock.MagicMock()
|
||||
_unit2.entity_id = "testunit/2"
|
||||
|
||||
_hostname1 = "host1.domain"
|
||||
_hostname2 = "host2.domain"
|
||||
|
||||
expected = {
|
||||
_unit1.entity_id: _hostname1,
|
||||
_unit2.entity_id: _hostname2,
|
||||
}
|
||||
|
||||
_units = [_unit1, _unit2]
|
||||
|
||||
self._run.side_effect = [{"Stdout": _hostname1},
|
||||
{"Stdout": _hostname2}]
|
||||
|
||||
actual = generic_utils.get_unit_hostnames(_units)
|
||||
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
def test_port_knock_units(self):
|
||||
self.patch(
|
||||
"zaza.openstack.utilities.generic.is_port_open",
|
||||
new_callable=mock.MagicMock(),
|
||||
name="_is_port_open"
|
||||
)
|
||||
|
||||
_units = [
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock(),
|
||||
]
|
||||
|
||||
self._is_port_open.side_effect = [True, True]
|
||||
self.assertIsNone(generic_utils.port_knock_units(_units))
|
||||
self.assertEqual(self._is_port_open.call_count, len(_units))
|
||||
|
||||
self._is_port_open.side_effect = [True, False]
|
||||
self.assertIsNotNone(generic_utils.port_knock_units(_units))
|
||||
|
||||
# check when func is expecting failure, i.e. should succeed
|
||||
self._is_port_open.reset_mock()
|
||||
self._is_port_open.side_effect = [False, False]
|
||||
self.assertIsNone(generic_utils.port_knock_units(_units,
|
||||
expect_success=False))
|
||||
self.assertEqual(self._is_port_open.call_count, len(_units))
|
||||
|
||||
def test_check_commands_on_units(self):
|
||||
self.patch(
|
||||
"zaza.openstack.utilities.generic.model.run_on_unit",
|
||||
new_callable=mock.MagicMock(),
|
||||
name="_run"
|
||||
)
|
||||
|
||||
num_units = 2
|
||||
_units = [mock.MagicMock() for i in range(num_units)]
|
||||
|
||||
num_cmds = 3
|
||||
cmds = ["/usr/bin/fakecmd"] * num_cmds
|
||||
|
||||
# Test success, all calls return 0
|
||||
# zero is a string to replicate run_on_unit return data type
|
||||
_cmd_results = [{"Code": "0"}] * len(_units) * len(cmds)
|
||||
self._run.side_effect = _cmd_results
|
||||
|
||||
result = generic_utils.check_commands_on_units(cmds, _units)
|
||||
self.assertIsNone(result)
|
||||
self.assertEqual(self._run.call_count, len(_units) * len(cmds))
|
||||
|
||||
# Test failure, some call returns 1
|
||||
_cmd_results[2] = {"Code": "1"}
|
||||
self._run.side_effect = _cmd_results
|
||||
|
||||
result = generic_utils.check_commands_on_units(cmds, _units)
|
||||
self.assertIsNotNone(result)
|
||||
|
||||
15
zaza/openstack/charm_tests/rabbitmq_server/__init__.py
Normal file
15
zaza/openstack/charm_tests/rabbitmq_server/__init__.py
Normal file
@@ -0,0 +1,15 @@
|
||||
# Copyright 2019 Canonical Ltd.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Collection of code for setting up and testing rabbitmq server."""
|
||||
400
zaza/openstack/charm_tests/rabbitmq_server/tests.py
Normal file
400
zaza/openstack/charm_tests/rabbitmq_server/tests.py
Normal file
@@ -0,0 +1,400 @@
|
||||
# Copyright 2019 Canonical Ltd.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""RabbitMQ Testing."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import juju
|
||||
import tenacity
|
||||
import zaza.model
|
||||
import zaza.openstack.charm_tests.test_utils as test_utils
|
||||
import zaza.openstack.utilities.generic as generic_utils
|
||||
|
||||
from charmhelpers.core.host import CompareHostReleases
|
||||
from zaza.openstack.utilities.generic import get_series
|
||||
|
||||
from . import utils as rmq_utils
|
||||
from .utils import RmqNoMessageException
|
||||
|
||||
|
||||
class RmqTests(test_utils.OpenStackBaseTest):
|
||||
"""Zaza tests on a basic rabbitmq cluster deployment."""
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
"""Run class setup for running tests."""
|
||||
super(RmqTests, cls).setUpClass()
|
||||
|
||||
def _get_uuid_epoch_stamp(self):
|
||||
"""Return a string based on uuid4 and epoch time.
|
||||
|
||||
Useful in generating test messages which need to be unique-ish.
|
||||
"""
|
||||
return '[{}-{}]'.format(uuid.uuid4(), time.time())
|
||||
|
||||
@tenacity.retry(
|
||||
retry=tenacity.retry_if_exception_type(RmqNoMessageException),
|
||||
wait=tenacity.wait_fixed(10),
|
||||
stop=tenacity.stop_after_attempt(2))
|
||||
def _retry_get_amqp_message(self, check_unit, ssl=None, port=None):
|
||||
return rmq_utils.get_amqp_message_by_unit(check_unit,
|
||||
ssl=ssl,
|
||||
port=port)
|
||||
|
||||
def _test_rmq_amqp_messages_all_units(self, units,
|
||||
ssl=False, port=None):
|
||||
"""Reusable test to send/check amqp messages to every listed rmq unit.
|
||||
|
||||
Reusable test to send amqp messages to every listed rmq
|
||||
unit. Checks every listed rmq unit for messages.
|
||||
:param units: list of units
|
||||
:returns: None if successful. Raise on error.
|
||||
|
||||
"""
|
||||
# Add test user if it does not already exist
|
||||
rmq_utils.add_user(units)
|
||||
|
||||
# Handle ssl (includes wait-for-cluster)
|
||||
if ssl:
|
||||
rmq_utils.configure_ssl_on(units, port=port)
|
||||
else:
|
||||
rmq_utils.configure_ssl_off(units)
|
||||
|
||||
# Publish and get amqp messages in all possible unit combinations.
|
||||
# Qty of checks == (qty of units) ^ 2
|
||||
amqp_msg_counter = 1
|
||||
host_names = generic_utils.get_unit_hostnames(units)
|
||||
|
||||
for dest_unit in units:
|
||||
dest_unit_name = dest_unit.entity_id
|
||||
dest_unit_host = dest_unit.public_address
|
||||
dest_unit_host_name = host_names[dest_unit_name]
|
||||
|
||||
for check_unit in units:
|
||||
check_unit_name = check_unit.entity_id
|
||||
check_unit_host = check_unit.public_address
|
||||
check_unit_host_name = host_names[check_unit_name]
|
||||
|
||||
amqp_msg_stamp = self._get_uuid_epoch_stamp()
|
||||
amqp_msg = ('Message {}@{} {}'.format(amqp_msg_counter,
|
||||
dest_unit_host,
|
||||
amqp_msg_stamp)).upper()
|
||||
# Publish amqp message
|
||||
logging.debug('Publish message to: {} '
|
||||
'({} {})'.format(dest_unit_host,
|
||||
dest_unit_name,
|
||||
dest_unit_host_name))
|
||||
|
||||
rmq_utils.publish_amqp_message_by_unit(dest_unit,
|
||||
amqp_msg, ssl=ssl,
|
||||
port=port)
|
||||
|
||||
# Get amqp message
|
||||
logging.debug('Get message from: {} '
|
||||
'({} {})'.format(check_unit_host,
|
||||
check_unit_name,
|
||||
check_unit_host_name))
|
||||
|
||||
amqp_msg_rcvd = self._retry_get_amqp_message(check_unit,
|
||||
ssl=ssl,
|
||||
port=port)
|
||||
|
||||
# Validate amqp message content
|
||||
if amqp_msg == amqp_msg_rcvd:
|
||||
logging.debug('Message {} received '
|
||||
'OK.'.format(amqp_msg_counter))
|
||||
else:
|
||||
logging.error('Expected: {}'.format(amqp_msg))
|
||||
logging.error('Actual: {}'.format(amqp_msg_rcvd))
|
||||
msg = 'Message {} mismatch.'.format(amqp_msg_counter)
|
||||
raise Exception(msg)
|
||||
|
||||
amqp_msg_counter += 1
|
||||
|
||||
# Delete the test user
|
||||
rmq_utils.delete_user(units)
|
||||
|
||||
def test_400_rmq_cluster_running_nodes(self):
|
||||
"""Verify cluster status shows every cluster node as running member."""
|
||||
logging.debug('Checking that all units are in cluster_status '
|
||||
'running nodes...')
|
||||
|
||||
units = zaza.model.get_units(self.application_name)
|
||||
|
||||
ret = rmq_utils.validate_cluster_running_nodes(units)
|
||||
self.assertIsNone(ret, msg=ret)
|
||||
|
||||
logging.info('OK')
|
||||
|
||||
def test_406_rmq_amqp_messages_all_units_ssl_off(self):
|
||||
"""Send (and check) amqp messages to every rmq unit.
|
||||
|
||||
Sends amqp messages to every rmq unit, and check every rmq
|
||||
unit for messages. Uses Standard amqp tcp port, no ssl.
|
||||
|
||||
"""
|
||||
logging.debug('Checking amqp message publish/get on all units '
|
||||
'(ssl off)...')
|
||||
|
||||
units = zaza.model.get_units(self.application_name)
|
||||
self._test_rmq_amqp_messages_all_units(units, ssl=False)
|
||||
logging.info('OK')
|
||||
|
||||
def test_408_rmq_amqp_messages_all_units_ssl_on(self):
|
||||
"""Send (and check) amqp messages to every rmq unit (ssl enabled).
|
||||
|
||||
Sends amqp messages to every rmq unit, and check every rmq
|
||||
unit for messages. Uses Standard ssl tcp port.
|
||||
|
||||
"""
|
||||
units = zaza.model.get_units(self.application_name)
|
||||
|
||||
# http://pad.lv/1625044
|
||||
if CompareHostReleases(get_series(units[0])) <= 'trusty':
|
||||
logging.info('SKIP')
|
||||
logging.info('Skipping SSL tests due to client'
|
||||
' compatibility issues')
|
||||
return
|
||||
logging.debug('Checking amqp message publish/get on all units '
|
||||
'(ssl on)...')
|
||||
|
||||
self._test_rmq_amqp_messages_all_units(units,
|
||||
ssl=True, port=5671)
|
||||
logging.info('OK')
|
||||
|
||||
def test_410_rmq_amqp_messages_all_units_ssl_alt_port(self):
|
||||
"""Send (and check) amqp messages to every rmq unit (alt ssl port).
|
||||
|
||||
Send amqp messages with ssl on, to every rmq unit and check
|
||||
every rmq unit for messages. Custom ssl tcp port.
|
||||
|
||||
"""
|
||||
units = zaza.model.get_units(self.application_name)
|
||||
|
||||
# http://pad.lv/1625044
|
||||
if CompareHostReleases(get_series(units[0])) <= 'trusty':
|
||||
logging.info('SKIP')
|
||||
logging.info('Skipping SSL tests due to client'
|
||||
' compatibility issues')
|
||||
return
|
||||
logging.debug('Checking amqp message publish/get on all units '
|
||||
'(ssl on)...')
|
||||
|
||||
units = zaza.model.get_units(self.application_name)
|
||||
self._test_rmq_amqp_messages_all_units(units,
|
||||
ssl=True, port=5999)
|
||||
logging.info('OK')
|
||||
|
||||
@tenacity.retry(
|
||||
retry=tenacity.retry_if_result(lambda ret: ret is not None),
|
||||
wait=tenacity.wait_fixed(30),
|
||||
stop=tenacity.stop_after_attempt(20),
|
||||
after=rmq_utils._log_tenacity_retry)
|
||||
def _retry_port_knock_units(self, units, port, expect_success=True):
|
||||
return generic_utils.port_knock_units(units, port,
|
||||
expect_success=expect_success)
|
||||
|
||||
def test_412_rmq_management_plugin(self):
|
||||
"""Enable and check management plugin."""
|
||||
logging.debug('Checking tcp socket connect to management plugin '
|
||||
'port on all rmq units...')
|
||||
|
||||
units = zaza.model.get_units(self.application_name)
|
||||
mgmt_port = 15672
|
||||
|
||||
# Enable management plugin
|
||||
logging.debug('Enabling management_plugin charm config option...')
|
||||
config = {'management_plugin': 'True'}
|
||||
zaza.model.set_application_config('rabbitmq-server', config)
|
||||
rmq_utils.wait_for_cluster()
|
||||
|
||||
# Check tcp connect to management plugin port
|
||||
ret = self._retry_port_knock_units(units, mgmt_port)
|
||||
|
||||
self.assertIsNone(ret, msg=ret)
|
||||
logging.debug('Connect to all units (OK)')
|
||||
|
||||
# Disable management plugin
|
||||
logging.debug('Disabling management_plugin charm config option...')
|
||||
config = {'management_plugin': 'False'}
|
||||
zaza.model.set_application_config('rabbitmq-server', config)
|
||||
rmq_utils.wait_for_cluster()
|
||||
|
||||
# Negative check - tcp connect to management plugin port
|
||||
logging.info('Expect tcp connect fail since charm config '
|
||||
'option is disabled.')
|
||||
ret = self._retry_port_knock_units(units,
|
||||
mgmt_port,
|
||||
expect_success=False)
|
||||
|
||||
self.assertIsNone(ret, msg=ret)
|
||||
logging.info('Confirm mgmt port closed on all units (OK)')
|
||||
|
||||
@tenacity.retry(
|
||||
retry=tenacity.retry_if_result(lambda ret: ret is not None),
|
||||
# sleep for 2mins to allow 1min cron job to run...
|
||||
wait=tenacity.wait_fixed(120),
|
||||
stop=tenacity.stop_after_attempt(2))
|
||||
def _retry_check_commands_on_units(self, cmds, units):
|
||||
return generic_utils.check_commands_on_units(cmds, units)
|
||||
|
||||
def test_414_rmq_nrpe_monitors(self):
|
||||
"""Check rabbimq-server nrpe monitor basic functionality."""
|
||||
units = zaza.model.get_units(self.application_name)
|
||||
host_names = generic_utils.get_unit_hostnames(units)
|
||||
|
||||
# check_rabbitmq monitor
|
||||
logging.debug('Checking nrpe check_rabbitmq on units...')
|
||||
cmds = ['egrep -oh /usr/local.* /etc/nagios/nrpe.d/'
|
||||
'check_rabbitmq.cfg']
|
||||
ret = self._retry_check_commands_on_units(cmds, units)
|
||||
self.assertIsNone(ret, msg=ret)
|
||||
|
||||
# check_rabbitmq_queue monitor
|
||||
logging.debug('Checking nrpe check_rabbitmq_queue on units...')
|
||||
cmds = ['egrep -oh /usr/local.* /etc/nagios/nrpe.d/'
|
||||
'check_rabbitmq_queue.cfg']
|
||||
ret = self._retry_check_commands_on_units(cmds, units)
|
||||
self.assertIsNone(ret, msg=ret)
|
||||
|
||||
# check dat file existence
|
||||
logging.debug('Checking nrpe dat file existence on units...')
|
||||
for u in units:
|
||||
unit_host_name = host_names[u.entity_id]
|
||||
|
||||
cmds = [
|
||||
'stat /var/lib/rabbitmq/data/{}_general_stats.dat'.format(
|
||||
unit_host_name),
|
||||
'stat /var/lib/rabbitmq/data/{}_queue_stats.dat'.format(
|
||||
unit_host_name)
|
||||
]
|
||||
|
||||
ret = generic_utils.check_commands_on_units(cmds, [u])
|
||||
self.assertIsNone(ret, msg=ret)
|
||||
|
||||
logging.info('OK')
|
||||
|
||||
def test_910_pause_and_resume(self):
|
||||
"""The services can be paused and resumed."""
|
||||
logging.debug('Checking pause and resume actions...')
|
||||
|
||||
unit = zaza.model.get_units(self.application_name)[0]
|
||||
assert unit.workload_status == "active"
|
||||
|
||||
zaza.model.run_action(unit.entity_id, "pause")
|
||||
zaza.model.block_until_unit_wl_status(unit.entity_id, "maintenance")
|
||||
unit = zaza.model.get_unit_from_name(unit.entity_id)
|
||||
assert unit.workload_status == "maintenance"
|
||||
|
||||
zaza.model.run_action(unit.entity_id, "resume")
|
||||
zaza.model.block_until_unit_wl_status(unit.entity_id, "active")
|
||||
unit = zaza.model.get_unit_from_name(unit.entity_id)
|
||||
assert unit.workload_status == "active"
|
||||
|
||||
rmq_utils.wait_for_cluster()
|
||||
logging.debug('OK')
|
||||
|
||||
def test_911_cluster_status(self):
|
||||
"""Test rabbitmqctl cluster_status action can be returned."""
|
||||
logging.debug('Checking cluster status action...')
|
||||
|
||||
unit = zaza.model.get_units(self.application_name)[0]
|
||||
action = zaza.model.run_action(unit.entity_id, "cluster-status")
|
||||
self.assertIsInstance(action, juju.action.Action)
|
||||
|
||||
logging.debug('OK')
|
||||
|
||||
def test_912_check_queues(self):
|
||||
"""Test rabbitmqctl check_queues action can be returned."""
|
||||
logging.debug('Checking cluster status action...')
|
||||
|
||||
unit = zaza.model.get_units(self.application_name)[0]
|
||||
action = zaza.model.run_action(unit.entity_id, "check-queues")
|
||||
self.assertIsInstance(action, juju.action.Action)
|
||||
|
||||
def test_913_list_unconsumed_queues(self):
|
||||
"""Test rabbitmqctl list-unconsumed-queues action can be returned."""
|
||||
logging.debug('Checking list-unconsumed-queues action...')
|
||||
|
||||
unit = zaza.model.get_units(self.application_name)[0]
|
||||
self._test_rmq_amqp_messages_all_units([unit])
|
||||
action = zaza.model.run_action(unit.entity_id,
|
||||
'list-unconsumed-queues')
|
||||
self.assertIsInstance(action, juju.action.Action)
|
||||
|
||||
queue_count = int(action.results['unconsumed-queue-count'])
|
||||
assert queue_count > 0, 'Did not find any unconsumed queues.'
|
||||
|
||||
queue_name = 'test' # publish_amqp_message_by_unit default queue name
|
||||
for i in range(queue_count):
|
||||
queue_data = json.loads(
|
||||
action.results['unconsumed-queues'][str(i)])
|
||||
if queue_data['name'] == queue_name:
|
||||
break
|
||||
else:
|
||||
assert False, 'Did not find expected queue in result.'
|
||||
|
||||
# Since we just reused _test_rmq_amqp_messages_all_units, we should
|
||||
# have created the queue if it didn't already exist, but all messages
|
||||
# should have already been consumed.
|
||||
assert queue_data['messages'] == 0, 'Found unexpected message count.'
|
||||
|
||||
logging.debug('OK')
|
||||
|
||||
@tenacity.retry(
|
||||
retry=tenacity.retry_if_result(lambda errors: bool(errors)),
|
||||
wait=tenacity.wait_fixed(10),
|
||||
stop=tenacity.stop_after_attempt(2))
|
||||
def _retry_check_unit_cluster_nodes(self, u, unit_node_names):
|
||||
return rmq_utils.check_unit_cluster_nodes(u, unit_node_names)
|
||||
|
||||
def test_921_remove_unit(self):
|
||||
"""Test if unit cleans up when removed from Rmq cluster.
|
||||
|
||||
Test if a unit correctly cleans up by removing itself from the
|
||||
RabbitMQ cluster on removal
|
||||
|
||||
"""
|
||||
logging.debug('Checking that units correctly clean up after '
|
||||
'themselves on unit removal...')
|
||||
config = {'min-cluster-size': '2'}
|
||||
zaza.model.set_application_config('rabbitmq-server', config)
|
||||
rmq_utils.wait_for_cluster()
|
||||
|
||||
units = zaza.model.get_units(self.application_name)
|
||||
removed_unit = units[-1]
|
||||
left_units = units[:-1]
|
||||
|
||||
zaza.model.run_on_unit(removed_unit.entity_id, 'hooks/stop')
|
||||
zaza.model.block_until_unit_wl_status(removed_unit.entity_id,
|
||||
"waiting")
|
||||
|
||||
unit_host_names = generic_utils.get_unit_hostnames(left_units)
|
||||
unit_node_names = []
|
||||
for unit in unit_host_names:
|
||||
unit_node_names.append('rabbit@{}'.format(unit_host_names[unit]))
|
||||
errors = []
|
||||
|
||||
for u in left_units:
|
||||
e = self._retry_check_unit_cluster_nodes(u,
|
||||
unit_node_names)
|
||||
if e:
|
||||
errors.append(e)
|
||||
|
||||
self.assertFalse(errors, msg=errors)
|
||||
logging.debug('OK')
|
||||
476
zaza/openstack/charm_tests/rabbitmq_server/utils.py
Normal file
476
zaza/openstack/charm_tests/rabbitmq_server/utils.py
Normal file
@@ -0,0 +1,476 @@
|
||||
# Copyright 2019 Canonical Ltd.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""RabbitMQ Testing utility functions."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
import pika
|
||||
import tenacity
|
||||
import zaza.model
|
||||
|
||||
import ssl as libssl
|
||||
import zaza.openstack.utilities.generic as generic_utils
|
||||
|
||||
|
||||
class RmqNoMessageException(Exception):
|
||||
"""Message retrieval from Rmq resulted in no message."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
def _log_tenacity_retry(retry_state):
|
||||
logging.info('Attempt {}: {}'.format(retry_state.attempt_number,
|
||||
retry_state.outcome.result()))
|
||||
|
||||
|
||||
def wait_for_cluster(model_name=None, timeout=1200):
|
||||
"""Wait for Rmq cluster status to show cluster readiness.
|
||||
|
||||
Wait for rmq units extended status to show cluster readiness,
|
||||
after an optional initial sleep period. Initial sleep is likely
|
||||
necessary to be effective following a config change, as status
|
||||
message may not instantly update to non-ready.
|
||||
"""
|
||||
states = {
|
||||
'rabbitmq-server': {
|
||||
'workload-status-messages': 'Unit is ready and clustered'
|
||||
}
|
||||
}
|
||||
|
||||
zaza.model.wait_for_application_states(model_name=model_name,
|
||||
states=states,
|
||||
timeout=timeout)
|
||||
|
||||
|
||||
def add_user(units, username="testuser1", password="changeme"):
|
||||
"""Add a user to a RabbitMQ cluster.
|
||||
|
||||
Add a user via the first rmq juju unit, check connection as
|
||||
the new user against all units.
|
||||
:param units: list of unit pointers
|
||||
:param username: amqp user name, default to testuser1
|
||||
:param password: amqp user password
|
||||
:returns: None if successful. Raise on error.
|
||||
"""
|
||||
logging.debug('Adding rmq user ({})...'.format(username))
|
||||
|
||||
# Check that user does not already exist
|
||||
cmd_user_list = 'rabbitmqctl list_users'
|
||||
cmd_result = zaza.model.run_on_unit(units[0].entity_id, cmd_user_list)
|
||||
output = cmd_result['Stdout'].strip()
|
||||
if username in output:
|
||||
logging.warning('User ({}) already exists, returning '
|
||||
'gracefully.'.format(username))
|
||||
return
|
||||
|
||||
perms = '".*" ".*" ".*"'
|
||||
cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
|
||||
'rabbitmqctl set_permissions {} {}'.format(username, perms)]
|
||||
|
||||
# Add user via first unit
|
||||
for cmd in cmds:
|
||||
cmd_result = zaza.model.run_on_unit(units[0].entity_id, cmd)
|
||||
output = cmd_result['Stdout'].strip()
|
||||
|
||||
# Check connection against the other units
|
||||
logging.debug('Checking user connect against units...')
|
||||
for u in units:
|
||||
connection = connect_amqp_by_unit(u, ssl=False,
|
||||
username=username,
|
||||
password=password)
|
||||
connection.close()
|
||||
|
||||
|
||||
def delete_user(units, username="testuser1"):
|
||||
"""Delete a user from a RabbitMQ cluster.
|
||||
|
||||
Delete a rabbitmq user via the first rmq juju unit.
|
||||
:param units: list of unit pointers
|
||||
:param username: amqp user name, default to testuser1
|
||||
:param password: amqp user password
|
||||
:returns: None if successful or no such user.
|
||||
"""
|
||||
logging.debug('Deleting rmq user ({})...'.format(username))
|
||||
|
||||
# Check that the user exists
|
||||
cmd_user_list = 'rabbitmqctl list_users'
|
||||
output = zaza.model.run_on_unit(units[0].entity_id,
|
||||
cmd_user_list)['Stdout'].strip()
|
||||
|
||||
if username not in output:
|
||||
logging.warning('User ({}) does not exist, returning '
|
||||
'gracefully.'.format(username))
|
||||
return
|
||||
|
||||
# Delete the user
|
||||
cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
|
||||
output = zaza.model.run_on_unit(units[0].entity_id, cmd_user_del)
|
||||
|
||||
|
||||
def get_cluster_status(unit):
|
||||
"""Get RabbitMQ cluster status output.
|
||||
|
||||
Execute rabbitmq cluster status command on a unit and return
|
||||
the full output.
|
||||
:param unit: unit
|
||||
:returns: String containing console output of cluster status command
|
||||
"""
|
||||
cmd = 'rabbitmqctl cluster_status'
|
||||
output = zaza.model.run_on_unit(unit.entity_id, cmd)['Stdout'].strip()
|
||||
logging.debug('{} cluster_status:\n{}'.format(
|
||||
unit.entity_id, output))
|
||||
return str(output)
|
||||
|
||||
|
||||
def get_cluster_running_nodes(unit):
|
||||
"""Get a list of RabbitMQ cluster's running nodes.
|
||||
|
||||
Parse rabbitmqctl cluster_status output string, return list of
|
||||
running rabbitmq cluster nodes.
|
||||
:param unit: unit pointer
|
||||
:returns: List containing node names of running nodes
|
||||
"""
|
||||
# NOTE(beisner): rabbitmqctl cluster_status output is not
|
||||
# json-parsable, do string chop foo, then json.loads that.
|
||||
str_stat = get_cluster_status(unit)
|
||||
if 'running_nodes' in str_stat:
|
||||
pos_start = str_stat.find("{running_nodes,") + 15
|
||||
pos_end = str_stat.find("]},", pos_start) + 1
|
||||
str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
|
||||
run_nodes = json.loads(str_run_nodes)
|
||||
return run_nodes
|
||||
else:
|
||||
return []
|
||||
|
||||
|
||||
def validate_cluster_running_nodes(units):
|
||||
"""Check all rmq unit hostnames are represented in cluster_status.
|
||||
|
||||
Check that all rmq unit hostnames are represented in the
|
||||
cluster_status output of all units.
|
||||
:param host_names: dict of juju unit names to host names
|
||||
:param units: list of unit pointers (all rmq units)
|
||||
:returns: None if successful, otherwise return error message
|
||||
"""
|
||||
host_names = generic_utils.get_unit_hostnames(units)
|
||||
errors = []
|
||||
|
||||
# Query every unit for cluster_status running nodes
|
||||
for query_unit in units:
|
||||
query_unit_name = query_unit.entity_id
|
||||
running_nodes = get_cluster_running_nodes(query_unit)
|
||||
|
||||
# Confirm that every unit is represented in the queried unit's
|
||||
# cluster_status running nodes output.
|
||||
for validate_unit in units:
|
||||
val_host_name = host_names[validate_unit.entity_id]
|
||||
val_node_name = 'rabbit@{}'.format(val_host_name)
|
||||
|
||||
if val_node_name not in running_nodes:
|
||||
errors.append('Cluster member check failed on {}: {} not '
|
||||
'in {}\n'.format(query_unit_name,
|
||||
val_node_name,
|
||||
running_nodes))
|
||||
if errors:
|
||||
return ''.join(errors)
|
||||
|
||||
|
||||
def validate_ssl_enabled_units(units, port=None):
|
||||
"""Check that ssl is enabled on rmq juju units.
|
||||
|
||||
:param units: list of all rmq units
|
||||
:param port: optional ssl port override to validate
|
||||
:returns: None if successful, otherwise return error message
|
||||
"""
|
||||
for u in units:
|
||||
if not is_ssl_enabled_on_unit(u, port=port):
|
||||
return ('Unexpected condition: ssl is disabled on unit '
|
||||
'({})'.format(u.info['unit_name']))
|
||||
return None
|
||||
|
||||
|
||||
def validate_ssl_disabled_units(units):
|
||||
"""Check that ssl is enabled on listed rmq juju units.
|
||||
|
||||
:param units: list of all rmq units
|
||||
:returns: True if successful. Raise on error.
|
||||
"""
|
||||
for u in units:
|
||||
if is_ssl_enabled_on_unit(u):
|
||||
return ('Unexpected condition: ssl is enabled on unit '
|
||||
'({})'.format(u.entity_id))
|
||||
return None
|
||||
|
||||
|
||||
@tenacity.retry(
|
||||
retry=tenacity.retry_if_result(lambda errors: bool(errors)),
|
||||
wait=tenacity.wait_fixed(4),
|
||||
stop=tenacity.stop_after_attempt(15),
|
||||
after=_log_tenacity_retry)
|
||||
def _retry_validate_ssl_enabled_units(units, port=None):
|
||||
return validate_ssl_enabled_units(units, port=port)
|
||||
|
||||
|
||||
def configure_ssl_on(units, model_name=None, port=None):
|
||||
"""Turn RabbitMQ charm SSL config option on.
|
||||
|
||||
Turn ssl charm config option on, with optional non-default
|
||||
ssl port specification. Confirm that it is enabled on every
|
||||
unit.
|
||||
:param units: list of units
|
||||
:param port: amqp port, use defaults if None
|
||||
:returns: None if successful. Raise on error.
|
||||
"""
|
||||
logging.debug('Setting ssl charm config option: on')
|
||||
|
||||
# Enable RMQ SSL
|
||||
config = {'ssl': 'on'}
|
||||
if port:
|
||||
config['ssl_port'] = str(port)
|
||||
|
||||
zaza.model.set_application_config('rabbitmq-server',
|
||||
config,
|
||||
model_name=model_name)
|
||||
|
||||
# Wait for unit status
|
||||
wait_for_cluster(model_name)
|
||||
|
||||
ret = _retry_validate_ssl_enabled_units(units, port=port)
|
||||
if ret:
|
||||
raise Exception(ret)
|
||||
|
||||
|
||||
@tenacity.retry(
|
||||
retry=tenacity.retry_if_result(lambda errors: bool(errors)),
|
||||
wait=tenacity.wait_fixed(4),
|
||||
stop=tenacity.stop_after_attempt(15),
|
||||
after=_log_tenacity_retry)
|
||||
def _retry_validate_ssl_disabled_units(units):
|
||||
return validate_ssl_disabled_units(units)
|
||||
|
||||
|
||||
def configure_ssl_off(units, model_name=None, max_wait=60):
|
||||
"""Turn RabbitMQ charm SSL config option off.
|
||||
|
||||
Turn ssl charm config option off, confirm that it is disabled
|
||||
on every unit.
|
||||
:param units: list of units
|
||||
:param max_wait: maximum time to wait in seconds to confirm
|
||||
:returns: None if successful. Raise on error.
|
||||
"""
|
||||
logging.debug('Setting ssl charm config option: off')
|
||||
|
||||
# Disable RMQ SSL
|
||||
config = {'ssl': 'off'}
|
||||
zaza.model.set_application_config('rabbitmq-server',
|
||||
config,
|
||||
model_name=model_name)
|
||||
|
||||
# Wait for unit status
|
||||
wait_for_cluster(model_name)
|
||||
|
||||
ret = _retry_validate_ssl_disabled_units(units)
|
||||
|
||||
if ret:
|
||||
raise Exception(ret)
|
||||
|
||||
|
||||
def is_ssl_enabled_on_unit(unit, port=None):
|
||||
"""Check a single juju rmq unit for ssl and port in the config file."""
|
||||
host = unit.public_address
|
||||
unit_name = unit.entity_id
|
||||
|
||||
conf_file = '/etc/rabbitmq/rabbitmq.config'
|
||||
conf_contents = str(generic_utils.get_file_contents(unit,
|
||||
conf_file))
|
||||
# Checks
|
||||
conf_ssl = 'ssl' in conf_contents
|
||||
conf_port = str(port) in conf_contents
|
||||
|
||||
# Port explicitly checked in config
|
||||
if port and conf_port and conf_ssl:
|
||||
logging.debug('SSL is enabled @{}:{} '
|
||||
'({})'.format(host, port, unit_name))
|
||||
return True
|
||||
elif port and not conf_port and conf_ssl:
|
||||
logging.debug('SSL is enabled @{} but not on port {} '
|
||||
'({})'.format(host, port, unit_name))
|
||||
return False
|
||||
# Port not checked (useful when checking that ssl is disabled)
|
||||
elif not port and conf_ssl:
|
||||
logging.debug('SSL is enabled @{}:{} '
|
||||
'({})'.format(host, port, unit_name))
|
||||
return True
|
||||
elif not conf_ssl:
|
||||
logging.debug('SSL not enabled @{}:{} '
|
||||
'({})'.format(host, port, unit_name))
|
||||
return False
|
||||
else:
|
||||
msg = ('Unknown condition when checking SSL status @{}:{} '
|
||||
'({})'.format(host, port, unit_name))
|
||||
raise ValueError(msg)
|
||||
|
||||
|
||||
def connect_amqp_by_unit(unit, ssl=False,
|
||||
port=None, fatal=True,
|
||||
username="testuser1", password="changeme"):
|
||||
"""Establish and return a pika amqp connection to the rabbitmq service.
|
||||
|
||||
Establish and return a pika amqp connection to the rabbitmq service
|
||||
running on a rmq juju unit.
|
||||
:param unit: unit pointer
|
||||
:param ssl: boolean, default to False
|
||||
:param port: amqp port, use defaults if None
|
||||
:param fatal: boolean, default to True (raises on connect error)
|
||||
:param username: amqp user name, default to testuser1
|
||||
:param password: amqp user password
|
||||
:returns: pika amqp connection pointer or None if failed and non-fatal
|
||||
"""
|
||||
host = unit.public_address
|
||||
unit_name = unit.entity_id
|
||||
|
||||
if ssl:
|
||||
# TODO: when Python3.5 support is removed, investigate
|
||||
# changing protocol to PROTOCOL_TLS
|
||||
context = libssl.SSLContext(protocol=libssl.PROTOCOL_TLSv1_2)
|
||||
ssl_options = pika.SSLOptions(context)
|
||||
else:
|
||||
ssl_options = None
|
||||
|
||||
# Default port logic if port is not specified
|
||||
if ssl and not port:
|
||||
port = 5671
|
||||
elif not ssl and not port:
|
||||
port = 5672
|
||||
|
||||
logging.debug('Connecting to amqp on {}:{} ({}) as '
|
||||
'{}...'.format(host, port, unit_name, username))
|
||||
|
||||
try:
|
||||
credentials = pika.PlainCredentials(username, password)
|
||||
parameters = pika.ConnectionParameters(host=host, port=port,
|
||||
credentials=credentials,
|
||||
ssl_options=ssl_options,
|
||||
connection_attempts=3,
|
||||
retry_delay=5,
|
||||
socket_timeout=1)
|
||||
connection = pika.BlockingConnection(parameters)
|
||||
assert connection.is_open is True
|
||||
logging.debug('Connect OK')
|
||||
return connection
|
||||
except Exception as e:
|
||||
msg = ('amqp connection failed to {}:{} as '
|
||||
'{} ({})'.format(host, port, username, str(e)))
|
||||
if fatal:
|
||||
raise Exception(msg)
|
||||
else:
|
||||
logging.warn(msg)
|
||||
return None
|
||||
|
||||
|
||||
def publish_amqp_message_by_unit(unit, message,
|
||||
queue="test", ssl=False,
|
||||
username="testuser1",
|
||||
password="changeme",
|
||||
port=None):
|
||||
"""Publish an amqp message to a rmq juju unit.
|
||||
|
||||
:param unit: unit pointer
|
||||
:param message: amqp message string
|
||||
:param queue: message queue, default to test
|
||||
:param username: amqp user name, default to testuser1
|
||||
:param password: amqp user password
|
||||
:param ssl: boolean, default to False
|
||||
:param port: amqp port, use defaults if None
|
||||
:returns: None. Raises exception if publish failed.
|
||||
"""
|
||||
logging.debug('Publishing message to {} queue:\n{}'.format(queue,
|
||||
message))
|
||||
connection = connect_amqp_by_unit(unit, ssl=ssl,
|
||||
port=port,
|
||||
username=username,
|
||||
password=password)
|
||||
|
||||
# NOTE(beisner): extra debug here re: pika hang potential:
|
||||
# https://github.com/pika/pika/issues/297
|
||||
# https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
|
||||
logging.debug('Defining channel...')
|
||||
channel = connection.channel()
|
||||
logging.debug('Declaring queue...')
|
||||
channel.queue_declare(queue=queue, auto_delete=False, durable=True)
|
||||
logging.debug('Publishing message...')
|
||||
channel.basic_publish(exchange='', routing_key=queue, body=message)
|
||||
logging.debug('Closing channel...')
|
||||
channel.close()
|
||||
logging.debug('Closing connection...')
|
||||
connection.close()
|
||||
|
||||
|
||||
def get_amqp_message_by_unit(unit, queue="test",
|
||||
username="testuser1",
|
||||
password="changeme",
|
||||
ssl=False, port=None):
|
||||
"""Get an amqp message from a rmq juju unit.
|
||||
|
||||
:param unit: unit pointer
|
||||
:param queue: message queue, default to test
|
||||
:param username: amqp user name, default to testuser1
|
||||
:param password: amqp user password
|
||||
:param ssl: boolean, default to False
|
||||
:param port: amqp port, use defaults if None
|
||||
:returns: amqp message body as string. Raise if get fails.
|
||||
"""
|
||||
connection = connect_amqp_by_unit(unit, ssl=ssl,
|
||||
port=port,
|
||||
username=username,
|
||||
password=password)
|
||||
channel = connection.channel()
|
||||
method_frame, _, body = channel.basic_get(queue)
|
||||
body = body.decode()
|
||||
|
||||
if method_frame:
|
||||
logging.debug('Retreived message from {} queue:\n{}'.format(queue,
|
||||
body))
|
||||
channel.basic_ack(method_frame.delivery_tag)
|
||||
channel.close()
|
||||
connection.close()
|
||||
return body
|
||||
else:
|
||||
msg = 'No message retrieved.'
|
||||
raise RmqNoMessageException(msg)
|
||||
|
||||
|
||||
def check_unit_cluster_nodes(unit, unit_node_names):
|
||||
"""Check if unit exists in list of Rmq cluster node names."""
|
||||
unit_name = unit.entity_id
|
||||
nodes = []
|
||||
errors = []
|
||||
str_stat = get_cluster_status(unit)
|
||||
# make the interesting part of rabbitmqctl cluster_status output
|
||||
# json-parseable.
|
||||
if 'nodes,[{disc,' in str_stat:
|
||||
pos_start = str_stat.find('nodes,[{disc,') + 13
|
||||
pos_end = str_stat.find(']}]},', pos_start) + 1
|
||||
str_nodes = str_stat[pos_start:pos_end].replace("'", '"')
|
||||
nodes = json.loads(str_nodes)
|
||||
for node in nodes:
|
||||
if node not in unit_node_names:
|
||||
errors.append('Cluster registration check failed on {}: '
|
||||
'{} should not be registered with RabbitMQ '
|
||||
'after unit removal.\n'
|
||||
''.format(unit_name, node))
|
||||
|
||||
return errors
|
||||
@@ -75,6 +75,16 @@ def get_network_config(net_topology, ignore_env_vars=False,
|
||||
return net_info
|
||||
|
||||
|
||||
def get_unit_hostnames(units):
|
||||
"""Return a dict of juju unit names to hostnames."""
|
||||
host_names = {}
|
||||
for unit in units:
|
||||
output = model.run_on_unit(unit.entity_id, 'hostname')
|
||||
hostname = output['Stdout'].strip()
|
||||
host_names[unit.entity_id] = hostname
|
||||
return host_names
|
||||
|
||||
|
||||
def get_pkg_version(application, pkg):
|
||||
"""Return package version.
|
||||
|
||||
@@ -526,6 +536,31 @@ def dist_upgrade(unit_name):
|
||||
model.run_on_unit(unit_name, dist_upgrade_cmd)
|
||||
|
||||
|
||||
def check_commands_on_units(commands, units):
|
||||
"""Check that all commands in a list exit zero on all units in a list.
|
||||
|
||||
:param commands: list of bash commands
|
||||
:param units: list of unit pointers
|
||||
:returns: None if successful; Failure message otherwise
|
||||
"""
|
||||
logging.debug('Checking exit codes for {} commands on {} '
|
||||
'units...'.format(len(commands),
|
||||
len(units)))
|
||||
|
||||
for u in units:
|
||||
for cmd in commands:
|
||||
output = model.run_on_unit(u.entity_id, cmd)
|
||||
if int(output['Code']) == 0:
|
||||
logging.debug('{} `{}` returned {} '
|
||||
'(OK)'.format(u.entity_id,
|
||||
cmd, output['Code']))
|
||||
else:
|
||||
return ('{} `{}` returned {} '
|
||||
'{}'.format(u.entity_id,
|
||||
cmd, output['Code'], output))
|
||||
return None
|
||||
|
||||
|
||||
def do_release_upgrade(unit_name):
|
||||
"""Run do-release-upgrade noninteractive.
|
||||
|
||||
@@ -729,6 +764,12 @@ def get_ubuntu_release(ubuntu_name):
|
||||
return index
|
||||
|
||||
|
||||
def get_file_contents(unit, f):
|
||||
"""Get contents of a file on a remote unit."""
|
||||
return model.run_on_unit(unit.entity_id,
|
||||
"cat {}".format(f))['Stdout']
|
||||
|
||||
|
||||
def is_port_open(port, address):
|
||||
"""Determine if TCP port is accessible.
|
||||
|
||||
@@ -752,3 +793,29 @@ def is_port_open(port, address):
|
||||
logging.error("connection refused connecting"
|
||||
" to {}:{}".format(address, port))
|
||||
return False
|
||||
|
||||
|
||||
def port_knock_units(units, port=22, expect_success=True):
|
||||
"""Check if specific port is open on units.
|
||||
|
||||
Open a TCP socket to check for a listening sevice on each listed juju unit.
|
||||
:param units: list of unit pointers
|
||||
:param port: TCP port number, default to 22
|
||||
:param timeout: Connect timeout, default to 15 seconds
|
||||
:expect_success: True by default, set False to invert logic
|
||||
:returns: None if successful, Failure message otherwise
|
||||
"""
|
||||
for u in units:
|
||||
host = u.public_address
|
||||
connected = is_port_open(port, host)
|
||||
if not connected and expect_success:
|
||||
return 'Socket connect failed.'
|
||||
elif connected and not expect_success:
|
||||
return 'Socket connected unexpectedly.'
|
||||
|
||||
|
||||
def get_series(unit):
|
||||
"""Ubuntu release name running on unit."""
|
||||
result = model.run_on_unit(unit.entity_id,
|
||||
"lsb_release -cs")
|
||||
return result['Stdout'].strip()
|
||||
|
||||
Reference in New Issue
Block a user