Changes to enable testing creainting a guest

This is a bundle of disparate changes that enable testing the
creating of a guest instance. At a high level these are:

* Add helpers for downloading ubuntu images.
* Add method for setting up a basic overcloud network. This is very
  similar to zaza.configure.network.run_from_cli
* Add nova setup module that includes create_flavors and
  manage_ssh_key
* Add nova test base class for launching an instance and two
  derived classes for launching a cirros and lts image respectively.
* Fixes to zaza.charm_tests.test_utils after the refactor of the model
  argument in zaza.models
* New certs.is_keys_valid function to check if a public and private
  key are a pair.
* Methods in utilities.openstack for managing ssh keys. On creation of
  a new key the private key is stored locally and the public key is
  retrieved from Openstack
* Methods in utilities.openstack for testing guests at various points
  of the creation process.
This commit is contained in:
Liam Young
2018-07-31 10:04:56 +00:00
parent 43dea67b00
commit 7a901108c4
13 changed files with 810 additions and 41 deletions

View File

@@ -3,6 +3,38 @@ import mock
import unit_tests.utils as ut_utils
import zaza.utilities.cert as cert
TEST_SSH_PRIVATE_KEY = """
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAvWNz+tJAVyudNsDYrFK4CnJV+/nBmjYJXC3Zf42RFmzJ/Sff
bMSXM/OBPOPtpJg/FawzsTgoHQRMQ/oEcKRSJ0ZGQINlwlrfHdyJcdyH4ifad2oT
42cYRW0yMJggQGe7ttruCvY0mZugwrCjHoX3bqKjSg7YaMpyUKBa2cwCWJu/GUlp
sT1jjY89QYvzb/Auj5lMfk8Qmc4fIcC7EZ+lf+1iuwg7OJjRKbsBqVhUgTKisTxD
kzvo6SLy49j+mWUjfWlCI74D6QhW8OH9sN6MxI1sYiomPrCo+eBc1fkr0dUT8xd6
t1UL6HHx8XkO16BMbLc+lIVNiifZtAK3SL1BnwIDAQABAoIBABZYxtWgu3DNt6Y/
SRHETO0GorixtrNwjtgunMxdMvJ3cboKW2WlKMY7hFNf/al/QWpYQF036BvMZwda
V+3Gpd72ftGb74ToXg1S+XDS+cGovDF89c3OW2HNya9MM/oFg3PHD3GBraE2aNiw
KP8wBYsra6MQb16mDKkQ0seCOACmY/4jYlZ/7YbFtZPBPeZpnlxg4hgFIkiJmuPf
pmHpLFBhpmyo0yf3DGf/rsL9ti6LPBo8vCH6anM9ljn/BW2a3JA/ap4uUGb+FuoV
lwa1by1L6uLNYQb3fSEtmEEIy1mn89SjlEPfHnooXdadTM+9zT9xIc1ArNOSZagU
UHibUXECgYEA5C3h8d1MU5tKppoIM8aOC+OQFZbFd+bF8z+t7RdX9P6br5J0Ugtj
GcykUz6IRQWGZRCsyM9zshK2gQRCul0byNcFjzIHhR62Va6h1u7iwN1F4qc8a5WS
bb/1TEVprTSu9guW8OO/EmUgWkBgej4/j31F8PZG4+m1defLZXNo7skCgYEA1HrN
UOaBMzaFujGRZjlF54v7flCa1YYcv51Dfk8LEScs/jJvTY664ofj6AfQQN37Akmh
6B6jBfP8K7RxcJvAXE00oNliDvwo9TxoTc/F59HbgsEcR739fMjwvpOBWJg0zJy8
28/29dy9e0Fcy6ay55l050+0CBdzkvTWNHBVWScCgYEAhsTi0qvWTPtHmCcZ+Rqp
AzShAV9PuoW/HPDblVFYTgejhIuH0H2RRserts8URVACFOdIZkLBHsgWqxUNJG2h
33nAetcdwe5l2y2NwRjPLQKEKF6GPTTWi6P5CddllzuqqwAlYpnhXMgF18h2Mz1Y
5TMkgDG1pR+AYedKJt2HeKECgYAKR+LVTkHkG3g++RUC8DR8rp49j2Lef/22G8Lf
Qq3TZ6Taq9AM3aIXQeH6IR6ndNYnVy65T3ot2I9UAggXHcIh9S5dtgbzmKnWq9SU
J0B5JgNMAVH/+qZgOkzDu9lfUwYC/HZ64EYfwU19wDzgMbGoWRl587ZPSesyqhwP
L3xBswKBgQDc3WnWDP/KFjzWKY8KG4XZYKvvOy1en4hytbWrFssu5HlYoQeRgAog
K8ZAFLW2Mn0QebwL/gXSDYlZHmu6EbnO4v1kzRMi6aQxYOgKJWLEwj3r3hzJh6YU
QEGH15IncVqMch6HIir4oTF7RY2BsikDDY/GB/l0pRfZrGl9mnrY6Q==
-----END RSA PRIVATE KEY-----
"""
TEST_SSH_PUB_KEY = """ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC9Y3P60kBXK502wNisUrgKclX7+cGaNglcLdl/jZEWbMn9J99sxJcz84E84+2kmD8VrDOxOCgdBExD+gRwpFInRkZAg2XCWt8d3Ilx3IfiJ9p3ahPjZxhFbTIwmCBAZ7u22u4K9jSZm6DCsKMehfduoqNKDthoynJQoFrZzAJYm78ZSWmxPWONjz1Bi/Nv8C6PmUx+TxCZzh8hwLsRn6V/7WK7CDs4mNEpuwGpWFSBMqKxPEOTO+jpIvLj2P6ZZSN9aUIjvgPpCFbw4f2w3ozEjWxiKiY+sKj54FzV+SvR1RPzF3q3VQvocfHxeQ7XoExstz6UhU2KJ9m0ArdIvUGf ubuntu@gnuoy-bastion """ # noqa
TEST_SSH_PUB_KEY_INVALID = """ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDMz6U88GVhwAjjhzSrcyKKWe3LfB4pK4Ap6XpIfSmiVDPTBiBU3wzj1YAIBo26OMHDkfUnmtBgtzOfcb64QPaUmMfCkzadxrd8inYlpz+0AoahCTTONkElMxj+wa7SYVF4GphrDKDvlPi83bcLmO39veNVcLYHcDa+9mWBP3AlI3TdKqJpgOtCzLu9qbhlpmYa7YD6ijQrTJI3wOOw0uZeEARVCCKU44BVUFnWrNx5ioihETj9rAxRFrm1dx8mKDP0fCf53/Xn+LKLcYBPVovT6BpBHkaLuG6mTYU7puHN607wRhRwYhc3Y9y0sd6rHykYKL3G27w08s597paFtXg5 ubuntu@gnuoy-bastion""" # noqa
class TestUtilitiesCert(ut_utils.BaseTestCase):
@@ -235,3 +267,11 @@ class TestUtilitiesCert(ut_utils.BaseTestCase):
self.builder_mock.add_extension.assert_called_once_with(
self.bcons_mock(),
critical=True)
def test_is_keys_valid(self):
self.assertTrue(
cert.is_keys_valid(TEST_SSH_PUB_KEY, TEST_SSH_PRIVATE_KEY))
def test_is_keys_valid_invalid(self):
self.assertFalse(
cert.is_keys_valid(TEST_SSH_PUB_KEY_INVALID, TEST_SSH_PRIVATE_KEY))

View File

@@ -1,9 +1,11 @@
import copy
import io
import mock
import tenacity
import unit_tests.utils as ut_utils
from zaza.utilities import openstack as openstack_utils
from zaza.utilities import exceptions
class TestOpenStackUtils(ut_utils.BaseTestCase):
@@ -280,6 +282,12 @@ class TestOpenStackUtils(ut_utils.BaseTestCase):
openstack_utils.find_cirros_image('aarch64'),
'http://download.cirros-cloud.net/12/cirros-12-aarch64-disk.img')
def test_find_ubuntu_image(self):
self.assertEqual(
openstack_utils.find_ubuntu_image('bionic', 'aarch64'),
('http://cloud-images.ubuntu.com/bionic/current/'
'bionic-server-cloudimg-aarch64.img'))
def test_download_image(self):
urllib_opener_mock = mock.MagicMock()
self.patch_object(openstack_utils, "get_urllib_opener")
@@ -400,3 +408,188 @@ class TestOpenStackUtils(ut_utils.BaseTestCase):
glance_mock,
'tests/c.img',
'bob')
def test_create_ssh_key(self):
nova_mock = mock.MagicMock()
nova_mock.keypairs.findall.return_value = []
openstack_utils.create_ssh_key(
nova_mock,
'mykeys')
nova_mock.keypairs.create.assert_called_once_with(name='mykeys')
def test_create_ssh_key_existing(self):
nova_mock = mock.MagicMock()
nova_mock.keypairs.findall.return_value = ['akey']
self.assertEqual(
openstack_utils.create_ssh_key(
nova_mock,
'mykeys'),
'akey')
self.assertFalse(nova_mock.keypairs.create.called)
def test_create_ssh_key_existing_replace(self):
nova_mock = mock.MagicMock()
nova_mock.keypairs.findall.return_value = ['key1']
openstack_utils.create_ssh_key(
nova_mock,
'mykeys',
replace=True),
nova_mock.keypairs.delete.assert_called_once_with('key1')
nova_mock.keypairs.create.assert_called_once_with(name='mykeys')
def test_get_private_key_file(self):
self.assertEqual(
openstack_utils.get_private_key_file('mykeys'),
'tests/id_rsa_mykeys')
def test_write_private_key(self):
m = mock.mock_open()
with mock.patch('zaza.utilities.openstack.open', m, create=False):
openstack_utils.write_private_key('mykeys', 'keycontents')
m.assert_called_once_with('tests/id_rsa_mykeys', 'w')
handle = m()
handle.write.assert_called_once_with('keycontents')
def test_get_private_key(self):
self.patch_object(openstack_utils.os.path, "isfile",
return_value=True)
m = mock.mock_open(read_data='myprivkey')
with mock.patch('zaza.utilities.openstack.open', m, create=True):
self.assertEqual(
openstack_utils.get_private_key('mykeys'),
'myprivkey')
def test_get_private_key_file_missing(self):
self.patch_object(openstack_utils.os.path, "isfile",
return_value=False)
self.assertIsNone(openstack_utils.get_private_key('mykeys'))
def test_get_public_key(self):
key_mock = mock.MagicMock(public_key='mypubkey')
nova_mock = mock.MagicMock()
nova_mock.keypairs.findall.return_value = [key_mock]
self.assertEqual(
openstack_utils.get_public_key(nova_mock, 'mykeys'),
'mypubkey')
def test_valid_key_exists(self):
nova_mock = mock.MagicMock()
self.patch_object(openstack_utils, 'get_public_key',
return_value='pubkey')
self.patch_object(openstack_utils, 'get_private_key',
return_value='privkey')
self.patch_object(openstack_utils.cert, 'is_keys_valid',
return_value=True)
self.assertTrue(openstack_utils.valid_key_exists(nova_mock, 'mykeys'))
self.get_public_key.assert_called_once_with(nova_mock, 'mykeys')
self.get_private_key.assert_called_once_with('mykeys')
self.is_keys_valid.assert_called_once_with('pubkey', 'privkey')
def test_valid_key_exists_missing(self):
nova_mock = mock.MagicMock()
self.patch_object(openstack_utils, 'get_public_key',
return_value='pubkey')
self.patch_object(openstack_utils, 'get_private_key',
return_value=None)
self.patch_object(openstack_utils.cert, 'is_keys_valid',
return_value=True)
self.assertFalse(openstack_utils.valid_key_exists(nova_mock, 'mykeys'))
self.get_public_key.assert_called_once_with(nova_mock, 'mykeys')
self.get_private_key.assert_called_once_with('mykeys')
def test_get_ports_from_device_id(self):
port_mock = {'device_id': 'dev1'}
neutron_mock = mock.MagicMock()
neutron_mock.list_ports.return_value = {
'ports': [port_mock]}
self.assertEqual(
openstack_utils.get_ports_from_device_id(
neutron_mock,
'dev1'),
[port_mock])
def test_get_ports_from_device_id_no_match(self):
port_mock = {'device_id': 'dev2'}
neutron_mock = mock.MagicMock()
neutron_mock.list_ports.return_value = {
'ports': [port_mock]}
self.assertEqual(
openstack_utils.get_ports_from_device_id(
neutron_mock,
'dev1'),
[])
def test_ping_response(self):
self.patch_object(openstack_utils.subprocess, 'check_call')
openstack_utils.ping_response('10.0.0.10')
self.check_call.assert_called_once_with(
['ping', '-c', '1', '-W', '1', '10.0.0.10'], stdout=-3)
def test_ping_response_fail(self):
openstack_utils.ping_response.retry.wait = \
tenacity.wait_none()
self.patch_object(openstack_utils.subprocess, 'check_call')
self.check_call.side_effect = Exception()
with self.assertRaises(Exception):
openstack_utils.ping_response('10.0.0.10')
def test_ssh_test(self):
paramiko_mock = mock.MagicMock()
self.patch_object(openstack_utils.paramiko, 'SSHClient',
return_value=paramiko_mock)
self.patch_object(openstack_utils.paramiko, 'AutoAddPolicy',
return_value='some_policy')
stdout = io.StringIO("myvm")
paramiko_mock.exec_command.return_value = ('stdin', stdout, 'stderr')
openstack_utils.ssh_test(
'bob',
'10.0.0.10',
'myvm',
password='reallyhardpassord')
paramiko_mock.connect.assert_called_once_with(
'10.0.0.10',
password='reallyhardpassord',
username='bob')
def test_ssh_test_wrong_server(self):
paramiko_mock = mock.MagicMock()
self.patch_object(openstack_utils.paramiko, 'SSHClient',
return_value=paramiko_mock)
self.patch_object(openstack_utils.paramiko, 'AutoAddPolicy',
return_value='some_policy')
stdout = io.StringIO("anothervm")
paramiko_mock.exec_command.return_value = ('stdin', stdout, 'stderr')
with self.assertRaises(exceptions.SSHFailed):
openstack_utils.ssh_test(
'bob',
'10.0.0.10',
'myvm',
password='reallyhardpassord')
paramiko_mock.connect.assert_called_once_with(
'10.0.0.10',
password='reallyhardpassord',
username='bob')
def test_ssh_test_key_auth(self):
paramiko_mock = mock.MagicMock()
self.patch_object(openstack_utils.paramiko, 'SSHClient',
return_value=paramiko_mock)
self.patch_object(openstack_utils.paramiko, 'AutoAddPolicy',
return_value='some_policy')
self.patch_object(openstack_utils.paramiko.RSAKey, 'from_private_key',
return_value='akey')
stdout = io.StringIO("myvm")
paramiko_mock.exec_command.return_value = ('stdin', stdout, 'stderr')
openstack_utils.ssh_test(
'bob',
'10.0.0.10',
'myvm',
privkey='myprivkey')
paramiko_mock.connect.assert_called_once_with(
'10.0.0.10',
password='',
pkey='akey',
username='bob')

View File

@@ -3,6 +3,14 @@
import zaza.utilities.openstack as openstack_utils
def basic_setup():
"""Run setup for testing glance.
Glance setup for testing glance is currently part of glance functional
tests. Image setup for other tests to use should go here.
"""
def add_cirros_image(glance_client=None):
"""Add a cirros image to the current deployment.
@@ -17,13 +25,24 @@ def add_cirros_image(glance_client=None):
openstack_utils.create_image(
glance_client,
image_url,
'cirrosimage')
'cirros')
def basic_setup():
"""Run setup for testing glance.
def add_lts_image(glance_client=None):
"""Add an Ubuntu LTS image to the current deployment.
Glance setup for testing glance is currently part of glance functional
tests. Image setup for other tests to use should go here.
:param glance: Authenticated glanceclient
:type glance: glanceclient.Client
"""
add_cirros_image()
if not glance_client:
keystone_session = openstack_utils.get_overcloud_keystone_session()
glance_client = openstack_utils.get_glance_session_client(
keystone_session)
image_url = openstack_utils.find_ubuntu_image(
release='bionic',
arch='amd64')
print(image_url)
openstack_utils.create_image(
glance_client,
image_url,
'bionic')

View File

@@ -0,0 +1 @@
"""Collection of code for setting up and testing neutron."""

View File

@@ -0,0 +1,71 @@
#!/usr/bin/env python3
"""Setup for Neutron deployments."""
from zaza.configure import (
network,
)
from zaza.utilities import (
cli as cli_utils,
generic as generic_utils,
juju as juju_utils,
openstack as openstack_utils,
)
# The overcloud network configuration settings are declared.
# These are the network configuration settings under test.
OVERCLOUD_NETWORK_CONFIG = {
"network_type": "gre",
"router_name": "provider-router",
"ip_version": "4",
"address_scope": "public",
"external_net_name": "ext_net",
"external_subnet_name": "ext_net_subnet",
"prefix_len": "24",
"subnetpool_name": "pooled_subnets",
"subnetpool_prefix": "192.168.0.0/16",
}
# The undercloud network configuration settings are substrate specific to
# the environment where the tests are being executed. These settings may be
# overridden by environment variables. See the doc string documentation for
# zaza.utilities.generic_utils.get_undercloud_env_vars for the environment
# variables required to be exported and available to zaza.
# These are default settings provided as an example.
DEFAULT_UNDERCLOUD_NETWORK_CONFIG = {
"start_floating_ip": "10.5.150.0",
"end_floating_ip": "10.5.150.254",
"external_dns": "10.5.0.2",
"external_net_cidr": "10.5.0.0/16",
"default_gateway": "10.5.0.1",
}
def basic_overcloud_network():
"""Run setup for neutron networking.
Configure the following:
The overcloud network using subnet pools
"""
cli_utils.setup_logging()
# Get network configuration settings
network_config = {}
# Declared overcloud settings
network_config.update(OVERCLOUD_NETWORK_CONFIG)
# Default undercloud settings
network_config.update(DEFAULT_UNDERCLOUD_NETWORK_CONFIG)
# Environment specific settings
network_config.update(generic_utils.get_undercloud_env_vars())
# Get keystone session
keystone_session = openstack_utils.get_overcloud_keystone_session()
# Handle network for Openstack-on-Openstack scenarios
if juju_utils.get_provider_type() == "openstack":
undercloud_ks_sess = openstack_utils.get_undercloud_keystone_session()
network.setup_gateway_ext_port(network_config,
keystone_session=undercloud_ks_sess)
# Confugre the overcloud network
network.setup_sdn(network_config, keystone_session=keystone_session)

View File

@@ -0,0 +1 @@
"""Collection of code for setting up and testing nova."""

View File

@@ -0,0 +1,51 @@
"""Code for configureing nova."""
import zaza.utilities.openstack as openstack_utils
from zaza.utilities import (
cli as cli_utils,
)
import zaza.charm_tests.nova.utils as nova_utils
def create_flavors(nova_client=None):
"""Create basic flavors.
:param nova_client: Authenticated nova client
:type nova_client: novaclient.v2.client.Client
"""
if not nova_client:
keystone_session = openstack_utils.get_overcloud_keystone_session()
nova_client = openstack_utils.get_nova_session_client(
keystone_session)
cli_utils.setup_logging()
names = [flavor.name for flavor in nova_client.flavors.list()]
for flavor in nova_utils.FLAVORS.keys():
if flavor not in names:
nova_client.flavors.create(
name=flavor,
ram=nova_utils.FLAVORS[flavor]['ram'],
vcpus=nova_utils.FLAVORS[flavor]['vcpus'],
disk=nova_utils.FLAVORS[flavor]['disk'],
flavorid=nova_utils.FLAVORS[flavor]['flavorid'])
def manage_ssh_key(nova_client=None):
"""Create basic flavors.
:param nova_client: Authenticated nova client
:type nova_client: novaclient.v2.client.Client
"""
if not nova_client:
keystone_session = openstack_utils.get_overcloud_keystone_session()
nova_client = openstack_utils.get_nova_session_client(
keystone_session)
cli_utils.setup_logging()
if not openstack_utils.valid_key_exists(nova_client,
nova_utils.KEYPAIR_NAME):
key = openstack_utils.create_ssh_key(
nova_client,
nova_utils.KEYPAIR_NAME,
replace=True)
openstack_utils.write_private_key(
nova_utils.KEYPAIR_NAME,
key.private_key)

View File

@@ -0,0 +1,109 @@
#!/usr/bin/env python3
"""Encapsulate nova testing."""
import logging
import time
import unittest
import zaza.model as model
import zaza.utilities.openstack as openstack_utils
import zaza.charm_tests.nova.utils as nova_utils
class BaseGuestCreateTest(unittest.TestCase):
"""Base for tests to launch a guest."""
boot_tests = {
'cirros': {
'image_name': 'cirrosimage',
'flavor_name': 'm1.tiny',
'username': 'cirros',
'bootstring': 'gocubsgo',
'password': 'gocubsgo'},
'bionic': {
'image_name': 'bionic',
'flavor_name': 'm1.small',
'username': 'ubuntu',
'bootstring': 'finished at'}}
@classmethod
def setUpClass(cls):
"""Run class setup for running glance tests."""
cls.keystone_session = openstack_utils.get_overcloud_keystone_session()
cls.model_name = model.get_juju_model()
cls.nova_client = openstack_utils.get_nova_session_client(
cls.keystone_session)
cls.neutron_client = openstack_utils.get_neutron_session_client(
cls.keystone_session)
def launch_instance(self, instance_key):
"""Launch an instance.
:param instance_key: Key to collect associated config data with.
:type instance_key: str
"""
# Collect resource information.
vm_name = time.strftime("%Y%m%d%H%M%S")
image = self.nova_client.glance.find_image(
self.boot_tests[instance_key]['image_name'])
flavor = self.nova_client.flavors.find(
name=self.boot_tests[instance_key]['flavor_name'])
net = self.neutron_client.find_resource("network", "private")
nics = [{'net-id': net.get('id')}]
# Launch instance.
logging.info('Launching instance {}'.format(vm_name))
instance = self.nova_client.servers.create(
name=vm_name,
image=image,
flavor=flavor,
key_name=nova_utils.KEYPAIR_NAME,
nics=nics)
# Test Instance is ready.
logging.info('Checking instance is active')
openstack_utils.resource_reaches_status(
self.nova_client.servers,
instance.id,
expected_status='ACTIVE')
logging.info('Checking cloud init is complete')
openstack_utils.cloud_init_complete(
self.nova_client,
instance.id,
self.boot_tests[instance_key]['bootstring'])
port = openstack_utils.get_ports_from_device_id(
self.neutron_client,
instance.id)[0]
logging.info('Assigning floating ip.')
ip = openstack_utils.create_floating_ip(
self.neutron_client,
"ext_net",
port=port)['floating_ip_address']
logging.info('Assigned floating IP {} to {}'.format(ip, vm_name))
openstack_utils.ping_response(ip)
# Check ssh'ing to instance.
logging.info('Testing ssh access.')
openstack_utils.ssh_test(
username=self.boot_tests[instance_key]['username'],
ip=ip,
vm_name=vm_name,
password=self.boot_tests[instance_key].get('password'),
privkey=openstack_utils.get_private_key(nova_utils.KEYPAIR_NAME))
class CirrosGuestCreateTest(BaseGuestCreateTest):
"""Tests to launch a cirros image."""
def test_launch_small_cirros_instance(self):
"""Launch a cirros instance and test connectivity."""
self.launch_instance('cirros')
class LTSGuestCreateTest(BaseGuestCreateTest):
"""Tests to launch a LTS image."""
def test_launch_small_cirros_instance(self):
"""Launch a cirros instance and test connectivity."""
self.launch_instance('bionic')

View File

@@ -0,0 +1,25 @@
"""Data for nova tests."""
FLAVORS = {
'm1.tiny': {
'flavorid': 1,
'ram': 512,
'disk': 1,
'vcpus': 1},
'm1.small': {
'flavorid': 2,
'ram': 2048,
'disk': 20,
'vcpus': 1},
'm1.medium': {
'flavorid': 3,
'ram': 4096,
'disk': 40,
'vcpus': 2},
'm1.large': {
'flavorid': 4,
'ram': 8192,
'disk': 40,
'vcpus': 4},
}
KEYPAIR_NAME = 'zaza'

View File

@@ -35,8 +35,8 @@ class OpenStackBaseTest(unittest.TestCase):
cls.test_config = lifecycle_utils.get_charm_config()
cls.application_name = cls.test_config['charm_name']
cls.first_unit = model.get_first_unit_name(
cls.model_name,
cls.application_name)
cls.application_name,
model_name=cls.model_name)
logging.debug('First unit is {}'.format(cls.first_unit))
def restart_on_changed(self, config_file, default_config, alternate_config,
@@ -65,58 +65,60 @@ class OpenStackBaseTest(unittest.TestCase):
# first_unit is only useed to grab a timestamp, the assumption being
# that all the units times are in sync.
mtime = model.get_unit_time(self.model_name, self.first_unit)
mtime = model.get_unit_time(
self.first_unit,
model_name=self.model_name)
logging.debug('Remote unit timestamp {}'.format(mtime))
logging.debug('Changing charm setting to {}'.format(alternate_config))
model.set_application_config(
self.model_name,
self.application_name,
alternate_config)
alternate_config,
model_name=self.model_name)
logging.debug(
'Waiting for updates to propagate to {}'.format(config_file))
model.block_until_oslo_config_entries_match(
self.model_name,
self.application_name,
config_file,
alternate_entry)
alternate_entry,
model_name=self.model_name)
logging.debug(
'Waiting for units to reach target states'.format(config_file))
model.wait_for_application_states(
self.model_name,
self.test_config.get('target_deploy_status', {}))
self.test_config.get('target_deploy_status', {}),
model_name=self.model_name)
# Config update has occured and hooks are idle. Any services should
# have been restarted by now:
logging.debug(
'Waiting for services ({}) to be restarted'.format(services))
model.block_until_services_restarted(
self.model_name,
self.application_name,
mtime,
services)
services,
model_name=self.model_name)
logging.debug('Restoring charm setting to {}'.format(default_config))
model.set_application_config(
self.model_name,
self.application_name,
default_config)
default_config,
model_name=self.model_name)
logging.debug(
'Waiting for updates to propagate to '.format(config_file))
model.block_until_oslo_config_entries_match(
self.model_name,
self.application_name,
config_file,
default_entry)
default_entry,
model_name=self.model_name)
logging.debug(
'Waiting for units to reach target states'.format(config_file))
model.wait_for_application_states(
self.model_name,
self.test_config.get('target_deploy_status', {}))
self.test_config.get('target_deploy_status', {}),
model_name=self.model_name)
def pause_resume(self, services):
"""Run Pause and resume tests.
@@ -129,33 +131,41 @@ class OpenStackBaseTest(unittest.TestCase):
:type services: list
"""
model.block_until_service_status(
self.model_name,
self.first_unit,
services,
'running')
'running',
model_name=self.model_name)
model.block_until_unit_wl_status(
self.model_name,
self.first_unit,
'active')
model.run_action(self.model_name, self.first_unit, 'pause', {})
'active',
model_name=self.model_name)
model.run_action(
self.first_unit,
'pause',
{},
model_name=self.model_name)
model.block_until_unit_wl_status(
self.model_name,
self.first_unit,
'maintenance')
model.block_until_all_units_idle(self.model_name)
'maintenance',
model_name=self.model_name)
model.block_until_all_units_idle(model_name=self.model_name)
model.block_until_service_status(
self.model_name,
self.first_unit,
services,
'stopped')
model.run_action(self.model_name, self.first_unit, 'resume', {})
'stopped',
model_name=self.model_name)
model.run_action(
self.first_unit,
'resume',
{},
model_name=self.model_name)
model.block_until_unit_wl_status(
self.model_name,
self.first_unit,
'active')
model.block_until_all_units_idle(self.model_name)
'active',
model_name=self.model_name)
model.block_until_all_units_idle(model_name=self.model_name)
model.block_until_service_status(
self.model_name,
self.first_unit,
services,
'running')
'running',
model_name=self.model_name)

View File

@@ -15,7 +15,7 @@
"""Module for working with x.509 certificates."""
import cryptography
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import padding, rsa
import cryptography.hazmat.primitives.hashes as hashes
import cryptography.hazmat.primitives.serialization as serialization
import datetime
@@ -205,3 +205,40 @@ def sign_csr(csr, ca_private_key, ca_cert=None, issuer_name=None,
backend=backend)
return signer_ca_cert.public_bytes(encoding=serialization.Encoding.PEM)
def is_keys_valid(public_key_string, private_key_string):
"""Test whether these are a valid public/private key pair.
:param public_key_string: PEM encoded key data.
:type public_key_string: str
:param private_key_string: OpenSSH encoded key data.
:type private_key_string: str
"""
private_key = serialization.load_pem_private_key(
private_key_string.encode(),
password=None,
backend=cryptography.hazmat.backends.default_backend()
)
public_key = serialization.load_ssh_public_key(
public_key_string.encode(),
backend=cryptography.hazmat.backends.default_backend()
)
message = b"encrypted data"
ciphertext = public_key.encrypt(
message,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None))
try:
plaintext = private_key.decrypt(
ciphertext,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None))
except ValueError:
plaintext = ''
return plaintext == message

View File

@@ -5,3 +5,15 @@ class MissingOSAthenticationException(Exception):
"""Exception when some data needed to authenticate is missing."""
pass
class CloudInitIncomplete(Exception):
"""Cloud init has not completed properly."""
pass
class SSHFailed(Exception):
"""SSH failed."""
pass

View File

@@ -16,15 +16,19 @@ from keystoneauth1.identity import (
v3,
v2,
)
import zaza.utilities.cert as cert
from novaclient import client as novaclient_client
from neutronclient.v2_0 import client as neutronclient
from neutronclient.common import exceptions as neutronexceptions
import io
import juju_wait
import logging
import os
import paramiko
import re
import six
import subprocess
import sys
import tenacity
import urllib
@@ -38,6 +42,10 @@ from zaza.utilities import (
CIRROS_RELEASE_URL = 'http://download.cirros-cloud.net/version/released'
CIRROS_IMAGE_URL = 'http://download.cirros-cloud.net'
UBUNTU_IMAGE_URLS = {
'bionic': ('http://cloud-images.ubuntu.com/{release}/current/'
'{release}-server-cloudimg-{arch}.img')
}
CHARM_TYPES = {
'neutron': {
@@ -1289,7 +1297,7 @@ def get_urllib_opener():
def find_cirros_image(arch):
"""Return the url for the latest cirros image for the given architecture.
:param arch: aarch64, arm, i386, x86_64 etc
:param arch: aarch64, arm, i386, amd64, x86_64 etc
:type arch: str
:returns: URL for latest cirros image
:rtype: str
@@ -1301,6 +1309,11 @@ def find_cirros_image(arch):
return '{}/{}/{}'.format(CIRROS_IMAGE_URL, version, cirros_img)
def find_ubuntu_image(release, arch):
"""Return url for image."""
return UBUNTU_IMAGE_URLS[release].format(release=release, arch=arch)
def download_image(image_url, target_file):
"""Download the image from the given url to the specified file.
@@ -1337,6 +1350,7 @@ def resource_reaches_status(resource, resource_id,
:raises: AssertionError
"""
resource_status = resource.get(resource_id).status
logging.info(resource_status)
assert resource_status == expected_status, (
"Resource in {} state, waiting for {}" .format(resource_status,
expected_status,))
@@ -1456,3 +1470,189 @@ def create_image(glance, image_url, image_name, image_cache_dir='tests'):
image = upload_image_to_glance(glance, local_path, image_name)
return image
def create_ssh_key(nova_client, keypair_name, replace=False):
"""Create ssh key.
:param nova_client: Authenticated nova client
:type nova_client: novaclient.v2.client.Client
:param keypair_name: Label to apply to keypair in Openstack.
:type keypair_name: str
:param replace: Whether to replace the existing keypair if it already
exists.
:type replace: str
:returns: The keypair
:rtype: nova.objects.keypair
"""
existing_keys = nova_client.keypairs.findall(name=keypair_name)
if existing_keys:
if replace:
logging.info('Deleting key(s) {}'.format(keypair_name))
for key in existing_keys:
nova_client.keypairs.delete(key)
else:
return existing_keys[0]
logging.info('Creating key %s' % (keypair_name))
return nova_client.keypairs.create(name=keypair_name)
def get_private_key_file(keypair_name):
"""Location of the file containing the private key with the given label.
:param keypair_name: Label of keypair in Openstack.
:type keypair_name: str
:returns: Path to file containing key
:rtype: str
"""
return 'tests/id_rsa_{}'.format(keypair_name)
def write_private_key(keypair_name, key):
"""Store supplied private key in file.
:param keypair_name: Label of keypair in Openstack.
:type keypair_name: str
:param key: PEM Encoded Private Key
:type key: str
"""
with open(get_private_key_file(keypair_name), 'w') as key_file:
key_file.write(key)
def get_private_key(keypair_name):
"""Return private key.
:param keypair_name: Label of keypair in Openstack.
:type keypair_name: str
:returns: PEM Encoded Private Key
:rtype: str
"""
key_file = get_private_key_file(keypair_name)
if not os.path.isfile(key_file):
return None
with open(key_file, 'r') as key_file:
key = key_file.read()
return key
def get_public_key(nova_client, keypair_name):
"""Return public key from Openstack.
:param nova_client: Authenticated nova client
:type nova_client: novaclient.v2.client.Client
:param keypair_name: Label of keypair in Openstack.
:type keypair_name: str
:returns: OpenSSH Encoded Public Key
:rtype: str or None
"""
keys = nova_client.keypairs.findall(name=keypair_name)
if keys:
return keys[0].public_key
else:
return None
def valid_key_exists(nova_client, keypair_name):
"""Check if a valid public/private keypair exists for keypair_name.
:param nova_client: Authenticated nova client
:type nova_client: novaclient.v2.client.Client
:param keypair_name: Label of keypair in Openstack.
:type keypair_name: str
"""
pub_key = get_public_key(nova_client, keypair_name)
priv_key = get_private_key(keypair_name)
if not all([pub_key, priv_key]):
return False
return cert.is_keys_valid(pub_key, priv_key)
def get_ports_from_device_id(neutron_client, device_id):
"""Return the ports associated with a given device.
:param neutron_client: Authenticated neutronclient
:type neutron_client: neutronclient.Client object
:param device_id: The id of the device to look for
:type device_id: str
:returns: List of port objects
:rtype: []
"""
ports = []
for _port in neutron_client.list_ports().get('ports'):
if device_id in _port.get('device_id'):
ports.append(_port)
return ports
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=1, max=60),
reraise=True, stop=tenacity.stop_after_attempt(8))
def cloud_init_complete(nova_client, vm_id, bootstring):
"""Wait for cloud init to complete on the given vm.
If cloud init does not complete in the alloted time then
exceptions.CloudInitIncomplete is raised.
:param nova_client: Authenticated nova client
:type nova_client: novaclient.v2.client.Client
:param vm_id,: The id of the server to monitor.
:type vm_id: str (uuid)
:param bootstring: The string to look for in the console output that will
indicate cloud init is complete.
:type bootstring: str
:raises: exceptions.CloudInitIncomplete
"""
instance = nova_client.servers.find(id=vm_id)
console_log = instance.get_console_output()
if bootstring not in console_log:
raise exceptions.CloudInitIncomplete()
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=1, max=60),
reraise=True, stop=tenacity.stop_after_attempt(8))
def ping_response(ip):
"""Wait for ping to respond on the given IP.
:param ip: IP address to ping
:type ip: str
:raises: subprocess.CalledProcessError
"""
cmd = ['ping', '-c', '1', '-W', '1', ip]
subprocess.check_call(cmd, stdout=subprocess.DEVNULL)
def ssh_test(username, ip, vm_name, password=None, privkey=None):
"""SSH to given ip using supplied credentials.
:param username: Username to connect with
:type username: str
:param ip: IP address to ssh to.
:type ip: str
:param vm_name: Name of VM.
:type vm_name: str
:param password: Password to authenticate with. If supplied it is used
rather than privkey.
:type password: str
:param privkey: Private key to authenticate with. If a password is
supplied it is used rather than the private key.
:type privkey: str
:raises: exceptions.SSHFailed
"""
logging.info('Attempting to ssh to %s(%s)' % (vm_name, ip))
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if password:
ssh.connect(ip, username=username, password=password)
else:
key = paramiko.RSAKey.from_private_key(io.StringIO(privkey))
ssh.connect(ip, username=username, password='', pkey=key)
stdin, stdout, stderr = ssh.exec_command('uname -n')
return_string = stdout.readlines()[0].strip()
ssh.close()
if return_string == vm_name:
logging.info('SSH to %s(%s) succesfull' % (vm_name, ip))
else:
logging.info('SSH to %s(%s) failed (%s != %s)' % (vm_name, ip,
return_string,
vm_name))
raise exceptions.SSHFailed()