Merge pull request #163 from fnordahl/add-ovs-and-octavia-tests

Add Octavia tests
This commit is contained in:
David Ames
2018-11-30 08:22:45 -08:00
committed by GitHub
10 changed files with 265 additions and 30 deletions
+1
View File
@@ -27,6 +27,7 @@ python-heatclient
python-keystoneclient
python-neutronclient
python-novaclient
python-octaviaclient
python-swiftclient
tenacity
distro-info
+1
View File
@@ -38,6 +38,7 @@ install_require = [
'python-keystoneclient',
'python-novaclient',
'python-neutronclient',
'python-octaviaclient',
'python-cinderclient',
]
@@ -100,7 +100,8 @@ class TestUtilitiesCert(ut_utils.BaseTestCase):
self.patch_object(cert, 'rsa')
self.patch_object(cert, 'cryptography')
cert.generate_cert('unit_test.ci.local', password='secret')
self.serialization.BestAvailableEncryption.assert_called_with('secret')
self.serialization.BestAvailableEncryption.assert_called_with(
b'secret')
self.cryptography.x509.NameAttribute.assert_called_with(
self.cryptography.x509.oid.NameOID.COMMON_NAME,
'unit_test.ci.local',
@@ -163,7 +164,7 @@ class TestUtilitiesCert(ut_utils.BaseTestCase):
self.assertTrue(self.serialization.NoEncryption.called)
self.serialization.load_pem_private_key.assert_called_with(
'signing_key',
password='signing_key_password',
password=b'signing_key_password',
backend=self.cryptography.hazmat.backends.default_backend(),
)
self.cryptography.x509.NameAttribute.assert_called_with(
+42 -26
View File
@@ -30,6 +30,38 @@ def basic_setup():
"""
def add_image(image_url, glance_client=None, image_name=None, tags=[]):
"""Retrieve image from ``image_url`` and add it to glance.
:param image_url: Retrievable URL with image data
:type image_url: str
:param glance: Authenticated glanceclient
:type glance: glanceclient.Client
:param image_name: Label for the image in glance
:type image_name: str
:param tags: List of tags to add to image
:type tags: list of str
"""
if not glance_client:
keystone_session = openstack_utils.get_overcloud_keystone_session()
glance_client = openstack_utils.get_glance_session_client(
keystone_session)
if image_name:
image = openstack_utils.get_images_by_name(
glance_client, image_name)
if image:
logging.warning('Using existing glance image "{}" ({})'
.format(image_name, image[0].id))
else:
logging.info('Downloading image {}'.format(image_name or image_url))
openstack_utils.create_image(
glance_client,
image_url,
image_name,
tags=tags)
def add_cirros_image(glance_client=None, image_name=None):
"""Add a cirros image to the current deployment.
@@ -39,18 +71,10 @@ def add_cirros_image(glance_client=None, image_name=None):
:type image_name: str
"""
image_name = image_name or CIRROS_IMAGE_NAME
if not glance_client:
keystone_session = openstack_utils.get_overcloud_keystone_session()
glance_client = openstack_utils.get_glance_session_client(
keystone_session)
if openstack_utils.get_images_by_name(glance_client, image_name):
logging.warning('Using existing glance image')
else:
image_url = openstack_utils.find_cirros_image(arch='x86_64')
openstack_utils.create_image(
glance_client,
image_url,
image_name)
image_url = openstack_utils.find_cirros_image(arch='x86_64')
add_image(image_url,
glance_client=glance_client,
image_name=image_name)
def add_lts_image(glance_client=None, image_name=None, release=None):
@@ -65,17 +89,9 @@ def add_lts_image(glance_client=None, image_name=None, release=None):
"""
image_name = image_name or LTS_IMAGE_NAME
release = release or LTS_RELEASE
if not glance_client:
keystone_session = openstack_utils.get_overcloud_keystone_session()
glance_client = openstack_utils.get_glance_session_client(
keystone_session)
if openstack_utils.get_images_by_name(glance_client, image_name):
logging.warning('Using existing glance image')
else:
image_url = openstack_utils.find_ubuntu_image(
release=release,
arch='amd64')
openstack_utils.create_image(
glance_client,
image_url,
image_name)
image_url = openstack_utils.find_ubuntu_image(
release=release,
arch='amd64')
add_image(image_url,
glance_client=glance_client,
image_name=image_name)
@@ -42,6 +42,9 @@ class NeutronOpenvSwitchOverlayTest(unittest.TestCase):
ports = resp['ports']
host_port = {}
for port in ports:
if (port['device_owner'].startswith('network:') or
port['device_owner'].startswith('compute:')):
continue
host_port[port['binding:host_id']] = port
for unit in zaza.model.get_units('neutron-openvswitch'):
+15
View File
@@ -0,0 +1,15 @@
# Copyright 2018 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Collection of code for setting up and testing octavia."""
+83
View File
@@ -0,0 +1,83 @@
# Copyright 2018 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Code for configuring octavia."""
import os
import base64
import logging
import zaza.utilities.cert
import zaza.charm_lifecycle.utils
import zaza.charm_tests.test_utils
import zaza.charm_tests.glance.setup as glance_setup
def add_amphora_image(image_url=None):
"""Add Octavia ``amphora`` test image to glance.
:param image_url: URL where image resides
:type image_url: str
"""
image_name = 'amphora-x64-haproxy'
if not image_url:
image_url = (
os.environ.get('FUNCTEST_AMPHORA_LOCATION', None) or
'http://tarballs.openstack.org/octavia/test-images/'
'test-only-amphora-x64-haproxy-ubuntu-xenial.qcow2')
glance_setup.add_image(
image_url,
image_name=image_name,
tags=['octavia-amphora'])
def configure_amphora_certs():
"""Configure certificates for internal Octavia client/server auth."""
(issuing_cakey, issuing_cacert) = zaza.utilities.cert.generate_cert(
'OSCI Zaza Issuer',
password='zaza',
generate_ca=True)
(controller_cakey, controller_cacert) = zaza.utilities.cert.generate_cert(
'OSCI Zaza Octavia Controller',
generate_ca=True)
(controller_key, controller_cert) = zaza.utilities.cert.generate_cert(
'*.serverstack',
issuer_name='OSCI Zaza Octavia Controller',
signing_key=controller_cakey)
controller_bundle = controller_cert + controller_key
cert_config = {
'lb-mgmt-issuing-cacert': base64.b64encode(
issuing_cacert).decode('utf-8'),
'lb-mgmt-issuing-ca-private-key': base64.b64encode(
issuing_cakey).decode('utf-8'),
'lb-mgmt-issuing-ca-key-passphrase': 'zaza',
'lb-mgmt-controller-cacert': base64.b64encode(
controller_cacert).decode('utf-8'),
'lb-mgmt-controller-cert': base64.b64encode(
controller_bundle).decode('utf-8'),
}
logging.info('Configuring certificates for mandatory Octavia '
'client/server authentication '
'(client being the ``Amphorae`` load balancer instances)')
# Our expected workload status will change after we have configured the
# certificates
test_config = zaza.charm_lifecycle.utils.get_charm_config()
del test_config['target_deploy_status']['octavia']
_singleton = zaza.charm_tests.test_utils.OpenStackBaseTest()
_singleton.setUpClass()
with _singleton.config_change(cert_config, cert_config):
# wait for configuration to be applied then return
pass
+79
View File
@@ -0,0 +1,79 @@
# Copyright 2018 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Encapsulate octavia testing."""
import logging
import tenacity
import zaza.charm_tests.test_utils as test_utils
import zaza.utilities.openstack as openstack_utils
class CharmOperationTest(test_utils.OpenStackBaseTest):
"""Charm operation tests."""
@classmethod
def setUpClass(cls):
"""Run class setup for running Octavia charm operation tests."""
super(CharmOperationTest, cls).setUpClass()
def test_pause_resume(self):
"""Run pause and resume tests.
Pause service and check services are stopped, then resume and check
they are started.
"""
self.pause_resume(['apache2'])
class LBAASv2Test(test_utils.OpenStackBaseTest):
"""LBaaSv2 service tests."""
@classmethod
def setUpClass(cls):
"""Run class setup for running LBaaSv2 service tests."""
super(LBAASv2Test, cls).setUpClass()
def test_create_loadbalancer(self):
"""Create load balancer."""
keystone_session = openstack_utils.get_overcloud_keystone_session()
neutron_client = openstack_utils.get_neutron_session_client(
keystone_session)
resp = neutron_client.list_networks(name='private')
subnet_id = resp['networks'][0]['subnets'][0]
octavia_client = openstack_utils.get_octavia_session_client(
keystone_session)
result = octavia_client.load_balancer_create(
json={
'loadbalancer': {
'description': 'Created by Zaza',
'admin_state_up': True,
'vip_subnet_id': subnet_id,
'name': 'zaza-lb-0',
}})
lb_id = result['loadbalancer']['id']
@tenacity.retry(wait=tenacity.wait_fixed(1),
reraise=True, stop=tenacity.stop_after_delay(900))
def wait_for_loadbalancer(octavia_client, load_balancer_id):
resp = octavia_client.load_balancer_show(load_balancer_id)
if resp['provisioning_status'] != 'ACTIVE':
raise Exception('load balancer has not reached expected '
'status: {}'.format(resp))
return resp
logging.info('Awaiting loadbalancer to reach provisioning_status '
'"ACTIVE"')
resp = wait_for_loadbalancer(octavia_client, lb_id)
logging.info(resp)
+4 -1
View File
@@ -58,11 +58,14 @@ def generate_cert(common_name,
:rtype: cryptography.x509.Certificate
"""
if password is not None:
encryption_algorithm = serialization.BestAvailableEncryption(password)
encryption_algorithm = serialization.BestAvailableEncryption(
password.encode('utf-8'))
else:
encryption_algorithm = serialization.NoEncryption()
if signing_key:
if signing_key_password:
signing_key_password = signing_key_password.encode('utf-8')
_signing_key = serialization.load_pem_private_key(
signing_key,
password=signing_key_password,
+34 -1
View File
@@ -37,6 +37,7 @@ import zaza.utilities.cert as cert
from novaclient import client as novaclient_client
from neutronclient.v2_0 import client as neutronclient
from neutronclient.common import exceptions as neutronexceptions
from octaviaclient.api.v2 import octavia as octaviaclient
import io
import juju_wait
@@ -200,6 +201,31 @@ def get_neutron_session_client(session):
return neutronclient.Client(session=session)
def get_octavia_session_client(session, service_type='load-balancer',
interface='internal'):
"""Return neutronclient authenticated by keystone session.
:param session: Keystone session object
:type session: keystoneauth1.session.Session object
:param service_type: Service type to look for in catalog
:type service_type: str
:param interface: Interface to look for in catalog
:type interface: str
:returns: Authenticated octaviaclient
:rtype: octaviaclient.OctaviaAPI object
"""
keystone_client = get_keystone_session_client(session)
lbaas_service = keystone_client.services.list(type=service_type)
for service in lbaas_service:
lbaas_endpoint = keystone_client.endpoints.list(service=service,
interface='internal')
for endpoint in lbaas_endpoint:
break
return octaviaclient.OctaviaAPI(session=session,
service_type=service_type,
endpoint=endpoint.url)
def get_cinder_session_client(session):
"""Return cinderclient authenticated by keystone session.
@@ -1549,7 +1575,7 @@ def upload_image_to_glance(glance, local_path, image_name, disk_format='qcow2',
return image
def create_image(glance, image_url, image_name, image_cache_dir=None):
def create_image(glance, image_url, image_name, image_cache_dir=None, tags=[]):
"""Download the image and upload it to glance.
Download an image from image_url and upload it to glance labelling
@@ -1564,6 +1590,8 @@ def create_image(glance, image_url, image_name, image_cache_dir=None):
:param image_cache_dir: Directory to store image in before uploading. If it
is not passed, or is None, then a tmp directory is used.
:type image_cache_dir: Option[str, None]
:param tags: Tags to add to image
:type tags: list of str
:returns: glance image pointer
:rtype: glanceclient.common.utils.RequestIdProxy
"""
@@ -1580,6 +1608,11 @@ def create_image(glance, image_url, image_name, image_cache_dir=None):
download_image(image_url, local_path)
image = upload_image_to_glance(glance, local_path, image_name)
for tag in tags:
result = glance.image_tags.update(image.id, tag)
logging.debug(
'applying tag to image: glance.image_tags.update({}, {}) = {}'
.format(image.id, tags, result))
return image