octavia: tear down resources created during LBaasV2 test

Fixes #221
This commit is contained in:
Frode Nordahl
2020-04-07 10:55:09 +02:00
parent f35da80f27
commit 116da9ca6c

View File

@@ -18,6 +18,8 @@ import logging
import subprocess
import tenacity
import osc_lib.exceptions
import zaza.openstack.charm_tests.test_utils as test_utils
import zaza.openstack.utilities.openstack as openstack_utils
@@ -47,15 +49,52 @@ class LBAASv2Test(test_utils.OpenStackBaseTest):
"""Run class setup for running LBaaSv2 service tests."""
super(LBAASv2Test, cls).setUpClass()
cls.keystone_session = openstack_utils.get_overcloud_keystone_session()
cls.neutron_client = openstack_utils.get_neutron_session_client(
cls.keystone_session)
cls.octavia_client = openstack_utils.get_octavia_session_client(
cls.keystone_session)
# NOTE(fnordahl): in the event of a test failure we do not want to run
# tear down code as it will make debugging a problem virtually
# impossible. To alleviate each test method will set the
# `run_tearDown` instance variable at the end which will let us run
# tear down only when there were no failure.
cls.run_tearDown = False
# List of load balancers created by this test
cls.loadbalancers = []
# LIst of floating IPs created by this test
cls.fips = []
@classmethod
def tearDown(cls):
"""Remove resources created during test execution.
Note that resources created in the configure step prior to executing
the test should not be touched here.
"""
for lb in cls.loadbalancers:
cls.octavia_client.load_balancer_delete(lb['id'], cascade=True)
try:
cls.wait_for_lb_resource(
cls.octavia_client.load_balancer_show, lb['id'],
provisioning_status='DELETED')
except osc_lib.exceptions.NotFound:
pass
for fip in cls.fips:
cls.neutron_client.delete_floatingip(fip)
@staticmethod
@tenacity.retry(wait=tenacity.wait_fixed(1),
reraise=True, stop=tenacity.stop_after_delay(900))
@tenacity.retry(retry=tenacity.retry_if_exception_type(AssertionError),
wait=tenacity.wait_fixed(1), reraise=True,
stop=tenacity.stop_after_delay(900))
def wait_for_lb_resource(octavia_show_func, resource_id,
operating_status=None):
provisioning_status=None, operating_status=None):
"""Wait for loadbalancer resource to reach expected status."""
provisioning_status = provisioning_status or 'ACTIVE'
resp = octavia_show_func(resource_id)
logging.info(resp['provisioning_status'])
assert resp['provisioning_status'] == 'ACTIVE', (
assert resp['provisioning_status'] == provisioning_status, (
'load balancer resource has not reached '
'expected provisioning status: {}'
.format(resp))
@@ -197,11 +236,8 @@ class LBAASv2Test(test_utils.OpenStackBaseTest):
def test_create_loadbalancer(self):
"""Create load balancer."""
keystone_session = openstack_utils.get_overcloud_keystone_session()
neutron_client = openstack_utils.get_neutron_session_client(
keystone_session)
nova_client = openstack_utils.get_nova_session_client(
keystone_session)
self.keystone_session)
# Get IP of the prepared payload instances
payload_ips = []
@@ -209,24 +245,24 @@ class LBAASv2Test(test_utils.OpenStackBaseTest):
payload_ips.append(server.networks['private'][0])
self.assertTrue(len(payload_ips) > 0)
resp = neutron_client.list_networks(name='private')
resp = self.neutron_client.list_networks(name='private')
subnet_id = resp['networks'][0]['subnets'][0]
if openstack_utils.dvr_enabled():
resp = neutron_client.list_networks(name='private_lb_fip_network')
resp = self.neutron_client.list_networks(
name='private_lb_fip_network')
vip_subnet_id = resp['networks'][0]['subnets'][0]
else:
vip_subnet_id = subnet_id
octavia_client = openstack_utils.get_octavia_session_client(
keystone_session)
for provider in self.get_lb_providers(octavia_client).keys():
for provider in self.get_lb_providers(self.octavia_client).keys():
logging.info('Creating loadbalancer with provider {}'
.format(provider))
lb = self._create_lb_resources(octavia_client, provider,
lb = self._create_lb_resources(self.octavia_client, provider,
vip_subnet_id, subnet_id,
payload_ips)
self.loadbalancers.append(lb)
lb_fp = openstack_utils.create_floating_ip(
neutron_client, 'ext_net', port={'id': lb['vip_port_id']})
self.neutron_client, 'ext_net', port={'id': lb['vip_port_id']})
snippet = 'This is the default welcome page'
assert snippet in self._get_payload(lb_fp['floating_ip_address'])
@@ -234,3 +270,6 @@ class LBAASv2Test(test_utils.OpenStackBaseTest):
' (provider="{}") at "http://{}/"'
.format(snippet, provider,
lb_fp['floating_ip_address']))
# If we get here, it means the tests passed
self.run_tearDown = True