Files
zaza-openstack-tests/zaza/openstack/charm_tests/test_utils.py
Liam Young cf2b0f21b9 Merge pull request #222 from openstack-charmers/issue/221
octavia: tear down resources created during LBaasV2 test
2020-04-08 07:34:41 +01:00

396 lines
16 KiB
Python

# Copyright 2018 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module containing base class for implementing charm tests."""
import contextlib
import functools
import logging
import subprocess
import unittest

import zaza.model as model
import zaza.charm_lifecycle.utils as lifecycle_utils
import zaza.openstack.utilities.openstack as openstack_utils
import zaza.openstack.utilities.generic as generic_utils
def skipIfNotHA(service_name):
    """Run decorator to skip tests if application not in HA configuration.

    The application is treated as HA when ``model.get_app_ips`` returns more
    than one address for it. Otherwise the decorated test is not called: a
    warning is logged and the wrapper returns None.

    :param service_name: Name of the application whose addresses are counted
    :type service_name: str
    :returns: Decorator to apply to a test function
    :rtype: Callable
    """
    def _skipIfNotHA_inner_1(f):
        # Keep the wrapped test's name and docstring visible on the wrapper.
        @functools.wraps(f)
        def _skipIfNotHA_inner_2(*args, **kwargs):
            ips = model.get_app_ips(service_name)
            if len(ips) > 1:
                return f(*args, **kwargs)
            # logging.warn is a deprecated alias of logging.warning.
            logging.warning(
                "Skipping HA test for non-ha service {}".format(service_name))
            return None
        return _skipIfNotHA_inner_2
    return _skipIfNotHA_inner_1
def skipUntilVersion(service, package, release):
    """Run decorator to skip this test if application version is too low.

    The installed version of ``package`` is obtained through
    ``generic_utils.get_pkg_version(service, package)`` and compared against
    ``release`` with ``dpkg --compare-versions <version> ge <release>``. When
    the comparison fails the decorated test is not called: a warning is
    logged and the wrapper returns None.

    :param service: Name of the service passed to get_pkg_version
    :type service: str
    :param package: Name of the package whose version is checked
    :type package: str
    :param release: Minimum package version required to run the test
    :type release: str
    :returns: Decorator to apply to a test function
    :rtype: Callable
    """
    def _skipUntilVersion_inner_1(f):
        # Keep the wrapped test's name and docstring visible on the wrapper.
        @functools.wraps(f)
        def _skipUntilVersion_inner_2(*args, **kwargs):
            package_version = generic_utils.get_pkg_version(service, package)
            # Only the version comparison is guarded. A CalledProcessError
            # raised by the test itself must propagate instead of being
            # misreported as a skipped test.
            try:
                subprocess.check_call(['dpkg', '--compare-versions',
                                       package_version, 'ge', release],
                                      stderr=subprocess.STDOUT,
                                      universal_newlines=True)
            except subprocess.CalledProcessError:
                logging.warning(
                    "Skipping test for older ({}) "
                    "service {}, requested {}".format(
                        package_version, service, release))
                return None
            return f(*args, **kwargs)
        return _skipUntilVersion_inner_2
    return _skipUntilVersion_inner_1
def audit_assertions(action,
                     expected_passes,
                     expected_failures=None,
                     expected_to_pass=True):
    """Check expected assertion failures in security-checklist actions.

    The checks raise AssertionError explicitly rather than using ``assert``
    statements, so they are still enforced when Python runs with ``-O``.

    :param action: Action object from running the security-checklist action
    :type action: juju.action.Action
    :param expected_passes: List of test names that are expected to pass
    :type expected_passes: List[str]
    :param expected_failures: List of test names that are expected to fail
    :type expected_failures: List[str]
    :param expected_to_pass: Whether the action status is expected to be
                             "completed" (True) or "failed" (False)
    :type expected_to_pass: bool
    :raises: AssertionError if the assertion fails.
    """
    if expected_failures is None:
        expected_failures = []
    status = action.data["status"]
    if expected_to_pass:
        if status != "completed":
            raise AssertionError(
                "Security check is expected to pass by default")
    elif status != "failed":
        raise AssertionError(
            "Security check is not expected to pass by default")
    results = action.data['results']
    for key, value in results.items():
        # An expected failure only needs to contain "FAIL"; an expected pass
        # must be exactly "PASS".
        if key in expected_failures and "FAIL" not in value:
            raise AssertionError("Unexpected test pass: {}".format(key))
        if key in expected_passes and value != "PASS":
            raise AssertionError("Unexpected failure: {}".format(key))
class BaseCharmTest(unittest.TestCase):
    """Generic helpers for testing charms."""

    # When truthy on the class or on a test instance, tearDown calls
    # resource_cleanup.
    run_resource_cleanup = False

    @classmethod
    def resource_cleanup(cls):
        """Cleanup any resources created during the test run.

        Override this method with a method which removes any resources
        which were created during the test run. If the test sets
        "self.run_resource_cleanup = True" then cleanup will be
        performed.
        """
        pass

    def tearDown(self):
        """Run teardown for a test.

        This is an instance method so that a test which sets
        "self.run_resource_cleanup = True" is honoured; the class level
        attribute is still used when the instance does not override it.
        """
        if self.run_resource_cleanup:
            logging.info('Running resource cleanup')
            self.resource_cleanup()

    @classmethod
    def setUpClass(cls, application_name=None, model_alias=None):
        """Run setup for test class to create common resources.

        Sets model_aliases, model_name, test_config, application_name and
        lead_unit on the class.

        :param application_name: Name of the application under test. When
                                 not given, 'charm_name' from the test
                                 config is used.
        :type application_name: Optional[str]
        :param model_alias: Alias looked up in the juju model aliases to
                            select the model. When not given, the model
                            returned by model.get_juju_model() is used.
        :type model_alias: Optional[str]
        """
        cls.model_aliases = model.get_juju_model_aliases()
        if model_alias:
            cls.model_name = cls.model_aliases[model_alias]
        else:
            cls.model_name = model.get_juju_model()
        cls.test_config = lifecycle_utils.get_charm_config(fatal=False)
        if application_name:
            cls.application_name = application_name
        else:
            cls.application_name = cls.test_config['charm_name']
        cls.lead_unit = model.get_lead_unit_name(
            cls.application_name,
            model_name=cls.model_name)
        logging.debug('Leader unit is {}'.format(cls.lead_unit))

    def config_current(self, application_name=None, keys=None):
        """Get Current Config of an application normalized into key-values.

        :param application_name: String application name for use when called
                                 by a charm under test other than the object's
                                 application.
        :type application_name: Optional[str]
        :param keys: iterable of strs to index into the current config. If
                     None, return all keys from the config
        :type keys: Optional[Iterable[str]]
        :return: Dictionary of requested config from application
        :rtype: Dict[str, Any]
        """
        if not application_name:
            application_name = self.application_name

        # Read from the same model that config_change writes to.
        _app_config = model.get_application_config(
            application_name,
            model_name=self.model_name)

        keys = keys or _app_config.keys()
        # Keys missing from the application config map to None.
        return {
            k: _app_config.get(k, {}).get('value')
            for k in keys
        }

    @staticmethod
    def _stringed_value_config(config):
        """Stringify values in a dict.

        Workaround:
            libjuju refuses to accept data with types other than strings
            through the zaza.model.set_application_config

        :param config: Config dictionary with any typed values
        :type config: Dict[str,Any]
        :return: Config Dictionary with string-ly typed values
        :rtype: Dict[str,str]
        """
        # if v is None, stringify to ''
        # otherwise use a strict cast with str(...)
        return {
            k: '' if v is None else str(v)
            for k, v in config.items()
        }

    @contextlib.contextmanager
    def config_change(self, default_config, alternate_config,
                      application_name=None):
        """Run change config tests.

        Change config to `alternate_config`, wait for idle workload status,
        yield, return config to `default_config` and wait for idle workload
        status before return from function.

        Example usage:
            with self.config_change({'preferred-api-version': '2'},
                                    {'preferred-api-version': '3'}):
                do_something()

        :param default_config: Dict of charm settings to set on completion
        :type default_config: dict
        :param alternate_config: Dict of charm settings to change to
        :type alternate_config: dict
        :param application_name: String application name for use when called
                                 by a charm under test other than the object's
                                 application.
        :type application_name: str
        """
        if not application_name:
            application_name = self.application_name

        # we need to compare config values to what is already applied before
        # attempting to set them.  otherwise the model will behave differently
        # than we would expect while waiting for completion of the change
        app_config = self.config_current(
            application_name, keys=alternate_config.keys()
        )

        if all(item in app_config.items()
                for item in alternate_config.items()):
            logging.debug('alternate_config equals what is already applied '
                          'config')
            yield
            if default_config == alternate_config:
                logging.debug('default_config also equals what is already '
                              'applied config')
                return
            logging.debug('alternate_config already set, and default_config '
                          'needs to be applied before return')
        else:
            logging.debug('Changing charm setting to {}'
                          .format(alternate_config))
            model.set_application_config(
                application_name,
                self._stringed_value_config(alternate_config),
                model_name=self.model_name)

            logging.debug(
                'Waiting for units to execute config-changed hook')
            model.wait_for_agent_status(model_name=self.model_name)

            logging.debug(
                'Waiting for units to reach target states')
            model.wait_for_application_states(
                model_name=self.model_name,
                states=self.test_config.get('target_deploy_status', {}))
            # TODO: Optimize with a block on a specific application until idle.
            model.block_until_all_units_idle(model_name=self.model_name)

            yield

        logging.debug('Restoring charm setting to {}'.format(default_config))
        model.set_application_config(
            application_name,
            self._stringed_value_config(default_config),
            model_name=self.model_name)

        logging.debug(
            'Waiting for units to reach target states')
        model.wait_for_application_states(
            model_name=self.model_name,
            states=self.test_config.get('target_deploy_status', {}))
        # TODO: Optimize with a block on a specific application until idle.
        model.block_until_all_units_idle(model_name=self.model_name)

    def restart_on_changed(self, config_file, default_config, alternate_config,
                           default_entry, alternate_entry, services,
                           pgrep_full=False):
        """Run restart on change tests.

        Test that changing config results in config file being updated and
        services restarted. Return config to default_config afterwards

        :param config_file: Config file to check for settings
        :type config_file: str
        :param default_config: Dict of charm settings to set on completion
        :type default_config: dict
        :param alternate_config: Dict of charm settings to change to
        :type alternate_config: dict
        :param default_entry: Config file entries that correspond to
                              default_config
        :type default_entry: dict
        :param alternate_entry: Config file entries that correspond to
                                alternate_config
        :type alternate_entry: dict
        :param services: Services expected to be restarted when config_file is
                         changed.
        :type services: list
        :param pgrep_full: Should pgrep be used rather than pidof to identify
                           a service.
        :type pgrep_full: bool
        """
        # lead_unit is only used to grab a timestamp, the assumption being
        # that all the units times are in sync.
        mtime = model.get_unit_time(
            self.lead_unit,
            model_name=self.model_name)
        logging.debug('Remote unit timestamp {}'.format(mtime))

        with self.config_change(default_config, alternate_config):
            # If this is not an OSLO config file set alternate_entry={}
            if alternate_entry:
                logging.debug(
                    'Waiting for updates to propagate to {}'
                    .format(config_file))
                model.block_until_oslo_config_entries_match(
                    self.application_name,
                    config_file,
                    alternate_entry,
                    model_name=self.model_name)
            else:
                model.block_until_all_units_idle(model_name=self.model_name)

            # Config update has occurred and hooks are idle. Any services
            # should have been restarted by now:
            logging.debug(
                'Waiting for services ({}) to be restarted'.format(services))
            model.block_until_services_restarted(
                self.application_name,
                mtime,
                services,
                model_name=self.model_name,
                pgrep_full=pgrep_full)

        # If this is not an OSLO config file set default_entry={}
        if default_entry:
            logging.debug(
                'Waiting for updates to propagate to {}'.format(config_file))
            model.block_until_oslo_config_entries_match(
                self.application_name,
                config_file,
                default_entry,
                model_name=self.model_name)
        else:
            model.block_until_all_units_idle(model_name=self.model_name)

    @contextlib.contextmanager
    def pause_resume(self, services, pgrep_full=False):
        """Run Pause and resume tests.

        Pause and then resume a unit checking that services are in the
        required state after each action

        :param services: Services expected to be restarted when the unit is
                         paused/resumed.
        :type services: list
        :param pgrep_full: Should pgrep be used rather than pidof to identify
                           a service.
        :type pgrep_full: bool
        """
        # Confirm the starting point: services running, workload active.
        model.block_until_service_status(
            self.lead_unit,
            services,
            'running',
            model_name=self.model_name,
            pgrep_full=pgrep_full)
        model.block_until_unit_wl_status(
            self.lead_unit,
            'active',
            model_name=self.model_name)
        generic_utils.assertActionRanOK(model.run_action(
            self.lead_unit,
            'pause',
            model_name=self.model_name))
        model.block_until_unit_wl_status(
            self.lead_unit,
            'maintenance',
            model_name=self.model_name)
        model.block_until_all_units_idle(model_name=self.model_name)
        model.block_until_service_status(
            self.lead_unit,
            services,
            'stopped',
            model_name=self.model_name,
            pgrep_full=pgrep_full)
        # The caller's "with" body runs here, while the unit is paused.
        yield
        generic_utils.assertActionRanOK(model.run_action(
            self.lead_unit,
            'resume',
            model_name=self.model_name))
        model.block_until_unit_wl_status(
            self.lead_unit,
            'active',
            model_name=self.model_name)
        model.block_until_all_units_idle(model_name=self.model_name)
        model.block_until_service_status(
            self.lead_unit,
            services,
            'running',
            model_name=self.model_name,
            pgrep_full=pgrep_full)
class OpenStackBaseTest(BaseCharmTest):
    """Generic helpers for testing OpenStack API charms."""

    @classmethod
    def setUpClass(cls, application_name=None, model_alias=None):
        """Run setup for test class to create common resources.

        In addition to the parent class setup, sets keystone_session and
        cacert on the class.

        :param application_name: Application name forwarded to the parent
                                 class setup
        :type application_name: Optional[str]
        :param model_alias: Model alias forwarded to the parent class setup
        :type model_alias: Optional[str]
        """
        # Forward both arguments; calling the parent without them silently
        # ignored the caller's application and model selection.
        super(OpenStackBaseTest, cls).setUpClass(
            application_name=application_name,
            model_alias=model_alias)
        cls.keystone_session = openstack_utils.get_overcloud_keystone_session(
            model_name=cls.model_name)
        cls.cacert = openstack_utils.get_cacert()