Merge pull request #365 from openstack-charmers/ovn-charm

Add OVS to OVN migration tests
This commit is contained in:
David Ames
2020-09-11 08:48:20 -07:00
committed by GitHub
7 changed files with 677 additions and 33 deletions
+20
View File
@@ -18,6 +18,26 @@ import zaza.openstack.charm_tests.test_utils as test_utils
from unittest.mock import patch
class TestBaseCharmTest(unittest.TestCase):
def test_get_my_tests_options(self):
class FakeTest(test_utils.BaseCharmTest):
def method(self, test_config):
self.test_config = test_config
return self.get_my_tests_options('aKey', 'aDefault')
f = FakeTest()
self.assertEquals(f.method({}), 'aDefault')
self.assertEquals(f.method({
'tests_options': {
'unit_tests.charm_tests.test_utils.'
'FakeTest.method.aKey': 'aValue',
},
}), 'aValue')
class TestOpenStackBaseTest(unittest.TestCase):
@patch.object(test_utils.openstack_utils, 'get_cacert')
@@ -1227,6 +1227,13 @@ class TestOpenStackUtils(ut_utils.BaseTestCase):
self.get_application.side_effect = [KeyError, KeyError]
self.assertFalse(openstack_utils.ovn_present())
def test_ngw_present(self):
self.patch_object(openstack_utils.model, 'get_application')
self.get_application.side_effect = None
self.assertTrue(openstack_utils.ngw_present())
self.get_application.side_effect = KeyError
self.assertFalse(openstack_utils.ngw_present())
def test_configure_gateway_ext_port(self):
# FIXME: this is not a complete unit test for the function as one did
# not exist at all I'm adding this to test one bit and we'll add more
@@ -1234,10 +1241,12 @@ class TestOpenStackUtils(ut_utils.BaseTestCase):
self.patch_object(openstack_utils, 'deprecated_external_networking')
self.patch_object(openstack_utils, 'dvr_enabled')
self.patch_object(openstack_utils, 'ovn_present')
self.patch_object(openstack_utils, 'ngw_present')
self.patch_object(openstack_utils, 'get_gateway_uuids')
self.patch_object(openstack_utils, 'get_admin_net')
self.dvr_enabled = False
self.ovn_present = False
self.dvr_enabled.return_value = False
self.ovn_present.return_value = False
self.ngw_present.return_value = True
self.get_admin_net.return_value = {'id': 'fakeid'}
novaclient = mock.MagicMock()
+127 -21
View File
@@ -24,6 +24,8 @@ import copy
import logging
import tenacity
from neutronclient.common import exceptions as neutronexceptions
import zaza
import zaza.openstack.charm_tests.nova.utils as nova_utils
import zaza.openstack.charm_tests.test_utils as test_utils
@@ -635,7 +637,8 @@ class NeutronNetworkingBase(test_utils.OpenStackBaseTest):
def validate_instance_can_reach_other(self,
instance_1,
instance_2,
verify):
verify,
mtu=None):
"""
Validate that an instance can reach a fixed and floating of another.
@@ -644,6 +647,12 @@ class NeutronNetworkingBase(test_utils.OpenStackBaseTest):
:param instance_2: The instance to check networking from
:type instance_2: nova_client.Server
:param verify: callback to verify result
:type verify: callable
:param mtu: Check that we can send non-fragmented packets of given size
:type mtu: Optional[int]
"""
floating_1 = floating_ips_from_instance(instance_1)[0]
floating_2 = floating_ips_from_instance(instance_2)[0]
@@ -653,19 +662,30 @@ class NeutronNetworkingBase(test_utils.OpenStackBaseTest):
password = guest.boot_tests['bionic'].get('password')
privkey = openstack_utils.get_private_key(nova_utils.KEYPAIR_NAME)
openstack_utils.ssh_command(
username, floating_1, 'instance-1',
'ping -c 1 {}'.format(address_2),
password=password, privkey=privkey, verify=verify)
cmds = [
'ping -c 1',
]
if mtu:
# the on-wire packet will be 28 bytes larger than the value
# provided to ping(8) -s parameter
packetsize = mtu - 28
cmds.append(
'ping -M do -s {} -c 1'.format(packetsize))
openstack_utils.ssh_command(
username, floating_1, 'instance-1',
'ping -c 1 {}'.format(floating_2),
password=password, privkey=privkey, verify=verify)
for cmd in cmds:
openstack_utils.ssh_command(
username, floating_1, 'instance-1',
'{} {}'.format(cmd, address_2),
password=password, privkey=privkey, verify=verify)
openstack_utils.ssh_command(
username, floating_1, 'instance-1',
'{} {}'.format(cmd, floating_2),
password=password, privkey=privkey, verify=verify)
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=1, max=60),
reraise=True, stop=tenacity.stop_after_attempt(8))
def validate_instance_can_reach_router(self, instance, verify):
def validate_instance_can_reach_router(self, instance, verify, mtu=None):
"""
Validate that an instance can reach it's primary gateway.
@@ -676,6 +696,12 @@ class NeutronNetworkingBase(test_utils.OpenStackBaseTest):
:param instance: The instance to check networking from
:type instance: nova_client.Server
:param verify: callback to verify result
:type verify: callable
:param mtu: Check that we can send non-fragmented packets of given size
:type mtu: Optional[int]
"""
address = floating_ips_from_instance(instance)[0]
@@ -683,9 +709,20 @@ class NeutronNetworkingBase(test_utils.OpenStackBaseTest):
password = guest.boot_tests['bionic'].get('password')
privkey = openstack_utils.get_private_key(nova_utils.KEYPAIR_NAME)
openstack_utils.ssh_command(
username, address, 'instance', 'ping -c 1 192.168.0.1',
password=password, privkey=privkey, verify=verify)
cmds = [
'ping -c 1',
]
if mtu:
# the on-wire packet will be 28 bytes larger than the value
# provided to ping(8) -s parameter
packetsize = mtu - 28
cmds.append(
'ping -M do -s {} -c 1'.format(packetsize))
for cmd in cmds:
openstack_utils.ssh_command(
username, address, 'instance', '{} 192.168.0.1'.format(cmd),
password=password, privkey=privkey, verify=verify)
@tenacity.retry(wait=tenacity.wait_exponential(min=5, max=60),
reraise=True, stop=tenacity.stop_after_attempt(8),
@@ -726,21 +763,67 @@ class NeutronNetworkingBase(test_utils.OpenStackBaseTest):
assert agent['admin_state_up']
assert agent['alive']
def effective_network_mtu(self, network_name):
"""Retrieve effective MTU for a network.
If the `instance-mtu` configuration option is set to a value lower than
the network MTU this method will return the value of that. Otherwise
Neutron's value for MTU on a network will be returned.
:param network_name: Name of network to query
:type network_name: str
:returns: MTU for network
:rtype: int
"""
cfg_instance_mtu = None
for app in ('neutron-gateway', 'neutron-openvswitch'):
try:
cfg = zaza.model.get_application_config(app)
cfg_instance_mtu = int(cfg['instance-mtu']['value'])
break
except KeyError:
pass
networks = self.neutron_client.show_network('', name=network_name)
network_mtu = int(next(iter(networks['networks']))['mtu'])
if cfg_instance_mtu and cfg_instance_mtu < network_mtu:
logging.info('Using MTU from application "{}" config: {}'
.format(app, cfg_instance_mtu))
return cfg_instance_mtu
else:
logging.info('Using MTU from network "{}": {}'
.format(network_name, network_mtu))
return network_mtu
def check_connectivity(self, instance_1, instance_2):
"""Run North/South and East/West connectivity tests."""
def verify(stdin, stdout, stderr):
"""Validate that the SSH command exited 0."""
self.assertEqual(stdout.channel.recv_exit_status(), 0)
try:
mtu_1 = self.effective_network_mtu(
network_name_from_instance(instance_1))
mtu_2 = self.effective_network_mtu(
network_name_from_instance(instance_2))
mtu_min = min(mtu_1, mtu_2)
except neutronexceptions.NotFound:
# Older versions of OpenStack cannot look up network by name, just
# skip the check if that is the case.
mtu_1 = mtu_2 = mtu_min = None
# Verify network from 1 to 2
self.validate_instance_can_reach_other(instance_1, instance_2, verify)
self.validate_instance_can_reach_other(
instance_1, instance_2, verify, mtu_min)
# Verify network from 2 to 1
self.validate_instance_can_reach_other(instance_2, instance_1, verify)
self.validate_instance_can_reach_other(
instance_2, instance_1, verify, mtu_min)
# Validate tenant to external network routing
self.validate_instance_can_reach_router(instance_1, verify)
self.validate_instance_can_reach_router(instance_2, verify)
self.validate_instance_can_reach_router(instance_1, verify, mtu_1)
self.validate_instance_can_reach_router(instance_2, verify, mtu_2)
def floating_ips_from_instance(instance):
@@ -769,6 +852,17 @@ def fixed_ips_from_instance(instance):
return ips_from_instance(instance, 'fixed')
def network_name_from_instance(instance):
"""Retrieve name of primary network the instance is attached to.
:param instance: The instance to fetch name of network from.
:type instance: nova_client.Server
:returns: Name of primary network the instance is attached to.
:rtype: str
"""
return next(iter(instance.addresses))
def ips_from_instance(instance, ip_type):
"""
Retrieve IPs of a certain type from an instance.
@@ -786,7 +880,8 @@ def ips_from_instance(instance, ip_type):
"Only 'floating' and 'fixed' are valid IP types to search for"
)
return list([
ip['addr'] for ip in instance.addresses['private']
ip['addr'] for ip in instance.addresses[
network_name_from_instance(instance)]
if ip['OS-EXT-IPS:type'] == ip_type])
@@ -794,11 +889,22 @@ class NeutronNetworkingTest(NeutronNetworkingBase):
"""Ensure that openstack instances have valid networking."""
def test_instances_have_networking(self):
"""Validate North/South and East/West networking."""
self.launch_guests()
"""Validate North/South and East/West networking.
Tear down can optionally be disabled by setting the module path +
class name + run_tearDown key under the `tests_options` key in
tests.yaml.
Abbreviated example:
...charm_tests.neutron.tests.NeutronNetworkingTest.run_tearDown: false
"""
instance_1, instance_2 = self.retrieve_guests()
if not all([instance_1, instance_2]):
self.launch_guests()
instance_1, instance_2 = self.retrieve_guests()
self.check_connectivity(instance_1, instance_2)
self.run_resource_cleanup = True
self.run_resource_cleanup = self.get_my_tests_options(
'run_resource_cleanup', True)
class NeutronNetworkingVRRPTests(NeutronNetworkingBase):
+154
View File
@@ -0,0 +1,154 @@
# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Code for configuring OVN tests."""
import logging
import zaza
import zaza.openstack.charm_tests.test_utils as test_utils
class _OVNSetupHelper(test_utils.BaseCharmTest):
"""Helper class to get at the common `config_change` helper."""
@staticmethod
def _get_instance_mtu_from_global_physnet_mtu():
"""Calculate instance mtu from Neutron API global-physnet-mtu.
:returns: Value for instance mtu after migration.
:rtype: int
"""
n_api_config = zaza.model.get_application_config('neutron-api')
# NOTE: we would have to adjust this calculation if we use IPv6 tunnel
# endpoints
GENEVE_ENCAP_OVERHEAD = 38
IP4_HEADER_SIZE = 20
return int(n_api_config['global-physnet-mtu']['value']) - (
GENEVE_ENCAP_OVERHEAD + IP4_HEADER_SIZE)
def _configure_apps(self, apps, cfg,
first_match_raise_if_none_found=False):
"""Conditionally configure a set of applications.
:param apps: Applications.
:type apps: Iterator[str]
:param cfg: Configuration to apply.
:type cfg: Dict[str,any]
:param first_match_raise_if_none_found: When set the method will
configure the first application
it finds in the model and raise
an exception if none are found.
:type first_match_raise_if_none_found: bool
:raises: RuntimeError
"""
for app in apps:
try:
zaza.model.get_application(app)
for k, v in cfg.items():
logging.info('Setting `{}` to "{}" on "{}"...'
.format(k, v, app))
with self.config_change(cfg, cfg, app):
# The intent here is to change the config and not
# restore it. We accomplish that by passing in the same
# value for default and alternate.
#
# The reason for using the `config_change` helper for
# this is that it already deals with all the
# permutations of config already being set etc and does
# not get into trouble if the test bundle already has
# the values we try to set.
if first_match_raise_if_none_found:
break
else:
continue
else:
if first_match_raise_if_none_found:
raise RuntimeError(
'None of the expected apps ({}) are present in '
'the model.'
.format(apps)
)
except KeyError:
pass
def configure_ngw_novs(self):
"""Configure n-ovs and n-gw units."""
cfg = {
# To be able to have instances successfully survive the migration
# without communication issues we need to lower the MTU announced
# to instances prior to migration.
#
# NOTE: In a real world scenario the end user would configure the
# MTU at least 24 hrs prior to doing the migration to allow
# instances to reconfigure as they renew the DHCP lease.
#
# NOTE: For classic n-gw topologies the `instance-mtu` config
# is a NOOP on neutron-openvswitch units, but that is ok.
'instance-mtu': self._get_instance_mtu_from_global_physnet_mtu()
}
apps = ('neutron-gateway', 'neutron-openvswitch')
self._configure_apps(apps, cfg)
cfg_ovs = {
# To be able to successfully clean up after the Neutron agents we
# need to use the 'openvswitch' `firewall-driver`.
'firewall-driver': 'openvswitch',
}
self._configure_apps(('neutron-openvswitch',), cfg_ovs)
def configure_ovn_mappings(self):
"""Copy mappings from n-gw or n-ovs application."""
dst_apps = ('ovn-dedicated-chassis', 'ovn-chassis')
src_apps = ('neutron-gateway', 'neutron-openvswitch')
ovn_cfg = {}
for app in src_apps:
try:
app_cfg = zaza.model.get_application_config(app)
ovn_cfg['bridge-interface-mappings'] = app_cfg[
'data-port']['value']
ovn_cfg['ovn-bridge-mappings'] = app_cfg[
'bridge-mappings']['value']
# Use values from neutron-gateway when present, otherwise use
# values from neutron-openvswitch
break
except KeyError:
pass
else:
raise RuntimeError(
'None of the expected apps ({}) are present in the model.'
.format(src_apps)
)
self._configure_apps(
dst_apps, ovn_cfg, first_match_raise_if_none_found=True)
def pre_migration_configuration():
"""Perform pre-migration configuration steps.
NOTE: Doing the configuration post-deploy and after doing initial network
configuration is an important part of the test as we need to prove that our
end users would be successful in doing this in the wild.
"""
# we use a helper class to leverage common setup code and the
# `config_change` helper
helper = _OVNSetupHelper()
helper.setUpClass()
# Configure `firewall-driver` and `instance-mtu` on n-gw and n-ovs units.
helper.configure_ngw_novs()
# Copy mappings from n-gw or n-ovs application to ovn-dedicated-chassis or
# ovn-chassis.
helper.configure_ovn_mappings()
+277
View File
@@ -15,8 +15,13 @@
"""Encapsulate OVN testing."""
import logging
import tenacity
import juju
import zaza
import zaza.openstack.charm_tests.test_utils as test_utils
import zaza.openstack.utilities.generic as generic_utils
import zaza.openstack.utilities.openstack as openstack_utils
@@ -71,3 +76,275 @@ class ChassisCharmOperationTest(BaseCharmOperationTest):
cls.services = [
'ovn-controller',
]
class OVSOVNMigrationTest(test_utils.BaseCharmTest):
"""OVS to OVN migration tests."""
def setUp(self):
"""Perform migration steps prior to validation."""
super(OVSOVNMigrationTest, self).setUp()
# These steps are here due to them having to be executed once and in a
# specific order prior to running any tests. The steps should still
# be idempotent if at all possible as a courtesy to anyone iterating
# on the test code.
try:
if self.one_time_init_done:
logging.debug('Skipping migration steps as they have already '
'run.')
return
except AttributeError:
logging.info('Performing migration steps.')
# as we progress through the steps our target deploy status changes
# store it in the class instance so the individual methods can
# update when appropriate.
self.target_deploy_status = self.test_config.get(
'target_deploy_status', {})
# Stop Neutron agents on hypervisors
self._pause_units('neutron-openvswitch')
try:
self._pause_units('neutron-gateway')
except KeyError:
logging.info(
'No neutron-gateway in deployment, skip pausing it.')
# Add the neutron-api-plugin-ovn subordinate which will make the
# `neutron-api-plugin-ovn` unit appear in the deployment.
#
# NOTE: The OVN drivers will not be activated until we change the
# value for the `manage-neutron-plugin-legacy-mode` config.
self._add_neutron_api_plugin_ovn_subordinate_relation()
# Adjust MTU on overlay networks
#
# Prior to this the end user will already have lowered the MTU on their
# running instances through the use of the `instance-mtu` configuration
# option and manual reconfiguration of instances that do not use DHCP.
#
# We update the value for the MTU on the overlay networks at this point
# in time because:
#
# - Agents are paused and will not actually reconfigure the networks.
#
# - Making changes to non-Geneve networks are prohibited as soon as the
# OVN drivers are activated.
#
# - Get the correct MTU value into the OVN database on first sync.
#
# - This will be particularly important for any instances using
# stateless IPv6 autoconfiguration (SLAAC) as there is currently
# no config knob to feed MTU information into the legacy ML2+OVS
# `radvd` configuration or the native OVN RA.
#
# - Said instances will reconfigure their IPv6 MTU as soon as they
# receive an RA with correct MTU when OVN takes over control.
self._run_migrate_mtu_action()
# Flip `manage-neutron-plugin-legacy-mode` to enable it
#
# NOTE(fnordahl): until we sync/repair the OVN DB this will make the
# `neutron-server` log errors. However we need the neutron unit to be
# unpaused while doing this to have the configuration rendered. The
# configuration is consumed by the `neutron-ovn-db-sync` tool.
self._configure_neutron_api()
# Stop the Neutron server prior to OVN DB sync/repair
self._pause_units('neutron-api')
# Sync the OVN DB
self._run_migrate_ovn_db_action()
# Perform the optional morphing of Neutron DB action
self._run_offline_neutron_morph_db_action()
self._resume_units('neutron-api')
# Run `cleanup` action on neutron-openvswitch units/hypervisors
self._run_cleanup_action('neutron-openvswitch')
# Run `cleanup` action on neutron-gateway units when present
try:
self._run_cleanup_action('neutron-gateway')
except KeyError:
logging.info(
'No neutron-gateway in deployment, skip cleanup of it.')
# Start the OVN controller on hypervisors
#
# NOTE(fnordahl): it is very important to have run cleanup prior to
# starting these, if you don't do that it is almost guaranteed that
# you will program the network to a state of infinite loop.
self._resume_units('ovn-chassis')
try:
self._resume_units('ovn-dedicated-chassis')
except KeyError:
logging.info(
'No ovn-dedicated-chassis in deployment, skip resume.')
# And we should be off to the races
self.one_time_init_done = True
def _add_neutron_api_plugin_ovn_subordinate_relation(self):
"""Add relation between neutron-api and neutron-api-plugin-ovn."""
try:
logging.info('Adding relation neutron-api-plugin-ovn '
'-> neutron-api')
zaza.model.add_relation(
'neutron-api-plugin-ovn', 'neutron-plugin',
'neutron-api:neutron-plugin-api-subordinate')
zaza.model.wait_for_agent_status()
zaza.model.wait_for_application_states(
states=self.test_config.get('target_deploy_status', {}))
except juju.errors.JujuAPIError:
# we were not able to add the relation, let's make sure it's
# because it's already there
assert (zaza.model.get_relation_id(
'neutron-api-plugin-ovn', 'neutron-api',
remote_interface_name='neutron-plugin-api-subordinate')
is not None), 'Unable to add relation required for test'
logging.info('--> On the other hand, did not need to add the '
'relation as it was already there.')
def _configure_neutron_api(self):
"""Set configuration option `manage-neutron-plugin-legacy-mode`."""
logging.info('Configuring `manage-neutron-plugin-legacy-mode` for '
'neutron-api...')
n_api_config = {
'manage-neutron-plugin-legacy-mode': False,
}
with self.config_change(
n_api_config, n_api_config, 'neutron-api'):
logging.info('done')
def _run_offline_neutron_morph_db_action(self):
"""Run offline-neutron-morph-db action."""
logging.info('Running the optional `offline-neutron-morph-db` action '
'on neutron-api-plugin-ovn/leader')
generic_utils.assertActionRanOK(
zaza.model.run_action_on_leader(
'neutron-api-plugin-ovn',
'offline-neutron-morph-db',
action_params={
'i-really-mean-it': True},
raise_on_failure=True,
)
)
def _run_migrate_ovn_db_action(self):
"""Run migrate-ovn-db action."""
logging.info('Running `migrate-ovn-db` action on '
'neutron-api-plugin-ovn/leader')
generic_utils.assertActionRanOK(
zaza.model.run_action_on_leader(
'neutron-api-plugin-ovn',
'migrate-ovn-db',
action_params={
'i-really-mean-it': True},
raise_on_failure=True,
)
)
# Charm readiness is no guarantee for API being ready to serve requests.
# https://bugs.launchpad.net/charm-neutron-api/+bug/1854518
@tenacity.retry(wait=tenacity.wait_exponential(min=5, max=60),
reraise=True, stop=tenacity.stop_after_attempt(3))
def _run_migrate_mtu_action(self):
"""Run migrate-mtu action with retry.
The action is idempotent.
Due to LP: #1854518 and the point in time of the test life cycle we run
this action the probability for the Neutron API not being available
for the script to do its job is high, thus we retry.
"""
logging.info('Running `migrate-mtu` action on '
'neutron-api-plugin-ovn/leader')
generic_utils.assertActionRanOK(
zaza.model.run_action_on_leader(
'neutron-api-plugin-ovn',
'migrate-mtu',
action_params={
'i-really-mean-it': True},
raise_on_failure=True,
)
)
def _pause_units(self, application):
"""Pause units of application.
:param application: Name of application
:type application: str
"""
logging.info('Pausing {} units'.format(application))
zaza.model.run_action_on_units(
[unit.entity_id
for unit in zaza.model.get_units(application)],
'pause',
raise_on_failure=True,
)
self.target_deploy_status.update(
{
application: {
'workload-status': 'maintenance',
'workload-status-message': 'Paused',
},
},
)
def _run_cleanup_action(self, application):
"""Run cleanup action on application units.
:param application: Name of application
:type application: str
"""
logging.info('Running `cleanup` action on {} units.'
.format(application))
zaza.model.run_action_on_units(
[unit.entity_id
for unit in zaza.model.get_units(application)],
'cleanup',
action_params={
'i-really-mean-it': True},
raise_on_failure=True,
)
def _resume_units(self, application):
"""Resume units of application.
:param application: Name of application
:type application: str
"""
logging.info('Resuming {} units'.format(application))
zaza.model.run_action_on_units(
[unit.entity_id
for unit in zaza.model.get_units(application)],
'resume',
raise_on_failure=True,
)
self.target_deploy_status.pop(application)
def test_ovs_ovn_migration(self):
"""Test migration of existing Neutron ML2+OVS deployment to OVN.
The test should be run after deployment and validation of a legacy
deployment combined with subsequent run of a network connectivity test
on instances created prior to the migration.
"""
# The setUp method of this test class will perform the migration steps.
# The tests.yaml is programmed to do further validation after the
# migration.
# Reset the n-gw and n-ovs instance-mtu configuration option so it does
# not influence how further tests are executed.
reset_config_keys = ['instance-mtu']
for app in ('neutron-gateway', 'neutron-openvswitch'):
try:
zaza.model.reset_application_config(app, reset_config_keys)
logging.info('Reset configuration to default on "{}" for "{}"'
.format(app, reset_config_keys))
except KeyError:
pass
zaza.model.wait_for_agent_status()
zaza.model.wait_for_application_states(
states=self.target_deploy_status)
+45
View File
@@ -15,6 +15,7 @@
import contextlib
import logging
import subprocess
import sys
import tenacity
import unittest
@@ -425,6 +426,50 @@ class BaseCharmTest(unittest.TestCase):
model_name=self.model_name,
pgrep_full=pgrep_full)
def get_my_tests_options(self, key, default=None):
"""Retrieve tests_options for specific test.
Prefix for key is built from dot-notated absolute path to calling
method or function.
Example:
# In tests.yaml:
tests_options:
zaza.charm_tests.noop.tests.NoopTest.test_foo.key: true
# called from zaza.charm_tests.noop.tests.NoopTest.test_foo()
>>> get_my_tests_options('key')
True
:param key: Suffix for tests_options key.
:type key: str
:param default: Default value to return if key is not found.
:type default: any
:returns: Value associated with key in tests_options.
:rtype: any
"""
# note that we need to do this in-line otherwise we would get the path
# to ourself. I guess we could create a common method that would go two
# frames back, but that would be kind of useless for anyone else than
# this method.
caller_path = []
# get path to module
caller_path.append(sys.modules[
sys._getframe().f_back.f_globals['__name__']].__name__)
# attempt to get class name
try:
caller_path.append(
sys._getframe().f_back.f_locals['self'].__class__.__name__)
except KeyError:
pass
# get method or function name
caller_path.append(sys._getframe().f_back.f_code.co_name)
return self.test_config.get('tests_options', {}).get(
'.'.join(caller_path + [key]), default)
class OpenStackBaseTest(BaseCharmTest):
"""Generic helpers for testing OpenStack API charms."""
+43 -10
View File
@@ -63,6 +63,9 @@ import tenacity
import textwrap
import urllib
import zaza
import zaza.charm_lifecycle.utils as lifecycle_utils
from zaza import model
from zaza.openstack.utilities import (
exceptions,
@@ -553,6 +556,20 @@ def dvr_enabled():
return get_application_config_option('neutron-api', 'enable-dvr')
def ngw_present():
"""Check whether Neutron Gateway is present in deployment.
:returns: True when Neutron Gateway is present, False otherwise
:rtype: bool
"""
try:
model.get_application('neutron-gateway')
return True
except KeyError:
pass
return False
def ovn_present():
"""Check whether OVN is present in deployment.
@@ -630,15 +647,20 @@ def add_interface_to_netplan(server_name, mac_address):
:type mac_address: string
"""
if dvr_enabled():
application_name = 'neutron-openvswitch'
application_names = ('neutron-openvswitch',)
elif ovn_present():
# OVN chassis is a subordinate to nova-compute
application_name = 'nova-compute'
application_names = ('nova-compute', 'ovn-dedicated-chassis')
else:
application_name = 'neutron-gateway'
application_names = ('neutron-gateway',)
unit_name = juju_utils.get_unit_name_from_host_name(
server_name, application_name)
for app_name in application_names:
unit_name = juju_utils.get_unit_name_from_host_name(
server_name, app_name)
if unit_name:
break
else:
raise RuntimeError('Unable to find unit to run commands on.')
run_cmd_nic = "ip -f link -br -o addr|grep {}".format(mac_address)
interface = model.run_on_unit(unit_name, run_cmd_nic)
interface = interface['Stdout'].split(' ')[0]
@@ -715,6 +737,9 @@ def configure_gateway_ext_port(novaclient, neutronclient, net_id=None,
except KeyError:
# neutron-gateway not in deployment
pass
elif ngw_present():
uuids = itertools.islice(get_gateway_uuids(), limit_gws)
application_names = ['neutron-gateway']
elif ovn_present():
uuids = itertools.islice(get_ovn_uuids(), limit_gws)
application_names = ['ovn-chassis']
@@ -729,8 +754,7 @@ def configure_gateway_ext_port(novaclient, neutronclient, net_id=None,
config.update({'ovn-bridge-mappings': 'physnet1:br-ex'})
add_dataport_to_netplan = True
else:
uuids = itertools.islice(get_gateway_uuids(), limit_gws)
application_names = ['neutron-gateway']
raise RuntimeError('Unable to determine charm network topology.')
if not net_id:
net_id = get_admin_net(neutronclient)['id']
@@ -745,8 +769,9 @@ def configure_gateway_ext_port(novaclient, neutronclient, net_id=None,
'Neutron Gateway already has additional port')
break
else:
logging.info('Attaching additional port to instance, '
'connected to net id: {}'.format(net_id))
logging.info('Attaching additional port to instance ("{}"), '
'connected to net id: {}'
.format(uuid, net_id))
body_value = {
"port": {
"admin_state_up": True,
@@ -796,7 +821,15 @@ def configure_gateway_ext_port(novaclient, neutronclient, net_id=None,
# NOTE(fnordahl): We are stuck with juju_wait until we figure out how
# to deal with all the non ['active', 'idle', 'Unit is ready.']
# workload/agent states and msgs that our mojo specs are exposed to.
juju_wait.wait(wait_for_workload=True)
if lifecycle_utils.get_config_options().get(
'configure_gateway_ext_port_use_juju_wait', True):
juju_wait.wait(wait_for_workload=True)
else:
zaza.model.wait_for_agent_status()
test_config = zaza.charm_lifecycle.utils.get_charm_config(
fatal=False)
zaza.model.wait_for_application_states(
states=test_config.get('target_deploy_status', {}))
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=1, max=60),