Changes to enable testing creainting a guest
This is a bundle of disparate changes that enable testing the creating of a guest instance. At a high level these are: * Add helpers for downloading ubuntu images. * Add method for setting up a basic overcloud network. This is very similar to zaza.configure.network.run_from_cli * Add nova setup module that includes create_flavors and manage_ssh_key * Add nova test base class for launching an instance and two derived classes for launching a cirros and lts image respectively. * Fixes to zaza.charm_tests.test_utils after the refactor of the model argument in zaza.models * New certs.is_keys_valid function to check if a public and private key are a pair. * Methods in utilities.openstack for managing ssh keys. On creation of a new key the private key is stored locally and the public key is retrieved from Openstack * Methods in utilities.openstack for testing guests at various points of the creation process.
This commit is contained in:
@@ -3,6 +3,38 @@ import mock
|
||||
import unit_tests.utils as ut_utils
|
||||
import zaza.utilities.cert as cert
|
||||
|
||||
TEST_SSH_PRIVATE_KEY = """
|
||||
-----BEGIN RSA PRIVATE KEY-----
|
||||
MIIEpAIBAAKCAQEAvWNz+tJAVyudNsDYrFK4CnJV+/nBmjYJXC3Zf42RFmzJ/Sff
|
||||
bMSXM/OBPOPtpJg/FawzsTgoHQRMQ/oEcKRSJ0ZGQINlwlrfHdyJcdyH4ifad2oT
|
||||
42cYRW0yMJggQGe7ttruCvY0mZugwrCjHoX3bqKjSg7YaMpyUKBa2cwCWJu/GUlp
|
||||
sT1jjY89QYvzb/Auj5lMfk8Qmc4fIcC7EZ+lf+1iuwg7OJjRKbsBqVhUgTKisTxD
|
||||
kzvo6SLy49j+mWUjfWlCI74D6QhW8OH9sN6MxI1sYiomPrCo+eBc1fkr0dUT8xd6
|
||||
t1UL6HHx8XkO16BMbLc+lIVNiifZtAK3SL1BnwIDAQABAoIBABZYxtWgu3DNt6Y/
|
||||
SRHETO0GorixtrNwjtgunMxdMvJ3cboKW2WlKMY7hFNf/al/QWpYQF036BvMZwda
|
||||
V+3Gpd72ftGb74ToXg1S+XDS+cGovDF89c3OW2HNya9MM/oFg3PHD3GBraE2aNiw
|
||||
KP8wBYsra6MQb16mDKkQ0seCOACmY/4jYlZ/7YbFtZPBPeZpnlxg4hgFIkiJmuPf
|
||||
pmHpLFBhpmyo0yf3DGf/rsL9ti6LPBo8vCH6anM9ljn/BW2a3JA/ap4uUGb+FuoV
|
||||
lwa1by1L6uLNYQb3fSEtmEEIy1mn89SjlEPfHnooXdadTM+9zT9xIc1ArNOSZagU
|
||||
UHibUXECgYEA5C3h8d1MU5tKppoIM8aOC+OQFZbFd+bF8z+t7RdX9P6br5J0Ugtj
|
||||
GcykUz6IRQWGZRCsyM9zshK2gQRCul0byNcFjzIHhR62Va6h1u7iwN1F4qc8a5WS
|
||||
bb/1TEVprTSu9guW8OO/EmUgWkBgej4/j31F8PZG4+m1defLZXNo7skCgYEA1HrN
|
||||
UOaBMzaFujGRZjlF54v7flCa1YYcv51Dfk8LEScs/jJvTY664ofj6AfQQN37Akmh
|
||||
6B6jBfP8K7RxcJvAXE00oNliDvwo9TxoTc/F59HbgsEcR739fMjwvpOBWJg0zJy8
|
||||
28/29dy9e0Fcy6ay55l050+0CBdzkvTWNHBVWScCgYEAhsTi0qvWTPtHmCcZ+Rqp
|
||||
AzShAV9PuoW/HPDblVFYTgejhIuH0H2RRserts8URVACFOdIZkLBHsgWqxUNJG2h
|
||||
33nAetcdwe5l2y2NwRjPLQKEKF6GPTTWi6P5CddllzuqqwAlYpnhXMgF18h2Mz1Y
|
||||
5TMkgDG1pR+AYedKJt2HeKECgYAKR+LVTkHkG3g++RUC8DR8rp49j2Lef/22G8Lf
|
||||
Qq3TZ6Taq9AM3aIXQeH6IR6ndNYnVy65T3ot2I9UAggXHcIh9S5dtgbzmKnWq9SU
|
||||
J0B5JgNMAVH/+qZgOkzDu9lfUwYC/HZ64EYfwU19wDzgMbGoWRl587ZPSesyqhwP
|
||||
L3xBswKBgQDc3WnWDP/KFjzWKY8KG4XZYKvvOy1en4hytbWrFssu5HlYoQeRgAog
|
||||
K8ZAFLW2Mn0QebwL/gXSDYlZHmu6EbnO4v1kzRMi6aQxYOgKJWLEwj3r3hzJh6YU
|
||||
QEGH15IncVqMch6HIir4oTF7RY2BsikDDY/GB/l0pRfZrGl9mnrY6Q==
|
||||
-----END RSA PRIVATE KEY-----
|
||||
"""
|
||||
TEST_SSH_PUB_KEY = """ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC9Y3P60kBXK502wNisUrgKclX7+cGaNglcLdl/jZEWbMn9J99sxJcz84E84+2kmD8VrDOxOCgdBExD+gRwpFInRkZAg2XCWt8d3Ilx3IfiJ9p3ahPjZxhFbTIwmCBAZ7u22u4K9jSZm6DCsKMehfduoqNKDthoynJQoFrZzAJYm78ZSWmxPWONjz1Bi/Nv8C6PmUx+TxCZzh8hwLsRn6V/7WK7CDs4mNEpuwGpWFSBMqKxPEOTO+jpIvLj2P6ZZSN9aUIjvgPpCFbw4f2w3ozEjWxiKiY+sKj54FzV+SvR1RPzF3q3VQvocfHxeQ7XoExstz6UhU2KJ9m0ArdIvUGf ubuntu@gnuoy-bastion """ # noqa
|
||||
TEST_SSH_PUB_KEY_INVALID = """ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDMz6U88GVhwAjjhzSrcyKKWe3LfB4pK4Ap6XpIfSmiVDPTBiBU3wzj1YAIBo26OMHDkfUnmtBgtzOfcb64QPaUmMfCkzadxrd8inYlpz+0AoahCTTONkElMxj+wa7SYVF4GphrDKDvlPi83bcLmO39veNVcLYHcDa+9mWBP3AlI3TdKqJpgOtCzLu9qbhlpmYa7YD6ijQrTJI3wOOw0uZeEARVCCKU44BVUFnWrNx5ioihETj9rAxRFrm1dx8mKDP0fCf53/Xn+LKLcYBPVovT6BpBHkaLuG6mTYU7puHN607wRhRwYhc3Y9y0sd6rHykYKL3G27w08s597paFtXg5 ubuntu@gnuoy-bastion""" # noqa
|
||||
|
||||
|
||||
class TestUtilitiesCert(ut_utils.BaseTestCase):
|
||||
|
||||
@@ -235,3 +267,11 @@ class TestUtilitiesCert(ut_utils.BaseTestCase):
|
||||
self.builder_mock.add_extension.assert_called_once_with(
|
||||
self.bcons_mock(),
|
||||
critical=True)
|
||||
|
||||
def test_is_keys_valid(self):
|
||||
self.assertTrue(
|
||||
cert.is_keys_valid(TEST_SSH_PUB_KEY, TEST_SSH_PRIVATE_KEY))
|
||||
|
||||
def test_is_keys_valid_invalid(self):
|
||||
self.assertFalse(
|
||||
cert.is_keys_valid(TEST_SSH_PUB_KEY_INVALID, TEST_SSH_PRIVATE_KEY))
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
import copy
|
||||
import io
|
||||
import mock
|
||||
import tenacity
|
||||
|
||||
import unit_tests.utils as ut_utils
|
||||
from zaza.utilities import openstack as openstack_utils
|
||||
from zaza.utilities import exceptions
|
||||
|
||||
|
||||
class TestOpenStackUtils(ut_utils.BaseTestCase):
|
||||
@@ -280,6 +282,12 @@ class TestOpenStackUtils(ut_utils.BaseTestCase):
|
||||
openstack_utils.find_cirros_image('aarch64'),
|
||||
'http://download.cirros-cloud.net/12/cirros-12-aarch64-disk.img')
|
||||
|
||||
def test_find_ubuntu_image(self):
|
||||
self.assertEqual(
|
||||
openstack_utils.find_ubuntu_image('bionic', 'aarch64'),
|
||||
('http://cloud-images.ubuntu.com/bionic/current/'
|
||||
'bionic-server-cloudimg-aarch64.img'))
|
||||
|
||||
def test_download_image(self):
|
||||
urllib_opener_mock = mock.MagicMock()
|
||||
self.patch_object(openstack_utils, "get_urllib_opener")
|
||||
@@ -400,3 +408,188 @@ class TestOpenStackUtils(ut_utils.BaseTestCase):
|
||||
glance_mock,
|
||||
'tests/c.img',
|
||||
'bob')
|
||||
|
||||
def test_create_ssh_key(self):
|
||||
nova_mock = mock.MagicMock()
|
||||
nova_mock.keypairs.findall.return_value = []
|
||||
openstack_utils.create_ssh_key(
|
||||
nova_mock,
|
||||
'mykeys')
|
||||
nova_mock.keypairs.create.assert_called_once_with(name='mykeys')
|
||||
|
||||
def test_create_ssh_key_existing(self):
|
||||
nova_mock = mock.MagicMock()
|
||||
nova_mock.keypairs.findall.return_value = ['akey']
|
||||
self.assertEqual(
|
||||
openstack_utils.create_ssh_key(
|
||||
nova_mock,
|
||||
'mykeys'),
|
||||
'akey')
|
||||
self.assertFalse(nova_mock.keypairs.create.called)
|
||||
|
||||
def test_create_ssh_key_existing_replace(self):
|
||||
nova_mock = mock.MagicMock()
|
||||
nova_mock.keypairs.findall.return_value = ['key1']
|
||||
openstack_utils.create_ssh_key(
|
||||
nova_mock,
|
||||
'mykeys',
|
||||
replace=True),
|
||||
nova_mock.keypairs.delete.assert_called_once_with('key1')
|
||||
nova_mock.keypairs.create.assert_called_once_with(name='mykeys')
|
||||
|
||||
def test_get_private_key_file(self):
|
||||
self.assertEqual(
|
||||
openstack_utils.get_private_key_file('mykeys'),
|
||||
'tests/id_rsa_mykeys')
|
||||
|
||||
def test_write_private_key(self):
|
||||
m = mock.mock_open()
|
||||
with mock.patch('zaza.utilities.openstack.open', m, create=False):
|
||||
openstack_utils.write_private_key('mykeys', 'keycontents')
|
||||
m.assert_called_once_with('tests/id_rsa_mykeys', 'w')
|
||||
handle = m()
|
||||
handle.write.assert_called_once_with('keycontents')
|
||||
|
||||
def test_get_private_key(self):
|
||||
self.patch_object(openstack_utils.os.path, "isfile",
|
||||
return_value=True)
|
||||
m = mock.mock_open(read_data='myprivkey')
|
||||
with mock.patch('zaza.utilities.openstack.open', m, create=True):
|
||||
self.assertEqual(
|
||||
openstack_utils.get_private_key('mykeys'),
|
||||
'myprivkey')
|
||||
|
||||
def test_get_private_key_file_missing(self):
|
||||
self.patch_object(openstack_utils.os.path, "isfile",
|
||||
return_value=False)
|
||||
self.assertIsNone(openstack_utils.get_private_key('mykeys'))
|
||||
|
||||
def test_get_public_key(self):
|
||||
key_mock = mock.MagicMock(public_key='mypubkey')
|
||||
nova_mock = mock.MagicMock()
|
||||
nova_mock.keypairs.findall.return_value = [key_mock]
|
||||
self.assertEqual(
|
||||
openstack_utils.get_public_key(nova_mock, 'mykeys'),
|
||||
'mypubkey')
|
||||
|
||||
def test_valid_key_exists(self):
|
||||
nova_mock = mock.MagicMock()
|
||||
self.patch_object(openstack_utils, 'get_public_key',
|
||||
return_value='pubkey')
|
||||
self.patch_object(openstack_utils, 'get_private_key',
|
||||
return_value='privkey')
|
||||
self.patch_object(openstack_utils.cert, 'is_keys_valid',
|
||||
return_value=True)
|
||||
self.assertTrue(openstack_utils.valid_key_exists(nova_mock, 'mykeys'))
|
||||
self.get_public_key.assert_called_once_with(nova_mock, 'mykeys')
|
||||
self.get_private_key.assert_called_once_with('mykeys')
|
||||
self.is_keys_valid.assert_called_once_with('pubkey', 'privkey')
|
||||
|
||||
def test_valid_key_exists_missing(self):
|
||||
nova_mock = mock.MagicMock()
|
||||
self.patch_object(openstack_utils, 'get_public_key',
|
||||
return_value='pubkey')
|
||||
self.patch_object(openstack_utils, 'get_private_key',
|
||||
return_value=None)
|
||||
self.patch_object(openstack_utils.cert, 'is_keys_valid',
|
||||
return_value=True)
|
||||
self.assertFalse(openstack_utils.valid_key_exists(nova_mock, 'mykeys'))
|
||||
self.get_public_key.assert_called_once_with(nova_mock, 'mykeys')
|
||||
self.get_private_key.assert_called_once_with('mykeys')
|
||||
|
||||
def test_get_ports_from_device_id(self):
|
||||
port_mock = {'device_id': 'dev1'}
|
||||
neutron_mock = mock.MagicMock()
|
||||
neutron_mock.list_ports.return_value = {
|
||||
'ports': [port_mock]}
|
||||
self.assertEqual(
|
||||
openstack_utils.get_ports_from_device_id(
|
||||
neutron_mock,
|
||||
'dev1'),
|
||||
[port_mock])
|
||||
|
||||
def test_get_ports_from_device_id_no_match(self):
|
||||
port_mock = {'device_id': 'dev2'}
|
||||
neutron_mock = mock.MagicMock()
|
||||
neutron_mock.list_ports.return_value = {
|
||||
'ports': [port_mock]}
|
||||
self.assertEqual(
|
||||
openstack_utils.get_ports_from_device_id(
|
||||
neutron_mock,
|
||||
'dev1'),
|
||||
[])
|
||||
|
||||
def test_ping_response(self):
|
||||
self.patch_object(openstack_utils.subprocess, 'check_call')
|
||||
openstack_utils.ping_response('10.0.0.10')
|
||||
self.check_call.assert_called_once_with(
|
||||
['ping', '-c', '1', '-W', '1', '10.0.0.10'], stdout=-3)
|
||||
|
||||
def test_ping_response_fail(self):
|
||||
openstack_utils.ping_response.retry.wait = \
|
||||
tenacity.wait_none()
|
||||
self.patch_object(openstack_utils.subprocess, 'check_call')
|
||||
self.check_call.side_effect = Exception()
|
||||
with self.assertRaises(Exception):
|
||||
openstack_utils.ping_response('10.0.0.10')
|
||||
|
||||
def test_ssh_test(self):
|
||||
paramiko_mock = mock.MagicMock()
|
||||
self.patch_object(openstack_utils.paramiko, 'SSHClient',
|
||||
return_value=paramiko_mock)
|
||||
self.patch_object(openstack_utils.paramiko, 'AutoAddPolicy',
|
||||
return_value='some_policy')
|
||||
stdout = io.StringIO("myvm")
|
||||
|
||||
paramiko_mock.exec_command.return_value = ('stdin', stdout, 'stderr')
|
||||
openstack_utils.ssh_test(
|
||||
'bob',
|
||||
'10.0.0.10',
|
||||
'myvm',
|
||||
password='reallyhardpassord')
|
||||
paramiko_mock.connect.assert_called_once_with(
|
||||
'10.0.0.10',
|
||||
password='reallyhardpassord',
|
||||
username='bob')
|
||||
|
||||
def test_ssh_test_wrong_server(self):
|
||||
paramiko_mock = mock.MagicMock()
|
||||
self.patch_object(openstack_utils.paramiko, 'SSHClient',
|
||||
return_value=paramiko_mock)
|
||||
self.patch_object(openstack_utils.paramiko, 'AutoAddPolicy',
|
||||
return_value='some_policy')
|
||||
stdout = io.StringIO("anothervm")
|
||||
|
||||
paramiko_mock.exec_command.return_value = ('stdin', stdout, 'stderr')
|
||||
with self.assertRaises(exceptions.SSHFailed):
|
||||
openstack_utils.ssh_test(
|
||||
'bob',
|
||||
'10.0.0.10',
|
||||
'myvm',
|
||||
password='reallyhardpassord')
|
||||
paramiko_mock.connect.assert_called_once_with(
|
||||
'10.0.0.10',
|
||||
password='reallyhardpassord',
|
||||
username='bob')
|
||||
|
||||
def test_ssh_test_key_auth(self):
|
||||
paramiko_mock = mock.MagicMock()
|
||||
self.patch_object(openstack_utils.paramiko, 'SSHClient',
|
||||
return_value=paramiko_mock)
|
||||
self.patch_object(openstack_utils.paramiko, 'AutoAddPolicy',
|
||||
return_value='some_policy')
|
||||
self.patch_object(openstack_utils.paramiko.RSAKey, 'from_private_key',
|
||||
return_value='akey')
|
||||
stdout = io.StringIO("myvm")
|
||||
|
||||
paramiko_mock.exec_command.return_value = ('stdin', stdout, 'stderr')
|
||||
openstack_utils.ssh_test(
|
||||
'bob',
|
||||
'10.0.0.10',
|
||||
'myvm',
|
||||
privkey='myprivkey')
|
||||
paramiko_mock.connect.assert_called_once_with(
|
||||
'10.0.0.10',
|
||||
password='',
|
||||
pkey='akey',
|
||||
username='bob')
|
||||
|
||||
@@ -3,6 +3,14 @@
|
||||
import zaza.utilities.openstack as openstack_utils
|
||||
|
||||
|
||||
def basic_setup():
|
||||
"""Run setup for testing glance.
|
||||
|
||||
Glance setup for testing glance is currently part of glance functional
|
||||
tests. Image setup for other tests to use should go here.
|
||||
"""
|
||||
|
||||
|
||||
def add_cirros_image(glance_client=None):
|
||||
"""Add a cirros image to the current deployment.
|
||||
|
||||
@@ -17,13 +25,24 @@ def add_cirros_image(glance_client=None):
|
||||
openstack_utils.create_image(
|
||||
glance_client,
|
||||
image_url,
|
||||
'cirrosimage')
|
||||
'cirros')
|
||||
|
||||
|
||||
def basic_setup():
|
||||
"""Run setup for testing glance.
|
||||
def add_lts_image(glance_client=None):
|
||||
"""Add an Ubuntu LTS image to the current deployment.
|
||||
|
||||
Glance setup for testing glance is currently part of glance functional
|
||||
tests. Image setup for other tests to use should go here.
|
||||
:param glance: Authenticated glanceclient
|
||||
:type glance: glanceclient.Client
|
||||
"""
|
||||
add_cirros_image()
|
||||
if not glance_client:
|
||||
keystone_session = openstack_utils.get_overcloud_keystone_session()
|
||||
glance_client = openstack_utils.get_glance_session_client(
|
||||
keystone_session)
|
||||
image_url = openstack_utils.find_ubuntu_image(
|
||||
release='bionic',
|
||||
arch='amd64')
|
||||
print(image_url)
|
||||
openstack_utils.create_image(
|
||||
glance_client,
|
||||
image_url,
|
||||
'bionic')
|
||||
|
||||
1
zaza/charm_tests/neutron/__init__.py
Normal file
1
zaza/charm_tests/neutron/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Collection of code for setting up and testing neutron."""
|
||||
71
zaza/charm_tests/neutron/setup.py
Normal file
71
zaza/charm_tests/neutron/setup.py
Normal file
@@ -0,0 +1,71 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Setup for Neutron deployments."""
|
||||
|
||||
from zaza.configure import (
|
||||
network,
|
||||
)
|
||||
from zaza.utilities import (
|
||||
cli as cli_utils,
|
||||
generic as generic_utils,
|
||||
juju as juju_utils,
|
||||
openstack as openstack_utils,
|
||||
)
|
||||
|
||||
# The overcloud network configuration settings are declared.
|
||||
# These are the network configuration settings under test.
|
||||
OVERCLOUD_NETWORK_CONFIG = {
|
||||
"network_type": "gre",
|
||||
"router_name": "provider-router",
|
||||
"ip_version": "4",
|
||||
"address_scope": "public",
|
||||
"external_net_name": "ext_net",
|
||||
"external_subnet_name": "ext_net_subnet",
|
||||
"prefix_len": "24",
|
||||
"subnetpool_name": "pooled_subnets",
|
||||
"subnetpool_prefix": "192.168.0.0/16",
|
||||
}
|
||||
|
||||
# The undercloud network configuration settings are substrate specific to
|
||||
# the environment where the tests are being executed. These settings may be
|
||||
# overridden by environment variables. See the doc string documentation for
|
||||
# zaza.utilities.generic_utils.get_undercloud_env_vars for the environment
|
||||
# variables required to be exported and available to zaza.
|
||||
# These are default settings provided as an example.
|
||||
DEFAULT_UNDERCLOUD_NETWORK_CONFIG = {
|
||||
"start_floating_ip": "10.5.150.0",
|
||||
"end_floating_ip": "10.5.150.254",
|
||||
"external_dns": "10.5.0.2",
|
||||
"external_net_cidr": "10.5.0.0/16",
|
||||
"default_gateway": "10.5.0.1",
|
||||
}
|
||||
|
||||
|
||||
def basic_overcloud_network():
|
||||
"""Run setup for neutron networking.
|
||||
|
||||
Configure the following:
|
||||
The overcloud network using subnet pools
|
||||
|
||||
"""
|
||||
cli_utils.setup_logging()
|
||||
|
||||
# Get network configuration settings
|
||||
network_config = {}
|
||||
# Declared overcloud settings
|
||||
network_config.update(OVERCLOUD_NETWORK_CONFIG)
|
||||
# Default undercloud settings
|
||||
network_config.update(DEFAULT_UNDERCLOUD_NETWORK_CONFIG)
|
||||
# Environment specific settings
|
||||
network_config.update(generic_utils.get_undercloud_env_vars())
|
||||
|
||||
# Get keystone session
|
||||
keystone_session = openstack_utils.get_overcloud_keystone_session()
|
||||
|
||||
# Handle network for Openstack-on-Openstack scenarios
|
||||
if juju_utils.get_provider_type() == "openstack":
|
||||
undercloud_ks_sess = openstack_utils.get_undercloud_keystone_session()
|
||||
network.setup_gateway_ext_port(network_config,
|
||||
keystone_session=undercloud_ks_sess)
|
||||
|
||||
# Confugre the overcloud network
|
||||
network.setup_sdn(network_config, keystone_session=keystone_session)
|
||||
1
zaza/charm_tests/nova/__init__.py
Normal file
1
zaza/charm_tests/nova/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Collection of code for setting up and testing nova."""
|
||||
51
zaza/charm_tests/nova/setup.py
Normal file
51
zaza/charm_tests/nova/setup.py
Normal file
@@ -0,0 +1,51 @@
|
||||
"""Code for configureing nova."""
|
||||
|
||||
import zaza.utilities.openstack as openstack_utils
|
||||
from zaza.utilities import (
|
||||
cli as cli_utils,
|
||||
)
|
||||
import zaza.charm_tests.nova.utils as nova_utils
|
||||
|
||||
|
||||
def create_flavors(nova_client=None):
|
||||
"""Create basic flavors.
|
||||
|
||||
:param nova_client: Authenticated nova client
|
||||
:type nova_client: novaclient.v2.client.Client
|
||||
"""
|
||||
if not nova_client:
|
||||
keystone_session = openstack_utils.get_overcloud_keystone_session()
|
||||
nova_client = openstack_utils.get_nova_session_client(
|
||||
keystone_session)
|
||||
cli_utils.setup_logging()
|
||||
names = [flavor.name for flavor in nova_client.flavors.list()]
|
||||
for flavor in nova_utils.FLAVORS.keys():
|
||||
if flavor not in names:
|
||||
nova_client.flavors.create(
|
||||
name=flavor,
|
||||
ram=nova_utils.FLAVORS[flavor]['ram'],
|
||||
vcpus=nova_utils.FLAVORS[flavor]['vcpus'],
|
||||
disk=nova_utils.FLAVORS[flavor]['disk'],
|
||||
flavorid=nova_utils.FLAVORS[flavor]['flavorid'])
|
||||
|
||||
|
||||
def manage_ssh_key(nova_client=None):
|
||||
"""Create basic flavors.
|
||||
|
||||
:param nova_client: Authenticated nova client
|
||||
:type nova_client: novaclient.v2.client.Client
|
||||
"""
|
||||
if not nova_client:
|
||||
keystone_session = openstack_utils.get_overcloud_keystone_session()
|
||||
nova_client = openstack_utils.get_nova_session_client(
|
||||
keystone_session)
|
||||
cli_utils.setup_logging()
|
||||
if not openstack_utils.valid_key_exists(nova_client,
|
||||
nova_utils.KEYPAIR_NAME):
|
||||
key = openstack_utils.create_ssh_key(
|
||||
nova_client,
|
||||
nova_utils.KEYPAIR_NAME,
|
||||
replace=True)
|
||||
openstack_utils.write_private_key(
|
||||
nova_utils.KEYPAIR_NAME,
|
||||
key.private_key)
|
||||
109
zaza/charm_tests/nova/tests.py
Normal file
109
zaza/charm_tests/nova/tests.py
Normal file
@@ -0,0 +1,109 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Encapsulate nova testing."""
|
||||
|
||||
import logging
|
||||
import time
|
||||
import unittest
|
||||
|
||||
import zaza.model as model
|
||||
import zaza.utilities.openstack as openstack_utils
|
||||
import zaza.charm_tests.nova.utils as nova_utils
|
||||
|
||||
|
||||
class BaseGuestCreateTest(unittest.TestCase):
|
||||
"""Base for tests to launch a guest."""
|
||||
|
||||
boot_tests = {
|
||||
'cirros': {
|
||||
'image_name': 'cirrosimage',
|
||||
'flavor_name': 'm1.tiny',
|
||||
'username': 'cirros',
|
||||
'bootstring': 'gocubsgo',
|
||||
'password': 'gocubsgo'},
|
||||
'bionic': {
|
||||
'image_name': 'bionic',
|
||||
'flavor_name': 'm1.small',
|
||||
'username': 'ubuntu',
|
||||
'bootstring': 'finished at'}}
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
"""Run class setup for running glance tests."""
|
||||
cls.keystone_session = openstack_utils.get_overcloud_keystone_session()
|
||||
cls.model_name = model.get_juju_model()
|
||||
cls.nova_client = openstack_utils.get_nova_session_client(
|
||||
cls.keystone_session)
|
||||
cls.neutron_client = openstack_utils.get_neutron_session_client(
|
||||
cls.keystone_session)
|
||||
|
||||
def launch_instance(self, instance_key):
|
||||
"""Launch an instance.
|
||||
|
||||
:param instance_key: Key to collect associated config data with.
|
||||
:type instance_key: str
|
||||
"""
|
||||
# Collect resource information.
|
||||
vm_name = time.strftime("%Y%m%d%H%M%S")
|
||||
image = self.nova_client.glance.find_image(
|
||||
self.boot_tests[instance_key]['image_name'])
|
||||
flavor = self.nova_client.flavors.find(
|
||||
name=self.boot_tests[instance_key]['flavor_name'])
|
||||
net = self.neutron_client.find_resource("network", "private")
|
||||
nics = [{'net-id': net.get('id')}]
|
||||
|
||||
# Launch instance.
|
||||
logging.info('Launching instance {}'.format(vm_name))
|
||||
instance = self.nova_client.servers.create(
|
||||
name=vm_name,
|
||||
image=image,
|
||||
flavor=flavor,
|
||||
key_name=nova_utils.KEYPAIR_NAME,
|
||||
nics=nics)
|
||||
|
||||
# Test Instance is ready.
|
||||
logging.info('Checking instance is active')
|
||||
openstack_utils.resource_reaches_status(
|
||||
self.nova_client.servers,
|
||||
instance.id,
|
||||
expected_status='ACTIVE')
|
||||
|
||||
logging.info('Checking cloud init is complete')
|
||||
openstack_utils.cloud_init_complete(
|
||||
self.nova_client,
|
||||
instance.id,
|
||||
self.boot_tests[instance_key]['bootstring'])
|
||||
port = openstack_utils.get_ports_from_device_id(
|
||||
self.neutron_client,
|
||||
instance.id)[0]
|
||||
logging.info('Assigning floating ip.')
|
||||
ip = openstack_utils.create_floating_ip(
|
||||
self.neutron_client,
|
||||
"ext_net",
|
||||
port=port)['floating_ip_address']
|
||||
logging.info('Assigned floating IP {} to {}'.format(ip, vm_name))
|
||||
openstack_utils.ping_response(ip)
|
||||
|
||||
# Check ssh'ing to instance.
|
||||
logging.info('Testing ssh access.')
|
||||
openstack_utils.ssh_test(
|
||||
username=self.boot_tests[instance_key]['username'],
|
||||
ip=ip,
|
||||
vm_name=vm_name,
|
||||
password=self.boot_tests[instance_key].get('password'),
|
||||
privkey=openstack_utils.get_private_key(nova_utils.KEYPAIR_NAME))
|
||||
|
||||
|
||||
class CirrosGuestCreateTest(BaseGuestCreateTest):
|
||||
"""Tests to launch a cirros image."""
|
||||
|
||||
def test_launch_small_cirros_instance(self):
|
||||
"""Launch a cirros instance and test connectivity."""
|
||||
self.launch_instance('cirros')
|
||||
|
||||
|
||||
class LTSGuestCreateTest(BaseGuestCreateTest):
|
||||
"""Tests to launch a LTS image."""
|
||||
|
||||
def test_launch_small_cirros_instance(self):
|
||||
"""Launch a cirros instance and test connectivity."""
|
||||
self.launch_instance('bionic')
|
||||
25
zaza/charm_tests/nova/utils.py
Normal file
25
zaza/charm_tests/nova/utils.py
Normal file
@@ -0,0 +1,25 @@
|
||||
"""Data for nova tests."""
|
||||
|
||||
FLAVORS = {
|
||||
'm1.tiny': {
|
||||
'flavorid': 1,
|
||||
'ram': 512,
|
||||
'disk': 1,
|
||||
'vcpus': 1},
|
||||
'm1.small': {
|
||||
'flavorid': 2,
|
||||
'ram': 2048,
|
||||
'disk': 20,
|
||||
'vcpus': 1},
|
||||
'm1.medium': {
|
||||
'flavorid': 3,
|
||||
'ram': 4096,
|
||||
'disk': 40,
|
||||
'vcpus': 2},
|
||||
'm1.large': {
|
||||
'flavorid': 4,
|
||||
'ram': 8192,
|
||||
'disk': 40,
|
||||
'vcpus': 4},
|
||||
}
|
||||
KEYPAIR_NAME = 'zaza'
|
||||
@@ -35,8 +35,8 @@ class OpenStackBaseTest(unittest.TestCase):
|
||||
cls.test_config = lifecycle_utils.get_charm_config()
|
||||
cls.application_name = cls.test_config['charm_name']
|
||||
cls.first_unit = model.get_first_unit_name(
|
||||
cls.model_name,
|
||||
cls.application_name)
|
||||
cls.application_name,
|
||||
model_name=cls.model_name)
|
||||
logging.debug('First unit is {}'.format(cls.first_unit))
|
||||
|
||||
def restart_on_changed(self, config_file, default_config, alternate_config,
|
||||
@@ -65,58 +65,60 @@ class OpenStackBaseTest(unittest.TestCase):
|
||||
# first_unit is only useed to grab a timestamp, the assumption being
|
||||
# that all the units times are in sync.
|
||||
|
||||
mtime = model.get_unit_time(self.model_name, self.first_unit)
|
||||
mtime = model.get_unit_time(
|
||||
self.first_unit,
|
||||
model_name=self.model_name)
|
||||
logging.debug('Remote unit timestamp {}'.format(mtime))
|
||||
|
||||
logging.debug('Changing charm setting to {}'.format(alternate_config))
|
||||
model.set_application_config(
|
||||
self.model_name,
|
||||
self.application_name,
|
||||
alternate_config)
|
||||
alternate_config,
|
||||
model_name=self.model_name)
|
||||
|
||||
logging.debug(
|
||||
'Waiting for updates to propagate to {}'.format(config_file))
|
||||
model.block_until_oslo_config_entries_match(
|
||||
self.model_name,
|
||||
self.application_name,
|
||||
config_file,
|
||||
alternate_entry)
|
||||
alternate_entry,
|
||||
model_name=self.model_name)
|
||||
|
||||
logging.debug(
|
||||
'Waiting for units to reach target states'.format(config_file))
|
||||
model.wait_for_application_states(
|
||||
self.model_name,
|
||||
self.test_config.get('target_deploy_status', {}))
|
||||
self.test_config.get('target_deploy_status', {}),
|
||||
model_name=self.model_name)
|
||||
|
||||
# Config update has occured and hooks are idle. Any services should
|
||||
# have been restarted by now:
|
||||
logging.debug(
|
||||
'Waiting for services ({}) to be restarted'.format(services))
|
||||
model.block_until_services_restarted(
|
||||
self.model_name,
|
||||
self.application_name,
|
||||
mtime,
|
||||
services)
|
||||
services,
|
||||
model_name=self.model_name)
|
||||
|
||||
logging.debug('Restoring charm setting to {}'.format(default_config))
|
||||
model.set_application_config(
|
||||
self.model_name,
|
||||
self.application_name,
|
||||
default_config)
|
||||
default_config,
|
||||
model_name=self.model_name)
|
||||
|
||||
logging.debug(
|
||||
'Waiting for updates to propagate to '.format(config_file))
|
||||
model.block_until_oslo_config_entries_match(
|
||||
self.model_name,
|
||||
self.application_name,
|
||||
config_file,
|
||||
default_entry)
|
||||
default_entry,
|
||||
model_name=self.model_name)
|
||||
|
||||
logging.debug(
|
||||
'Waiting for units to reach target states'.format(config_file))
|
||||
model.wait_for_application_states(
|
||||
self.model_name,
|
||||
self.test_config.get('target_deploy_status', {}))
|
||||
self.test_config.get('target_deploy_status', {}),
|
||||
model_name=self.model_name)
|
||||
|
||||
def pause_resume(self, services):
|
||||
"""Run Pause and resume tests.
|
||||
@@ -129,33 +131,41 @@ class OpenStackBaseTest(unittest.TestCase):
|
||||
:type services: list
|
||||
"""
|
||||
model.block_until_service_status(
|
||||
self.model_name,
|
||||
self.first_unit,
|
||||
services,
|
||||
'running')
|
||||
'running',
|
||||
model_name=self.model_name)
|
||||
model.block_until_unit_wl_status(
|
||||
self.model_name,
|
||||
self.first_unit,
|
||||
'active')
|
||||
model.run_action(self.model_name, self.first_unit, 'pause', {})
|
||||
'active',
|
||||
model_name=self.model_name)
|
||||
model.run_action(
|
||||
self.first_unit,
|
||||
'pause',
|
||||
{},
|
||||
model_name=self.model_name)
|
||||
model.block_until_unit_wl_status(
|
||||
self.model_name,
|
||||
self.first_unit,
|
||||
'maintenance')
|
||||
model.block_until_all_units_idle(self.model_name)
|
||||
'maintenance',
|
||||
model_name=self.model_name)
|
||||
model.block_until_all_units_idle(model_name=self.model_name)
|
||||
model.block_until_service_status(
|
||||
self.model_name,
|
||||
self.first_unit,
|
||||
services,
|
||||
'stopped')
|
||||
model.run_action(self.model_name, self.first_unit, 'resume', {})
|
||||
'stopped',
|
||||
model_name=self.model_name)
|
||||
model.run_action(
|
||||
self.first_unit,
|
||||
'resume',
|
||||
{},
|
||||
model_name=self.model_name)
|
||||
model.block_until_unit_wl_status(
|
||||
self.model_name,
|
||||
self.first_unit,
|
||||
'active')
|
||||
model.block_until_all_units_idle(self.model_name)
|
||||
'active',
|
||||
model_name=self.model_name)
|
||||
model.block_until_all_units_idle(model_name=self.model_name)
|
||||
model.block_until_service_status(
|
||||
self.model_name,
|
||||
self.first_unit,
|
||||
services,
|
||||
'running')
|
||||
'running',
|
||||
model_name=self.model_name)
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
"""Module for working with x.509 certificates."""
|
||||
|
||||
import cryptography
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from cryptography.hazmat.primitives.asymmetric import padding, rsa
|
||||
import cryptography.hazmat.primitives.hashes as hashes
|
||||
import cryptography.hazmat.primitives.serialization as serialization
|
||||
import datetime
|
||||
@@ -205,3 +205,40 @@ def sign_csr(csr, ca_private_key, ca_cert=None, issuer_name=None,
|
||||
backend=backend)
|
||||
|
||||
return signer_ca_cert.public_bytes(encoding=serialization.Encoding.PEM)
|
||||
|
||||
|
||||
def is_keys_valid(public_key_string, private_key_string):
|
||||
"""Test whether these are a valid public/private key pair.
|
||||
|
||||
:param public_key_string: PEM encoded key data.
|
||||
:type public_key_string: str
|
||||
:param private_key_string: OpenSSH encoded key data.
|
||||
:type private_key_string: str
|
||||
"""
|
||||
private_key = serialization.load_pem_private_key(
|
||||
private_key_string.encode(),
|
||||
password=None,
|
||||
backend=cryptography.hazmat.backends.default_backend()
|
||||
)
|
||||
public_key = serialization.load_ssh_public_key(
|
||||
public_key_string.encode(),
|
||||
backend=cryptography.hazmat.backends.default_backend()
|
||||
)
|
||||
message = b"encrypted data"
|
||||
ciphertext = public_key.encrypt(
|
||||
message,
|
||||
padding.OAEP(
|
||||
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
||||
algorithm=hashes.SHA256(),
|
||||
label=None))
|
||||
|
||||
try:
|
||||
plaintext = private_key.decrypt(
|
||||
ciphertext,
|
||||
padding.OAEP(
|
||||
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
||||
algorithm=hashes.SHA256(),
|
||||
label=None))
|
||||
except ValueError:
|
||||
plaintext = ''
|
||||
return plaintext == message
|
||||
|
||||
@@ -5,3 +5,15 @@ class MissingOSAthenticationException(Exception):
|
||||
"""Exception when some data needed to authenticate is missing."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class CloudInitIncomplete(Exception):
|
||||
"""Cloud init has not completed properly."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class SSHFailed(Exception):
|
||||
"""SSH failed."""
|
||||
|
||||
pass
|
||||
|
||||
@@ -16,15 +16,19 @@ from keystoneauth1.identity import (
|
||||
v3,
|
||||
v2,
|
||||
)
|
||||
import zaza.utilities.cert as cert
|
||||
from novaclient import client as novaclient_client
|
||||
from neutronclient.v2_0 import client as neutronclient
|
||||
from neutronclient.common import exceptions as neutronexceptions
|
||||
|
||||
import io
|
||||
import juju_wait
|
||||
import logging
|
||||
import os
|
||||
import paramiko
|
||||
import re
|
||||
import six
|
||||
import subprocess
|
||||
import sys
|
||||
import tenacity
|
||||
import urllib
|
||||
@@ -38,6 +42,10 @@ from zaza.utilities import (
|
||||
|
||||
CIRROS_RELEASE_URL = 'http://download.cirros-cloud.net/version/released'
|
||||
CIRROS_IMAGE_URL = 'http://download.cirros-cloud.net'
|
||||
UBUNTU_IMAGE_URLS = {
|
||||
'bionic': ('http://cloud-images.ubuntu.com/{release}/current/'
|
||||
'{release}-server-cloudimg-{arch}.img')
|
||||
}
|
||||
|
||||
CHARM_TYPES = {
|
||||
'neutron': {
|
||||
@@ -1289,7 +1297,7 @@ def get_urllib_opener():
|
||||
def find_cirros_image(arch):
|
||||
"""Return the url for the latest cirros image for the given architecture.
|
||||
|
||||
:param arch: aarch64, arm, i386, x86_64 etc
|
||||
:param arch: aarch64, arm, i386, amd64, x86_64 etc
|
||||
:type arch: str
|
||||
:returns: URL for latest cirros image
|
||||
:rtype: str
|
||||
@@ -1301,6 +1309,11 @@ def find_cirros_image(arch):
|
||||
return '{}/{}/{}'.format(CIRROS_IMAGE_URL, version, cirros_img)
|
||||
|
||||
|
||||
def find_ubuntu_image(release, arch):
|
||||
"""Return url for image."""
|
||||
return UBUNTU_IMAGE_URLS[release].format(release=release, arch=arch)
|
||||
|
||||
|
||||
def download_image(image_url, target_file):
|
||||
"""Download the image from the given url to the specified file.
|
||||
|
||||
@@ -1337,6 +1350,7 @@ def resource_reaches_status(resource, resource_id,
|
||||
:raises: AssertionError
|
||||
"""
|
||||
resource_status = resource.get(resource_id).status
|
||||
logging.info(resource_status)
|
||||
assert resource_status == expected_status, (
|
||||
"Resource in {} state, waiting for {}" .format(resource_status,
|
||||
expected_status,))
|
||||
@@ -1456,3 +1470,189 @@ def create_image(glance, image_url, image_name, image_cache_dir='tests'):
|
||||
|
||||
image = upload_image_to_glance(glance, local_path, image_name)
|
||||
return image
|
||||
|
||||
|
||||
def create_ssh_key(nova_client, keypair_name, replace=False):
|
||||
"""Create ssh key.
|
||||
|
||||
:param nova_client: Authenticated nova client
|
||||
:type nova_client: novaclient.v2.client.Client
|
||||
:param keypair_name: Label to apply to keypair in Openstack.
|
||||
:type keypair_name: str
|
||||
:param replace: Whether to replace the existing keypair if it already
|
||||
exists.
|
||||
:type replace: str
|
||||
:returns: The keypair
|
||||
:rtype: nova.objects.keypair
|
||||
"""
|
||||
existing_keys = nova_client.keypairs.findall(name=keypair_name)
|
||||
if existing_keys:
|
||||
if replace:
|
||||
logging.info('Deleting key(s) {}'.format(keypair_name))
|
||||
for key in existing_keys:
|
||||
nova_client.keypairs.delete(key)
|
||||
else:
|
||||
return existing_keys[0]
|
||||
logging.info('Creating key %s' % (keypair_name))
|
||||
return nova_client.keypairs.create(name=keypair_name)
|
||||
|
||||
|
||||
def get_private_key_file(keypair_name):
|
||||
"""Location of the file containing the private key with the given label.
|
||||
|
||||
:param keypair_name: Label of keypair in Openstack.
|
||||
:type keypair_name: str
|
||||
:returns: Path to file containing key
|
||||
:rtype: str
|
||||
"""
|
||||
return 'tests/id_rsa_{}'.format(keypair_name)
|
||||
|
||||
|
||||
def write_private_key(keypair_name, key):
|
||||
"""Store supplied private key in file.
|
||||
|
||||
:param keypair_name: Label of keypair in Openstack.
|
||||
:type keypair_name: str
|
||||
:param key: PEM Encoded Private Key
|
||||
:type key: str
|
||||
"""
|
||||
with open(get_private_key_file(keypair_name), 'w') as key_file:
|
||||
key_file.write(key)
|
||||
|
||||
|
||||
def get_private_key(keypair_name):
|
||||
"""Return private key.
|
||||
|
||||
:param keypair_name: Label of keypair in Openstack.
|
||||
:type keypair_name: str
|
||||
:returns: PEM Encoded Private Key
|
||||
:rtype: str
|
||||
"""
|
||||
key_file = get_private_key_file(keypair_name)
|
||||
if not os.path.isfile(key_file):
|
||||
return None
|
||||
with open(key_file, 'r') as key_file:
|
||||
key = key_file.read()
|
||||
return key
|
||||
|
||||
|
||||
def get_public_key(nova_client, keypair_name):
|
||||
"""Return public key from Openstack.
|
||||
|
||||
:param nova_client: Authenticated nova client
|
||||
:type nova_client: novaclient.v2.client.Client
|
||||
:param keypair_name: Label of keypair in Openstack.
|
||||
:type keypair_name: str
|
||||
:returns: OpenSSH Encoded Public Key
|
||||
:rtype: str or None
|
||||
"""
|
||||
keys = nova_client.keypairs.findall(name=keypair_name)
|
||||
if keys:
|
||||
return keys[0].public_key
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def valid_key_exists(nova_client, keypair_name):
|
||||
"""Check if a valid public/private keypair exists for keypair_name.
|
||||
|
||||
:param nova_client: Authenticated nova client
|
||||
:type nova_client: novaclient.v2.client.Client
|
||||
:param keypair_name: Label of keypair in Openstack.
|
||||
:type keypair_name: str
|
||||
"""
|
||||
pub_key = get_public_key(nova_client, keypair_name)
|
||||
priv_key = get_private_key(keypair_name)
|
||||
if not all([pub_key, priv_key]):
|
||||
return False
|
||||
return cert.is_keys_valid(pub_key, priv_key)
|
||||
|
||||
|
||||
def get_ports_from_device_id(neutron_client, device_id):
|
||||
"""Return the ports associated with a given device.
|
||||
|
||||
:param neutron_client: Authenticated neutronclient
|
||||
:type neutron_client: neutronclient.Client object
|
||||
:param device_id: The id of the device to look for
|
||||
:type device_id: str
|
||||
:returns: List of port objects
|
||||
:rtype: []
|
||||
"""
|
||||
ports = []
|
||||
for _port in neutron_client.list_ports().get('ports'):
|
||||
if device_id in _port.get('device_id'):
|
||||
ports.append(_port)
|
||||
return ports
|
||||
|
||||
|
||||
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=1, max=60),
|
||||
reraise=True, stop=tenacity.stop_after_attempt(8))
|
||||
def cloud_init_complete(nova_client, vm_id, bootstring):
|
||||
"""Wait for cloud init to complete on the given vm.
|
||||
|
||||
If cloud init does not complete in the alloted time then
|
||||
exceptions.CloudInitIncomplete is raised.
|
||||
|
||||
:param nova_client: Authenticated nova client
|
||||
:type nova_client: novaclient.v2.client.Client
|
||||
:param vm_id,: The id of the server to monitor.
|
||||
:type vm_id: str (uuid)
|
||||
:param bootstring: The string to look for in the console output that will
|
||||
indicate cloud init is complete.
|
||||
:type bootstring: str
|
||||
:raises: exceptions.CloudInitIncomplete
|
||||
"""
|
||||
instance = nova_client.servers.find(id=vm_id)
|
||||
console_log = instance.get_console_output()
|
||||
if bootstring not in console_log:
|
||||
raise exceptions.CloudInitIncomplete()
|
||||
|
||||
|
||||
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=1, max=60),
|
||||
reraise=True, stop=tenacity.stop_after_attempt(8))
|
||||
def ping_response(ip):
|
||||
"""Wait for ping to respond on the given IP.
|
||||
|
||||
:param ip: IP address to ping
|
||||
:type ip: str
|
||||
:raises: subprocess.CalledProcessError
|
||||
"""
|
||||
cmd = ['ping', '-c', '1', '-W', '1', ip]
|
||||
subprocess.check_call(cmd, stdout=subprocess.DEVNULL)
|
||||
|
||||
|
||||
def ssh_test(username, ip, vm_name, password=None, privkey=None):
|
||||
"""SSH to given ip using supplied credentials.
|
||||
|
||||
:param username: Username to connect with
|
||||
:type username: str
|
||||
:param ip: IP address to ssh to.
|
||||
:type ip: str
|
||||
:param vm_name: Name of VM.
|
||||
:type vm_name: str
|
||||
:param password: Password to authenticate with. If supplied it is used
|
||||
rather than privkey.
|
||||
:type password: str
|
||||
:param privkey: Private key to authenticate with. If a password is
|
||||
supplied it is used rather than the private key.
|
||||
:type privkey: str
|
||||
:raises: exceptions.SSHFailed
|
||||
"""
|
||||
logging.info('Attempting to ssh to %s(%s)' % (vm_name, ip))
|
||||
ssh = paramiko.SSHClient()
|
||||
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
if password:
|
||||
ssh.connect(ip, username=username, password=password)
|
||||
else:
|
||||
key = paramiko.RSAKey.from_private_key(io.StringIO(privkey))
|
||||
ssh.connect(ip, username=username, password='', pkey=key)
|
||||
stdin, stdout, stderr = ssh.exec_command('uname -n')
|
||||
return_string = stdout.readlines()[0].strip()
|
||||
ssh.close()
|
||||
if return_string == vm_name:
|
||||
logging.info('SSH to %s(%s) succesfull' % (vm_name, ip))
|
||||
else:
|
||||
logging.info('SSH to %s(%s) failed (%s != %s)' % (vm_name, ip,
|
||||
return_string,
|
||||
vm_name))
|
||||
raise exceptions.SSHFailed()
|
||||
|
||||
Reference in New Issue
Block a user