Merge pull request #50 from thedac/controller

Controller Module and Functional Break up of Utilities
This commit is contained in:
Frode Nordahl
2018-05-14 20:00:51 +02:00
committed by GitHub
21 changed files with 902 additions and 622 deletions

View File

@@ -0,0 +1,80 @@
import mock
import unit_tests.utils as ut_utils
import zaza.controller as controller
class TestController(ut_utils.BaseTestCase):
def setUp(self):
super(TestController, self).setUp()
async def _disconnect():
return
async def _connect():
return
async def _list_models():
return self.models
async def _add_model(model_name):
return self.model1
async def _destroy_model(model_name):
return
async def _get_cloud():
return self.cloud
# Cloud
self.cloud = "FakeCloud"
# Model
self.Model_mock = mock.MagicMock()
self.Model_mock.connect.side_effect = _connect
self.Model_mock.disconnect.side_effect = _disconnect
self.Model_mock.disconnect.side_effect = _disconnect
self.model1 = self.Model_mock
self.model2 = mock.MagicMock()
self.model1.info.name = "model1"
self.model2.info.name = "model2"
self.models = [self.model1.info.name, self.model2.info.name]
# Controller
self.Controller_mock = mock.MagicMock()
self.Controller_mock.connect.side_effect = _connect
self.Controller_mock.disconnect.side_effect = _disconnect
self.Controller_mock.add_model.side_effect = _add_model
self.Controller_mock.destroy_model.side_effect = _destroy_model
self.Controller_mock.list_models.side_effect = _list_models
self.Controller_mock.get_cloud.side_effect = _get_cloud
self.controller_name = "testcontroller"
self.Controller_mock.info.name = self.controller_name
self.patch_object(controller, 'Controller')
self.Controller.return_value = self.Controller_mock
def test_add_model(self):
self.assertEqual(controller.add_model(self.model1.info.name),
self.model1.info.name)
self.Controller_mock.add_model.assert_called_once_with(
self.model1.info.name)
self.model1.connect.assert_called_once()
def test_destroy_model(self):
controller.destroy_model(self.model1.info.name)
self.Controller_mock.destroy_model.assert_called_once_with(
self.model1.info.name)
def test_get_cloud(self):
self.assertEqual(
controller.get_cloud(),
self.cloud)
self.Controller_mock.get_cloud.assert_called_once()
def test_list_models(self):
self.assertEqual(
controller.list_models(),
self.models)
self.Controller_mock.list_models.assert_called_once()

View File

@@ -0,0 +1,49 @@
import mock
import unit_tests.utils as ut_utils
from zaza.utilities import cli as cli_utils
class TestCLIUtils(ut_utils.BaseTestCase):
def setUp(self):
super(TestCLIUtils, self).setUp()
def test_parse_arg(self):
_options = mock.MagicMock()
_arg_property = "property-value"
_options.property = _arg_property
# Argparse value
self.assertEqual(cli_utils.parse_arg(_options, "property"),
_arg_property)
# Single value environment
_environ_value = "environ-value"
_env = {"PROPERTY": _environ_value}
with mock.patch.dict(cli_utils.os.environ, _env):
self.assertEqual(cli_utils.parse_arg(_options, "property"),
_environ_value)
# Multi value environment
_multi_value = "val1 val2"
_env = {"PROPERTY": _multi_value}
with mock.patch.dict(cli_utils.os.environ, _env):
self.assertEqual(
cli_utils.parse_arg(_options, "property", multiargs=True),
_multi_value.split())
def test_setup_logging(self):
self.patch_object(cli_utils, "logging")
_logformatter = mock.MagicMock()
_logger = mock.MagicMock()
_consolehandler = mock.MagicMock()
self.logging.Formatter.return_value = _logformatter
self.logging.getLogger.return_value = _logger
self.logging.StreamHandler.return_value = _consolehandler
cli_utils.setup_logging()
self.logging.Formatter.assert_called_with(
datefmt='%Y-%m-%d %H:%M:%S',
fmt='%(asctime)s [%(levelname)s] %(message)s')
self.logging.getLogger.assert_called_with()
_logger.setLevel.assert_called_with("INFO")
_consolehandler.setFormatter.assert_called_with(_logformatter)
_logger.addHandler.assert_called_with(_consolehandler)

View File

@@ -0,0 +1,124 @@
import mock
import unit_tests.utils as ut_utils
from zaza.utilities import generic as generic_utils
class TestGenericUtils(ut_utils.BaseTestCase):
def setUp(self):
super(TestGenericUtils, self).setUp()
def test_dict_to_yaml(self):
_dict_data = {"key": "value"}
_str_data = "key: value\n"
self.assertEqual(generic_utils.dict_to_yaml(_dict_data),
_str_data)
def test_get_network_config(self):
self.patch_object(generic_utils.os.path, "exists")
self.patch_object(generic_utils, "get_yaml_config")
self.patch_object(generic_utils, "get_undercloud_env_vars")
net_topology = "topo"
_data = {net_topology: {"network": "DATA"}}
self.get_yaml_config.return_value = _data
# YAML file does not exist
self.exists.return_value = False
with self.assertRaises(Exception):
generic_utils.get_network_config(net_topology)
# No environmental variables
self.exists.return_value = True
self.assertEqual(
generic_utils.get_network_config(net_topology,
ignore_env_vars=True),
_data[net_topology])
self.get_yaml_config.assert_called_once_with("network.yaml")
self.get_undercloud_env_vars.assert_not_called()
# Update with environmental variables
_more_data = {"network": "NEW",
"other": "DATA"}
self.get_undercloud_env_vars.return_value = _more_data
_data[net_topology].update(_more_data)
self.assertEqual(
generic_utils.get_network_config(net_topology),
_data[net_topology])
self.get_undercloud_env_vars.assert_called_once_with()
def test_get_pkg_version(self):
self.patch_object(generic_utils.model, "get_units")
self.patch_object(generic_utils.lifecycle_utils, "get_juju_model")
self.patch_object(generic_utils.juju_utils, "remote_run")
_pkg = "os-thingy"
_version = "2:27.0.0-0ubuntu1~cloud0"
_dpkg_output = ("ii {} {} all OpenStack thingy\n"
.format(_pkg, _version))
self.remote_run.return_value = _dpkg_output
_model_name = "model-name"
self.get_juju_model.return_value = _model_name
_unit1 = mock.MagicMock()
_unit1.entity_id = "os-thingy/7"
_unit2 = mock.MagicMock()
_unit2.entity_id = "os-thingy/12"
_units = [_unit1, _unit2]
self.get_units.return_value = _units
# Matching
self.assertEqual(generic_utils.get_pkg_version(_pkg, _pkg),
_version)
# Mismatched
_different_dpkg_output = ("ii {} {} all OpenStack thingy\n"
.format(_pkg, "DIFFERENT"))
self.remote_run.side_effect = [_dpkg_output, _different_dpkg_output]
with self.assertRaises(Exception):
generic_utils.get_pkg_version(_pkg, _pkg)
def test_get_undercloud_env_vars(self):
self.patch_object(generic_utils.os.environ, "get")
def _get_env(key):
return _env.get(key)
self.get.side_effect = _get_env
# OSCI backward compatible env vars
_env = {"NET_ID": "netid",
"NAMESERVER": "10.0.0.10",
"GATEWAY": "10.0.0.1",
"CIDR_EXT": "10.0.0.0/24",
"FIP_RANGE": "10.0.200.0:10.0.200.254"}
_expected_result = {}
_expected_result["net_id"] = _env["NET_ID"]
_expected_result["external_dns"] = _env["NAMESERVER"]
_expected_result["default_gateway"] = _env["GATEWAY"]
_expected_result["external_net_cidr"] = _env["CIDR_EXT"]
_expected_result["start_floating_ip"] = _env["FIP_RANGE"].split(":")[0]
_expected_result["end_floating_ip"] = _env["FIP_RANGE"].split(":")[1]
self.assertEqual(generic_utils.get_undercloud_env_vars(),
_expected_result)
# Overriding configure.network named variables
_override = {"start_floating_ip": "10.100.50.0",
"end_floating_ip": "10.100.50.254",
"default_gateway": "10.100.0.1",
"external_net_cidr": "10.100.0.0/16"}
_env.update(_override)
_expected_result.update(_override)
self.assertEqual(generic_utils.get_undercloud_env_vars(),
_expected_result)
def test_get_yaml_config(self):
self.patch("builtins.open",
new_callable=mock.mock_open(),
name="_open")
_yaml = "data: somedata"
_yaml_dict = {"data": "somedata"}
_filename = "filename"
_fileobj = mock.MagicMock()
_fileobj.read.return_value = _yaml
self._open.return_value = _fileobj
self.assertEqual(generic_utils.get_yaml_config(_filename),
_yaml_dict)
self._open.assert_called_once_with(_filename, "r")

View File

@@ -0,0 +1,157 @@
import mock
import unit_tests.utils as ut_utils
from zaza.utilities import juju as juju_utils
class TestJujuUtils(ut_utils.BaseTestCase):
def setUp(self):
super(TestJujuUtils, self).setUp()
# Juju Status Object and data
self.key = "instance-id"
self.key_data = "machine-uuid"
self.machine = "1"
self.machine_data = {self.key: self.key_data}
self.unit = "app/1"
self.unit_data = {"machine": self.machine}
self.application = "app"
self.application_data = {"units": {self.unit: self.unit_data}}
self.subordinate_application = "subordinate_application"
self.subordinate_application_data = {
"subordinate-to": [self.application]}
self.juju_status = mock.MagicMock()
self.juju_status.name = "juju_status_object"
self.juju_status.applications.get.return_value = self.application_data
self.juju_status.machines.get.return_value = self.machine_data
# Model
self.patch_object(juju_utils, "model")
self.patch_object(juju_utils.lifecycle_utils, "get_juju_model")
self.model_name = "model-name"
self.get_juju_model.return_value = self.model_name
self.model.get_status.return_value = self.juju_status
self.run_output = {"Code": "0", "Stderr": "", "Stdout": "RESULT"}
self.error_run_output = {"Code": "1", "Stderr": "ERROR", "Stdout": ""}
self.model.run_on_unit.return_value = self.run_output
# Clouds
self.cloud_name = "FakeCloudName"
self.cloud_type = "FakeCloudType"
self.clouds = {
"clouds":
{self.cloud_name:
{"type": self.cloud_type}}}
# Controller
self.patch_object(juju_utils, "controller")
self.controller.get_cloud.return_value = self.cloud_name
def test_get_application_status(self):
self.patch_object(juju_utils, "get_full_juju_status")
self.get_full_juju_status.return_value = self.juju_status
# Full status juju object return
self.assertEqual(
juju_utils.get_application_status(), self.juju_status)
self.get_full_juju_status.assert_called_once()
# Application only dictionary return
self.assertEqual(
juju_utils.get_application_status(application=self.application),
self.application_data)
# Unit no application dictionary return
self.assertEqual(
juju_utils.get_application_status(unit=self.unit),
self.unit_data)
def test_get_cloud_configs(self):
self.patch_object(juju_utils.Path, "home")
self.patch_object(juju_utils.generic_utils, "get_yaml_config")
self.get_yaml_config.return_value = self.clouds
# All the cloud configs
self.assertEqual(juju_utils.get_cloud_configs(), self.clouds)
# With cloud specified
self.assertEqual(juju_utils.get_cloud_configs(self.cloud_name),
self.clouds["clouds"][self.cloud_name])
def test_get_full_juju_status(self):
self.assertEqual(juju_utils.get_full_juju_status(), self.juju_status)
self.model.get_status.assert_called_once_with(self.model_name)
def test_get_machines_for_application(self):
self.patch_object(juju_utils, "get_application_status")
self.get_application_status.return_value = self.application_data
# Machine data
self.assertEqual(
juju_utils.get_machines_for_application(self.application),
[self.machine])
self.get_application_status.assert_called_once()
# Subordinate application has no units
def _get_application_status(application):
_apps = {
self.application: self.application_data,
self.subordinate_application:
self.subordinate_application_data}
return _apps[application]
self.get_application_status.side_effect = _get_application_status
self.assertEqual(
juju_utils.get_machines_for_application(
self.subordinate_application),
[self.machine])
def test_get_machine_status(self):
self.patch_object(juju_utils, "get_full_juju_status")
self.get_full_juju_status.return_value = self.juju_status
# All machine data
self.assertEqual(
juju_utils.get_machine_status(self.machine),
self.machine_data)
self.get_full_juju_status.assert_called_once()
# Request a specific key
self.assertEqual(
juju_utils.get_machine_status(self.machine, self.key),
self.key_data)
def test_get_machine_uuids_for_application(self):
self.patch_object(juju_utils, "get_machines_for_application")
self.get_machines_for_application.return_value = [self.machine]
self.assertEqual(
juju_utils.get_machine_uuids_for_application(self.application),
[self.machine_data.get("instance-id")])
self.get_machines_for_application.assert_called_once_with(
self.application)
def test_get_provider_type(self):
self.patch_object(juju_utils, "get_cloud_configs")
self.get_cloud_configs.return_value = {"type": self.cloud_type}
self.assertEqual(juju_utils.get_provider_type(),
self.cloud_type)
self.get_cloud_configs.assert_called_once_with(self.cloud_name)
def test_remote_run(self):
_cmd = "do the thing"
# Success
self.assertEqual(juju_utils.remote_run(self.unit, _cmd),
self.run_output["Stdout"])
self.model.run_on_unit.assert_called_once_with(
self.model_name, self.unit, _cmd, timeout=None)
# Non-fatal failure
self.model.run_on_unit.return_value = self.error_run_output
self.assertEqual(juju_utils.remote_run(self.unit, _cmd, fatal=False),
self.error_run_output["Stderr"])
# Fatal failure
with self.assertRaises(Exception):
juju_utils.remote_run(self.unit, _cmd, fatal=True)

View File

@@ -1,7 +1,7 @@
import copy
import mock
import unit_tests.utils as ut_utils
from zaza.utilities import openstack_utils
from zaza.utilities import openstack as openstack_utils
class TestOpenStackUtils(ut_utils.BaseTestCase):

View File

@@ -1,185 +0,0 @@
import mock
import unit_tests.utils as ut_utils
from zaza.utilities import _local_utils
class TestLocalUtils(ut_utils.BaseTestCase):
def setUp(self):
super(TestLocalUtils, self).setUp()
# Juju Status Object and data
self.key = "instance-id"
self.key_data = "machine-uuid"
self.machine = "1"
self.machine_data = {self.key: self.key_data}
self.unit = "app/1"
self.unit_data = {"machine": self.machine}
self.application = "app"
self.application_data = {"units": {self.unit: self.unit_data}}
self.subordinate_application = "subordinate_application"
self.subordinate_application_data = {
"subordinate-to": [self.application]}
self.juju_status = mock.MagicMock()
self.juju_status.name = "juju_status_object"
self.juju_status.applications.get.return_value = self.application_data
self.juju_status.machines.get.return_value = self.machine_data
# Model
self.patch_object(_local_utils, "model")
self.patch_object(_local_utils.lifecycle_utils, "get_juju_model")
self.model_name = "model-name"
self.get_juju_model.return_value = self.model_name
self.model.get_status.return_value = self.juju_status
def test_get_yaml_config(self):
self.patch("builtins.open",
new_callable=mock.mock_open(),
name="_open")
_yaml = "data: somedata"
_yaml_dict = {"data": "somedata"}
_filename = "filename"
_fileobj = mock.MagicMock()
_fileobj.read.return_value = _yaml
self._open.return_value = _fileobj
self.assertEqual(_local_utils.get_yaml_config(_filename),
_yaml_dict)
self._open.assert_called_once_with(_filename, "r")
def test_get_undercloud_env_vars(self):
self.patch_object(_local_utils.os.environ, "get")
def _get_env(key):
return _env.get(key)
self.get.side_effect = _get_env
# OSCI backward compatible env vars
_env = {"NET_ID": "netid",
"NAMESERVER": "10.0.0.10",
"GATEWAY": "10.0.0.1",
"CIDR_EXT": "10.0.0.0/24",
"FIP_RANGE": "10.0.200.0:10.0.200.254"}
_expected_result = {}
_expected_result["net_id"] = _env["NET_ID"]
_expected_result["external_dns"] = _env["NAMESERVER"]
_expected_result["default_gateway"] = _env["GATEWAY"]
_expected_result["external_net_cidr"] = _env["CIDR_EXT"]
_expected_result["start_floating_ip"] = _env["FIP_RANGE"].split(":")[0]
_expected_result["end_floating_ip"] = _env["FIP_RANGE"].split(":")[1]
self.assertEqual(_local_utils.get_undercloud_env_vars(),
_expected_result)
# Overriding configure.network named variables
_override = {"start_floating_ip": "10.100.50.0",
"end_floating_ip": "10.100.50.254",
"default_gateway": "10.100.0.1",
"external_net_cidr": "10.100.0.0/16"}
_env.update(_override)
_expected_result.update(_override)
self.assertEqual(_local_utils.get_undercloud_env_vars(),
_expected_result)
def test_get_network_config(self):
self.patch_object(_local_utils.os.path, "exists")
self.patch_object(_local_utils, "get_yaml_config")
self.patch_object(_local_utils, "get_undercloud_env_vars")
net_topology = "topo"
_data = {net_topology: {"network": "DATA"}}
self.get_yaml_config.return_value = _data
# YAML file does not exist
self.exists.return_value = False
with self.assertRaises(Exception):
_local_utils.get_network_config(net_topology)
# No environmental variables
self.exists.return_value = True
self.assertEqual(
_local_utils.get_network_config(net_topology,
ignore_env_vars=True),
_data[net_topology])
self.get_yaml_config.assert_called_once_with("network.yaml")
self.get_undercloud_env_vars.assert_not_called()
# Update with environmental variables
_more_data = {"network": "NEW",
"other": "DATA"}
self.get_undercloud_env_vars.return_value = _more_data
_data[net_topology].update(_more_data)
self.assertEqual(
_local_utils.get_network_config(net_topology),
_data[net_topology])
self.get_undercloud_env_vars.assert_called_once_with()
def test_get_full_juju_status(self):
self.assertEqual(_local_utils.get_full_juju_status(), self.juju_status)
self.model.get_status.assert_called_once_with(self.model_name)
def test_get_application_status(self):
self.patch_object(_local_utils, "get_full_juju_status")
self.get_full_juju_status.return_value = self.juju_status
# Full status juju object return
self.assertEqual(
_local_utils.get_application_status(), self.juju_status)
self.get_full_juju_status.assert_called_once()
# Application only dictionary return
self.assertEqual(
_local_utils.get_application_status(application=self.application),
self.application_data)
# Unit no application dictionary return
self.assertEqual(
_local_utils.get_application_status(unit=self.unit),
self.unit_data)
def test_get_machine_status(self):
self.patch_object(_local_utils, "get_full_juju_status")
self.get_full_juju_status.return_value = self.juju_status
# All machine data
self.assertEqual(
_local_utils.get_machine_status(self.machine),
self.machine_data)
self.get_full_juju_status.assert_called_once()
# Request a specific key
self.assertEqual(
_local_utils.get_machine_status(self.machine, self.key),
self.key_data)
def test_get_machines_for_application(self):
self.patch_object(_local_utils, "get_application_status")
self.get_application_status.return_value = self.application_data
# Machine data
self.assertEqual(
_local_utils.get_machines_for_application(self.application),
[self.machine])
self.get_application_status.assert_called_once()
# Subordinate application has no units
def _get_application_status(application):
_apps = {
self.application: self.application_data,
self.subordinate_application:
self.subordinate_application_data}
return _apps[application]
self.get_application_status.side_effect = _get_application_status
self.assertEqual(
_local_utils.get_machines_for_application(
self.subordinate_application),
[self.machine])
def test_get_machine_uuids_for_application(self):
self.patch_object(_local_utils, "get_machines_for_application")
self.get_machines_for_application.return_value = [self.machine]
self.assertEqual(
_local_utils.get_machine_uuids_for_application(self.application),
[self.machine_data.get("instance-id")])
self.get_machines_for_application.assert_called_once_with(
self.application)

View File

@@ -0,0 +1,30 @@
import asyncio
def run(*steps):
"""Run the given steps in an asyncio loop
:returns: The result of the asyncio.Task
:rtype: Any
"""
if not steps:
return
loop = asyncio.get_event_loop()
for step in steps:
task = loop.create_task(step)
loop.run_until_complete(asyncio.wait([task], loop=loop))
return task.result()
def sync_wrapper(f):
"""Convert the given async function into a sync function
:returns: The de-async'd function
:rtype: function
"""
def _wrapper(*args, **kwargs):
async def _run_it():
return await f(*args, **kwargs)
return run(_run_it())
return _wrapper

View File

@@ -5,8 +5,9 @@ from zaza.configure import (
bgp_speaker,
)
from zaza.utilities import (
_local_utils,
openstack_utils,
cli as cli_utils,
generic as generic_utils,
openstack as openstack_utils,
)
DEFAULT_PEER_APPLICATION_NAME = "quagga"
@@ -28,7 +29,7 @@ OVERCLOUD_NETWORK_CONFIG = {
# The undercloud network configuration settings are substrate specific to
# the environment where the tests are being executed. These settings may be
# overridden by environment variables. See the doc string documentation for
# zaza.utilities._local_utils.get_overcloud_env_vars for the environment
# zaza.utilities.generic_utils.get_undercloud_env_vars for the environment
# variables required to be exported and available to zaza.
# These are default settings provided as an example.
DEFAULT_UNDERCLOUD_NETWORK_CONFIG = {
@@ -54,7 +55,7 @@ def setup():
:rtype: None
"""
_local_utils.setup_logging()
cli_utils.setup_logging()
# Get network configuration settings
network_config = {}
@@ -63,7 +64,7 @@ def setup():
# Default undercloud settings
network_config.update(DEFAULT_UNDERCLOUD_NETWORK_CONFIG)
# Environment specific settings
network_config.update(_local_utils.get_undercloud_env_vars())
network_config.update(generic_utils.get_undercloud_env_vars())
# Get keystone session
keystone_session = openstack_utils.get_overcloud_keystone_session()

View File

@@ -8,8 +8,9 @@ import tenacity
from zaza import model
from zaza.charm_lifecycle import utils as lifecycle_utils
from zaza.utilities import (
_local_utils,
openstack_utils,
cli as cli_utils,
juju as juju_utils,
openstack as openstack_utils,
)
@@ -52,7 +53,7 @@ def test_bgp_routes(peer_application_name="quagga", keystone_session=None):
logging.debug("Checking for {} on BGP peer {}"
.format(cidr, peer_unit))
# Run show ip route bgp on BGP peer
routes = _local_utils.remote_run(
routes = juju_utils.remote_run(
peer_unit, remote_cmd='vtysh -c "show ip route bgp"')
logging.debug(routes)
assert cidr in routes, (
@@ -73,15 +74,15 @@ def run_from_cli():
:rtype: None
"""
_local_utils.setup_logging()
cli_utils.setup_logging()
parser = argparse.ArgumentParser()
parser.add_argument("--peer-application", "-a",
help="BGP Peer application name. Default: quagga",
default="quagga")
options = parser.parse_args()
peer_application_name = _local_utils.parse_arg(options,
"peer_application")
peer_application_name = cli_utils.parse_arg(options,
"peer_application")
test_bgp_routes(peer_application_name)

View File

@@ -2,7 +2,7 @@
import unittest
from zaza.utilities import _local_utils
from zaza.utilities import generic as generic_utils
from zaza.charm_tests.dragent import test
@@ -12,7 +12,7 @@ class DRAgentTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
_local_utils.setup_logging()
generic_utils.setup_logging()
def test_bgp_routes(self):
test.test_bgp_routes(peer_application_name=self.BGP_PEER_APPLICATION)

View File

@@ -3,8 +3,10 @@
import argparse
import logging
import sys
from zaza.utilities import _local_utils
from zaza.utilities import openstack_utils
from zaza.utilities import (
cli as cli_utils,
openstack as openstack_utils,
)
EXT_NET = "ext_net"
@@ -74,15 +76,15 @@ def run_from_cli():
:rtype: None
"""
_local_utils.setup_logging()
cli_utils.setup_logging()
parser = argparse.ArgumentParser()
parser.add_argument("--peer-application", "-a",
help="BGP peer application name. Default: quagga",
default="quagga")
options = parser.parse_args()
peer_application_name = _local_utils.parse_arg(options,
"peer_application")
peer_application_name = cli_utils.parse_arg(options,
"peer_application")
setup_bgp_speaker(peer_application_name)

View File

@@ -4,8 +4,12 @@ import argparse
import logging
import sys
from zaza.utilities import _local_utils
from zaza.utilities import openstack_utils
from zaza.utilities import (
cli as cli_utils,
generic as generic_utils,
juju as juju_utils,
openstack as openstack_utils,
)
"""Configure network
@@ -34,7 +38,7 @@ The undercloud network configuration settings are substrate specific to the
environment where the tests are being executed. They primarily focus on the
provider network settings. These settings may be overridden by environment
variables. See the doc string documentation for
zaza.utilities._local_utils.get_overcloud_env_vars for the environment
zaza.utilities.generic_utils.get_undercloud_env_vars for the environment
variables required to be exported and available to zaza. Here is an example of
undercloud settings:
EXAMPLE_DEFAULT_UNDERCLOUD_NETWORK_CONFIG = {
@@ -60,7 +64,7 @@ As a python module:
network_config.update(EXAMPLE_DEFAULT_UNDERCLOUD_NETWORK_CONFIG)
# Environment specific settings
network_config.update(
zaza.utilities._local_utils.get_undercloud_env_vars())
zaza.utilities.generic_utils.get_undercloud_env_vars())
# Configure the SDN network
zaza.configure.network.setup_sdn(network_config)
@@ -222,7 +226,7 @@ def run_from_cli(**kwargs):
:rtype: None
"""
_local_utils.setup_logging()
cli_utils.setup_logging()
parser = argparse.ArgumentParser()
parser.add_argument("net_topology",
help="network topology type, default is GRE",
@@ -237,18 +241,18 @@ def run_from_cli(**kwargs):
# Handle CLI options
options = parser.parse_args()
net_topology = (kwargs.get('net_toplogoy') or
_local_utils.parse_arg(options, "net_topology"))
cli_utils.parse_arg(options, "net_topology"))
net_topology_file = (kwargs.get('net_topology_file') or
_local_utils.parse_arg(options, "net_topology_file"))
cli_utils.parse_arg(options, "net_topology_file"))
ignore_env_vars = (kwargs.get('ignore_env_vars') or
_local_utils.parse_arg(options, "ignore_env_vars"))
cli_utils.parse_arg(options, "ignore_env_vars"))
logging.info("Setting up %s network" % (net_topology))
network_config = _local_utils.get_network_config(
network_config = generic_utils.get_network_config(
net_topology, ignore_env_vars, net_topology_file)
# Handle network for Openstack-on-Openstack scenarios
if _local_utils.get_provider_type() == "openstack":
if juju_utils.get_provider_type() == "openstack":
setup_gateway_ext_port(network_config)
setup_sdn(network_config)

47
zaza/controller.py Normal file
View File

@@ -0,0 +1,47 @@
import logging
from juju.controller import Controller
from zaza import sync_wrapper
async def async_add_model(model_name):
controller = Controller()
await controller.connect()
logging.debug("Adding model {}".format(model_name))
model = await controller.add_model(model_name)
await model.connect()
model_name = model.info.name
await model.disconnect()
await controller.disconnect()
return model_name
add_model = sync_wrapper(async_add_model)
async def async_destroy_model(model_name):
controller = Controller()
await controller.connect()
logging.debug("Destroying model {}".format(model_name))
await controller.destroy_model(model_name)
await controller.disconnect()
destroy_model = sync_wrapper(async_destroy_model)
async def async_get_cloud():
controller = Controller()
await controller.connect()
cloud = await controller.get_cloud()
await controller.disconnect()
return cloud
get_cloud = sync_wrapper(async_get_cloud)
async def async_list_models():
controller = Controller()
await controller.connect()
models = await controller.list_models()
await controller.disconnect()
return models
list_models = sync_wrapper(async_list_models)

View File

@@ -10,6 +10,8 @@ from juju import loop
from juju.errors import JujuError
from juju.model import Model
from zaza import sync_wrapper
async def deployed(filter=None):
# Create a Model instance. We need to connect our Model to a Juju api
@@ -46,35 +48,6 @@ def get_unit_from_name(unit_name, model):
return unit
def run(*steps):
"""Run the given steps in an asyncio loop
:returns: The result of the asyncio.Task
:rtype: Any
"""
if not steps:
return
loop = asyncio.get_event_loop()
for step in steps:
task = loop.create_task(step)
loop.run_until_complete(asyncio.wait([task], loop=loop))
return task.result()
def sync_wrapper(f):
"""Convert the given async function into a sync function
:returns: The de-async'd function
:rtype: function
"""
def _wrapper(*args, **kwargs):
async def _run_it():
return await f(*args, **kwargs)
return run(_run_it())
return _wrapper
@asynccontextmanager
@async_generator
async def run_in_model(model_name):

View File

@@ -1,374 +0,0 @@
#!/usr/bin/env python
# The purpose of this file is for general use utilities internally and directly
# consumed by zaza. No guarantees are made for consuming these utilities
# outside of zaza. These utilities may be deprecated, removed or transformed up
# to and including parameters and return values changing without warning.
# You have been warned.
import logging
import os
import six
import subprocess
import yaml
from zaza import model
from zaza.charm_lifecycle import utils as lifecycle_utils
def get_undercloud_env_vars():
""" Get environment specific undercloud network configuration settings from
environment variables.
For each testing substrate, specific undercloud network configuration
settings should be exported into the environment to enable testing on that
substrate.
Note: *Overcloud* settings should be declared by the test caller and should
not be overridden here.
Return a dictionary compatible with zaza.configure.network functions'
expected key structure.
Example exported environment variables:
export default_gateway="172.17.107.1"
export external_net_cidr="172.17.107.0/24"
export external_dns="10.5.0.2"
export start_floating_ip="172.17.107.200"
export end_floating_ip="172.17.107.249"
Example o-c-t & uosci non-standard environment variables:
export NET_ID="a705dd0f-5571-4818-8c30-4132cc494668"
export GATEWAY="172.17.107.1"
export CIDR_EXT="172.17.107.0/24"
export NAMESERVER="10.5.0.2"
export FIP_RANGE="172.17.107.200:172.17.107.249"
:returns: Network environment variables
:rtype: dict
"""
# Handle backward compatibile OSCI enviornment variables
_vars = {}
_vars['net_id'] = os.environ.get('NET_ID')
_vars['external_dns'] = os.environ.get('NAMESERVER')
_vars['default_gateway'] = os.environ.get('GATEWAY')
_vars['external_net_cidr'] = os.environ.get('CIDR_EXT')
# Take FIP_RANGE and create start and end floating ips
_fip_range = os.environ.get('FIP_RANGE')
if _fip_range and ':' in _fip_range:
_vars['start_floating_ip'] = os.environ.get('FIP_RANGE').split(':')[0]
_vars['end_floating_ip'] = os.environ.get('FIP_RANGE').split(':')[1]
# Env var naming consistent with zaza.configure.network functions takes
# priority. Override backward compatible settings.
_keys = ['default_gateway',
'start_floating_ip',
'end_floating_ip',
'external_dns',
'external_net_cidr']
for _key in _keys:
_val = os.environ.get(_key)
if _val:
_vars[_key] = _val
# Remove keys and items with a None value
for k, v in list(_vars.items()):
if not v:
del _vars[k]
return _vars
def dict_to_yaml(dict_data):
"""Return YAML from dictionary
:param dict_data: Dictionary data
:type dict_data: dict
:returns: YAML dump
:rtype: string
"""
return yaml.dump(dict_data, default_flow_style=False)
def get_yaml_config(config_file):
"""Return configuration from YAML file
:param config_file: Configuration file name
:type config_file: string
:returns: Dictionary of configuration
:rtype: dict
"""
# Note in its original form get_mojo_config it would do a search pattern
# through mojo stage directories. This version assumes the yaml file is in
# the pwd.
logging.info('Using config %s' % (config_file))
return yaml.load(open(config_file, 'r').read())
def get_network_config(net_topology, ignore_env_vars=False,
net_topology_file="network.yaml"):
"""Get network info from network.yaml, override the values if specific
environment variables are set for the undercloud.
This function may be used when running network configuration from CLI to
pass in network configuration settings from a YAML file.
:param net_topology: Network topology name from network.yaml
:type net_topology: string
:param ignore_env_vars: Ignore enviroment variables or not
:type ignore_env_vars: boolean
:returns: Dictionary of network configuration
:rtype: dict
"""
if os.path.exists(net_topology_file):
net_info = get_yaml_config(net_topology_file)[net_topology]
else:
raise Exception("Network topology file: {} not found."
.format(net_topology_file))
if not ignore_env_vars:
logging.info("Consuming network environment variables as overrides "
"for the undercloud.")
net_info.update(get_undercloud_env_vars())
logging.info("Network info: {}".format(dict_to_yaml(net_info)))
return net_info
def parse_arg(options, arg, multiargs=False):
"""Parse argparse argments
:param options: Argparse options
:type options: argparse object
:param arg: Argument attribute key
:type arg: string
:param multiargs: More than one arugment or not
:type multiargs: boolean
:returns: Argparse atrribute value
:rtype: string
"""
if arg.upper() in os.environ:
if multiargs:
return os.environ[arg.upper()].split()
else:
return os.environ[arg.upper()]
else:
return getattr(options, arg)
def remote_run(unit, remote_cmd, timeout=None, fatal=None):
"""Run command on unit and return the output
NOTE: This function is pre-deprecated. As soon as libjuju unit.run is able
to return output this functionality should move to model.run_on_unit.
:param remote_cmd: Command to execute on unit
:type remote_cmd: string
:param timeout: Timeout value for the command
:type arg: int
:param fatal: Command failure condidered fatal or not
:type fatal: boolean
:returns: Juju run output
:rtype: string
"""
if fatal is None:
fatal = True
result = model.run_on_unit(lifecycle_utils.get_juju_model(),
unit,
remote_cmd,
timeout=timeout)
if result:
if int(result.get('Code')) == 0:
return result.get('Stdout')
else:
if fatal:
raise Exception('Error running remote command: {}'
.format(result.get('Stderr')))
return result.get('Stderr')
def get_pkg_version(application, pkg):
"""Return package version
:param application: Application name
:type application: string
:param pkg: Package name
:type pkg: string
:returns: List of package version
:rtype: list
"""
versions = []
units = model.get_units(
lifecycle_utils.get_juju_model(), application)
for unit in units:
cmd = 'dpkg -l | grep {}'.format(pkg)
out = remote_run(unit.entity_id, cmd)
versions.append(out.split('\n')[0].split()[2])
if len(set(versions)) != 1:
raise Exception('Unexpected output from pkg version check')
return versions[0]
def get_cloud_from_controller():
"""Get the cloud name from the Juju controller
:returns: Name of the cloud for the current controller
:rtype: string
"""
cmd = ['juju', 'show-controller', '--format=yaml']
output = subprocess.check_output(cmd)
if six.PY3:
output = output.decode('utf-8')
cloud_config = yaml.load(output)
# There will only be one top level controller from show-controller,
# but we do not know its name.
assert len(cloud_config) == 1
try:
return list(cloud_config.values())[0]['details']['cloud']
except KeyError:
raise KeyError("Failed to get cloud information from the controller")
def get_provider_type():
"""Get the type of the undercloud
:returns: Name of the undercloud type
:rtype: string
"""
juju_env = subprocess.check_output(['juju', 'switch'])
if six.PY3:
juju_env = juju_env.decode('utf-8')
juju_env = juju_env.strip('\n')
cloud = get_cloud_from_controller()
if cloud:
# If the controller was deployed from this system with
# the cloud configured in ~/.local/share/juju/clouds.yaml
# Determine the cloud type directly
cmd = ['juju', 'show-cloud', cloud, '--format=yaml']
output = subprocess.check_output(cmd)
if six.PY3:
output = output.decode('utf-8')
return yaml.load(output)['type']
else:
# If the controller was deployed elsewhere
# show-controllers unhelpfully returns an empty string for cloud
# For now assume openstack
return 'openstack'
def get_full_juju_status():
"""Return the full juju status output
:returns: Full juju status output
:rtype: dict
"""
status = model.get_status(lifecycle_utils.get_juju_model())
return status
def get_application_status(application=None, unit=None):
"""Return the juju status for an application
:param application: Application name
:type application: string
:param unit: Specific unit
:type unit: string
:returns: Juju status output for an application
:rtype: dict
"""
status = get_full_juju_status()
if unit and not application:
application = unit.split("/")[0]
if application:
status = status.applications.get(application)
if unit:
status = status.get('units').get(unit)
return status
def get_machine_status(machine, key=None):
"""Return the juju status for a machine
:param machine: Machine number
:type machine: string
:param key: Key option requested
:type key: string
:returns: Juju status output for a machine
:rtype: dict
"""
status = get_full_juju_status()
status = status.machines.get(machine)
if key:
status = status.get(key)
return status
def get_machines_for_application(application):
"""Return machines for a given application
:param application: Application name
:type application: string
:returns: List of machines for an application
:rtype: list
"""
status = get_application_status(application)
# libjuju juju status no longer has units for subordinate charms
# Use the application it is subordinate-to to find machines
if status.get('units') is None and status.get('subordinate-to'):
return get_machines_for_application(status.get('subordinate-to')[0])
machines = []
for unit in status.get('units').keys():
machines.append(
status.get('units').get(unit).get('machine'))
return machines
def get_machine_uuids_for_application(application):
"""Return machine uuids for a given application
:param application: Application name
:type application: string
:returns: List of machine uuuids for an application
:rtype: list
"""
uuids = []
for machine in get_machines_for_application(application):
uuids.append(get_machine_status(machine, key='instance-id'))
return uuids
def setup_logging():
"""Setup zaza logging
:returns: Nothing: This fucntion is executed for its sideffect
:rtype: None
"""
logFormatter = logging.Formatter(
fmt="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S")
rootLogger = logging.getLogger()
rootLogger.setLevel('INFO')
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(logFormatter)
rootLogger.addHandler(consoleHandler)

43
zaza/utilities/cli.py Normal file
View File

@@ -0,0 +1,43 @@
#!/usr/bin/env python3
import logging
import os
def parse_arg(options, arg, multiargs=False):
"""Parse argparse argments
:param options: Argparse options
:type options: argparse object
:param arg: Argument attribute key
:type arg: string
:param multiargs: More than one arugment or not
:type multiargs: boolean
:returns: Argparse atrribute value
:rtype: string
"""
if arg.upper() in os.environ:
if multiargs:
return os.environ[arg.upper()].split()
else:
return os.environ[arg.upper()]
else:
return getattr(options, arg)
def setup_logging():
"""Setup zaza logging
:returns: Nothing: This fucntion is executed for its sideffect
:rtype: None
"""
logFormatter = logging.Formatter(
fmt="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S")
rootLogger = logging.getLogger()
rootLogger.setLevel('INFO')
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(logFormatter)
rootLogger.addHandler(consoleHandler)

156
zaza/utilities/generic.py Normal file
View File

@@ -0,0 +1,156 @@
#!/usr/bin/env python3
import logging
import os
import yaml
from zaza import model
from zaza.charm_lifecycle import utils as lifecycle_utils
from zaza.utilities import juju as juju_utils
def dict_to_yaml(dict_data):
"""Return YAML from dictionary
:param dict_data: Dictionary data
:type dict_data: dict
:returns: YAML dump
:rtype: string
"""
return yaml.dump(dict_data, default_flow_style=False)
def get_network_config(net_topology, ignore_env_vars=False,
net_topology_file="network.yaml"):
"""Get network info from network.yaml, override the values if specific
environment variables are set for the undercloud.
This function may be used when running network configuration from CLI to
pass in network configuration settings from a YAML file.
:param net_topology: Network topology name from network.yaml
:type net_topology: string
:param ignore_env_vars: Ignore enviroment variables or not
:type ignore_env_vars: boolean
:returns: Dictionary of network configuration
:rtype: dict
"""
if os.path.exists(net_topology_file):
net_info = get_yaml_config(net_topology_file)[net_topology]
else:
raise Exception("Network topology file: {} not found."
.format(net_topology_file))
if not ignore_env_vars:
logging.info("Consuming network environment variables as overrides "
"for the undercloud.")
net_info.update(get_undercloud_env_vars())
logging.info("Network info: {}".format(dict_to_yaml(net_info)))
return net_info
def get_pkg_version(application, pkg):
"""Return package version
:param application: Application name
:type application: string
:param pkg: Package name
:type pkg: string
:returns: List of package version
:rtype: list
"""
versions = []
units = model.get_units(
lifecycle_utils.get_juju_model(), application)
for unit in units:
cmd = 'dpkg -l | grep {}'.format(pkg)
out = juju_utils.remote_run(unit.entity_id, cmd)
versions.append(out.split('\n')[0].split()[2])
if len(set(versions)) != 1:
raise Exception('Unexpected output from pkg version check')
return versions[0]
def get_undercloud_env_vars():
""" Get environment specific undercloud network configuration settings from
environment variables.
For each testing substrate, specific undercloud network configuration
settings should be exported into the environment to enable testing on that
substrate.
Note: *Overcloud* settings should be declared by the test caller and should
not be overridden here.
Return a dictionary compatible with zaza.configure.network functions'
expected key structure.
Example exported environment variables:
export default_gateway="172.17.107.1"
export external_net_cidr="172.17.107.0/24"
export external_dns="10.5.0.2"
export start_floating_ip="172.17.107.200"
export end_floating_ip="172.17.107.249"
Example o-c-t & uosci non-standard environment variables:
export NET_ID="a705dd0f-5571-4818-8c30-4132cc494668"
export GATEWAY="172.17.107.1"
export CIDR_EXT="172.17.107.0/24"
export NAMESERVER="10.5.0.2"
export FIP_RANGE="172.17.107.200:172.17.107.249"
:returns: Network environment variables
:rtype: dict
"""
# Handle backward compatibile OSCI enviornment variables
_vars = {}
_vars['net_id'] = os.environ.get('NET_ID')
_vars['external_dns'] = os.environ.get('NAMESERVER')
_vars['default_gateway'] = os.environ.get('GATEWAY')
_vars['external_net_cidr'] = os.environ.get('CIDR_EXT')
# Take FIP_RANGE and create start and end floating ips
_fip_range = os.environ.get('FIP_RANGE')
if _fip_range and ':' in _fip_range:
_vars['start_floating_ip'] = os.environ.get('FIP_RANGE').split(':')[0]
_vars['end_floating_ip'] = os.environ.get('FIP_RANGE').split(':')[1]
# Env var naming consistent with zaza.configure.network functions takes
# priority. Override backward compatible settings.
_keys = ['default_gateway',
'start_floating_ip',
'end_floating_ip',
'external_dns',
'external_net_cidr']
for _key in _keys:
_val = os.environ.get(_key)
if _val:
_vars[_key] = _val
# Remove keys and items with a None value
for k, v in list(_vars.items()):
if not v:
del _vars[k]
return _vars
def get_yaml_config(config_file):
"""Return configuration from YAML file
:param config_file: Configuration file name
:type config_file: string
:returns: Dictionary of configuration
:rtype: dict
"""
# Note in its original form get_mojo_config it would do a search pattern
# through mojo stage directories. This version assumes the yaml file is in
# the pwd.
logging.info('Using config %s' % (config_file))
return yaml.load(open(config_file, 'r').read())

171
zaza/utilities/juju.py Normal file
View File

@@ -0,0 +1,171 @@
#!/usr/bin/env python3
import os
from pathlib import Path
from zaza import (
model,
controller,
)
from zaza.charm_lifecycle import utils as lifecycle_utils
from zaza.utilities import generic as generic_utils
def get_application_status(application=None, unit=None):
"""Return the juju status for an application
:param application: Application name
:type application: string
:param unit: Specific unit
:type unit: string
:returns: Juju status output for an application
:rtype: dict
"""
status = get_full_juju_status()
if unit and not application:
application = unit.split("/")[0]
if application:
status = status.applications.get(application)
if unit:
status = status.get("units").get(unit)
return status
def get_cloud_configs(cloud=None):
"""Get cloud configuration from local clouds.yaml
libjuju does not yet have cloud information implemented.
Use libjuju as soon as possible.
:param cloud: Name of specific cloud
:type remote_cmd: string
:returns: Dictionary of cloud configuration
:rtype: dict
"""
home = str(Path.home())
cloud_config = os.path.join(home, ".local", "share", "juju", "clouds.yaml")
if cloud:
return generic_utils.get_yaml_config(cloud_config)["clouds"].get(cloud)
else:
return generic_utils.get_yaml_config(cloud_config)
def get_full_juju_status():
"""Return the full juju status output
:returns: Full juju status output
:rtype: dict
"""
status = model.get_status(lifecycle_utils.get_juju_model())
return status
def get_machines_for_application(application):
"""Return machines for a given application
:param application: Application name
:type application: string
:returns: List of machines for an application
:rtype: list
"""
status = get_application_status(application)
# libjuju juju status no longer has units for subordinate charms
# Use the application it is subordinate-to to find machines
if status.get("units") is None and status.get("subordinate-to"):
return get_machines_for_application(status.get("subordinate-to")[0])
machines = []
for unit in status.get("units").keys():
machines.append(
status.get("units").get(unit).get("machine"))
return machines
def get_machine_status(machine, key=None):
"""Return the juju status for a machine
:param machine: Machine number
:type machine: string
:param key: Key option requested
:type key: string
:returns: Juju status output for a machine
:rtype: dict
"""
status = get_full_juju_status()
status = status.machines.get(machine)
if key:
status = status.get(key)
return status
def get_machine_uuids_for_application(application):
"""Return machine uuids for a given application
:param application: Application name
:type application: string
:returns: List of machine uuuids for an application
:rtype: list
"""
uuids = []
for machine in get_machines_for_application(application):
uuids.append(get_machine_status(machine, key="instance-id"))
return uuids
def get_provider_type():
"""Get the type of the undercloud
:returns: Name of the undercloud type
:rtype: string
"""
cloud = controller.get_cloud()
if cloud:
# If the controller was deployed from this system with
# the cloud configured in ~/.local/share/juju/clouds.yaml
# Determine the cloud type directly
return get_cloud_configs(cloud)["type"]
else:
# If the controller was deployed elsewhere
# For now assume openstack
return "openstack"
def remote_run(unit, remote_cmd, timeout=None, fatal=None):
"""Run command on unit and return the output
NOTE: This function is pre-deprecated. As soon as libjuju unit.run is able
to return output this functionality should move to model.run_on_unit.
:param remote_cmd: Command to execute on unit
:type remote_cmd: string
:param timeout: Timeout value for the command
:type arg: int
:param fatal: Command failure condidered fatal or not
:type fatal: boolean
:returns: Juju run output
:rtype: string
"""
if fatal is None:
fatal = True
result = model.run_on_unit(lifecycle_utils.get_juju_model(),
unit,
remote_cmd,
timeout=timeout)
if result:
if int(result.get("Code")) == 0:
return result.get("Stdout")
else:
if fatal:
raise Exception("Error running remote command: {}"
.format(result.get("Stderr")))
return result.get("Stderr")

View File

@@ -27,7 +27,8 @@ from zaza import model
from zaza.charm_lifecycle import utils as lifecycle_utils
from zaza.utilities import (
exceptions,
_local_utils,
generic as generic_utils,
juju as juju_utils,
)
CHARM_TYPES = {
@@ -267,7 +268,7 @@ def get_gateway_uuids():
:rtype: list
"""
return _local_utils.get_machine_uuids_for_application('neutron-gateway')
return juju_utils.get_machine_uuids_for_application('neutron-gateway')
def get_ovs_uuids():
@@ -277,7 +278,7 @@ def get_ovs_uuids():
:rtype: list
"""
return (_local_utils
return (juju_utils
.get_machine_uuids_for_application('neutron-openvswitch'))
@@ -1100,8 +1101,8 @@ def get_current_os_versions(deployed_applications):
if application['name'] not in deployed_applications:
continue
version = _local_utils.get_pkg_version(application['name'],
application['type']['pkg'])
version = generic_utils.get_pkg_version(application['name'],
application['type']['pkg'])
versions[application['name']] = (
get_os_code_info(application['type']['pkg'], version))
return versions