Neutron dynamic routing testing

Add the testing required for neutron dynamic routing A.K.A dragent.
Create the zaza.charm_tests.dragent module for testing neutron dynamic
routing.
Create the zaza.configure module for reusable configuration tools.
Update utilities to simplify authenticating clients.
This commit is contained in:
David Ames
2018-04-18 14:40:53 -07:00
parent be24513e1c
commit 4278e5822d
15 changed files with 667 additions and 39 deletions
+1
View File
@@ -8,6 +8,7 @@ from setuptools.command.test import test as TestCommand
version = "0.0.1.dev1"
install_require = [
'async_generator',
'hvac',
'jinja2',
'juju',
@@ -152,7 +152,7 @@ class TestCharmLifecycleDeploy(ut_utils.BaseTestCase):
self.patch_object(lc_deploy.juju_wait, 'wait')
lc_deploy.deploy('bun.yaml', 'newmodel')
self.deploy_bundle.assert_called_once_with('bun.yaml', 'newmodel')
self.wait.assert_called_once_with()
self.wait.assert_called_once_with(wait_for_workload=True)
def test_deploy_nowait(self):
self.patch_object(lc_deploy, 'deploy_bundle')
View File
@@ -0,0 +1,76 @@
import mock
import unit_tests.utils as ut_utils
from zaza.utilities import _local_utils
class TestLocalUtils(ut_utils.BaseTestCase):
def setUp(self):
super(TestLocalUtils, self).setUp()
def test_get_yaml_config(self):
self.patch("builtins.open",
new_callable=mock.mock_open(),
name="_open")
_yaml = "data: somedata"
_yaml_dict = {"data": "somedata"}
_filename = "filename"
_fileobj = mock.MagicMock()
_fileobj.read.return_value = _yaml
self._open.return_value = _fileobj
self.assertEqual(_local_utils.get_yaml_config(_filename),
_yaml_dict)
self._open.assert_called_once_with(_filename, "r")
def test_get_network_env_vars(self):
self.patch_object(_local_utils.os.environ, "get")
_env = {"NET_ID": "netid",
"NAMESERVER": "10.0.0.10",
"GATEWAY": "10.0.0.1",
"CIDR_EXT": "10.0.0.0/24",
"CIDR_PRIV": "192.168.0.0/24"}
_result = {}
_result["net_id"] = _env["NET_ID"]
_result["external_dns"] = _env["NAMESERVER"]
_result["default_gateway"] = _env["GATEWAY"]
_result["external_net_cidr"] = _env["CIDR_EXT"]
_result["private_net_cidr"] = _env["CIDR_PRIV"]
def _get_env(key):
return _env.get(key)
self.get.side_effect = _get_env
self.assertEqual(_local_utils.get_network_env_vars(),
_result)
def test_get_net_info(self):
self.patch_object(_local_utils.os.path, "exists")
self.patch_object(_local_utils, "get_yaml_config")
self.patch_object(_local_utils, "get_network_env_vars")
net_topology = "topo"
_data = {net_topology: {"network": "DATA"}}
self.get_yaml_config.return_value = _data
# YAML file does not exist
self.exists.return_value = False
with self.assertRaises(Exception):
_local_utils.get_net_info(net_topology)
# No environmental variables
self.exists.return_value = True
self.assertEqual(
_local_utils.get_net_info(net_topology, ignore_env_vars=True),
_data[net_topology])
self.get_yaml_config.assert_called_once_with("network.yaml")
self.get_network_env_vars.assert_not_called()
# Update with environmental variables
_more_data = {"network": "NEW",
"other": "DATA"}
self.get_network_env_vars.return_value = _more_data
_data[net_topology].update(_more_data)
self.assertEqual(
_local_utils.get_net_info(net_topology),
_data[net_topology])
self.get_network_env_vars.assert_called_once_with()
@@ -1,3 +1,4 @@
import copy
import mock
import unit_tests.utils as ut_utils
from zaza.utilities import openstack_utils
@@ -7,63 +8,130 @@ class TestOpenStackUtils(ut_utils.BaseTestCase):
def setUp(self):
super(TestOpenStackUtils, self).setUp()
self.port_name = 'port_name'
self.net_uuid = 'uuid'
self.port_name = "port_name"
self.net_uuid = "net_uuid"
self.project_id = "project_uuid"
self.port = {
'port': {'id': 'port_id',
'name': self.port_name,
'network_id': self.net_uuid}}
self.ports = {'ports': [self.port['port']]}
"port": {"id": "port_id",
"name": self.port_name,
"network_id": self.net_uuid}}
self.ports = {"ports": [self.port["port"]]}
self.floatingip = {
'floatingip': {'id': 'floatingip_id',
'floating_network_id': self.net_uuid,
'port_id': 'port_id'}}
self.floatingips = {'floatingips': [self.floatingip['floatingip']]}
"floatingip": {"id": "floatingip_id",
"floating_network_id": self.net_uuid,
"port_id": "port_id"}}
self.floatingips = {"floatingips": [self.floatingip["floatingip"]]}
self.address_scope_name = "address_scope_name"
self.address_scope = {
"address_scope": {"id": "address_scope_id",
"name": self.address_scope_name,
"shared": True,
"ip_version": 4,
"tenant_id": self.project_id}}
self.address_scopes = {
"address_scopes": [self.address_scope["address_scope"]]}
self.neutronclient = mock.MagicMock()
self.neutronclient.list_ports.return_value = self.ports
self.neutronclient.create_port.return_value = self.port
self.neutronclient.list_floatingips.return_value = self.floatingips
self.neutronclient.create_floatingip.return_value = self.floatingip
self.ext_net = 'ext_net'
self.private_net = 'private_net'
self.neutronclient.list_address_scopes.return_value = (
self.address_scopes)
self.neutronclient.create_address_scope.return_value = (
self.address_scope)
self.ext_net = "ext_net"
self.private_net = "private_net"
def test_create_port(self):
self.patch_object(openstack_utils, 'get_net_uuid')
self.patch_object(openstack_utils, "get_net_uuid")
self.get_net_uuid.return_value = self.net_uuid
# Already exists
port = openstack_utils.create_port(
self.neutronclient, self.port_name, self.private_net)
self.assertEqual(port, self.port['port'])
self.assertEqual(port, self.port["port"])
self.neutronclient.create_port.assert_not_called()
# Does not yet exist
self.neutronclient.list_ports.return_value = {'ports': []}
self.port['port'].pop('id')
self.neutronclient.list_ports.return_value = {"ports": []}
self.port["port"].pop("id")
port = openstack_utils.create_port(
self.neutronclient, self.port_name, self.private_net)
self.assertEqual(port, self.port['port'])
self.assertEqual(port, self.port["port"])
self.neutronclient.create_port.assert_called_once_with(self.port)
def test_create_floating_ip(self):
self.patch_object(openstack_utils, 'get_net_uuid')
self.patch_object(openstack_utils, "get_net_uuid")
self.get_net_uuid.return_value = self.net_uuid
# Already exists
floatingip = openstack_utils.create_floating_ip(
self.neutronclient, self.ext_net, port=self.port['port'])
self.assertEqual(floatingip, self.floatingip['floatingip'])
self.neutronclient, self.ext_net, port=self.port["port"])
self.assertEqual(floatingip, self.floatingip["floatingip"])
self.neutronclient.create_floatingip.assert_not_called()
# Does not yet exist
self.neutronclient.list_floatingips.return_value = {'floatingips': []}
self.floatingip['floatingip'].pop('id')
self.neutronclient.list_floatingips.return_value = {"floatingips": []}
self.floatingip["floatingip"].pop("id")
floatingip = openstack_utils.create_floating_ip(
self.neutronclient, self.private_net, port=self.port['port'])
self.assertEqual(floatingip, self.floatingip['floatingip'])
self.neutronclient, self.private_net, port=self.port["port"])
self.assertEqual(floatingip, self.floatingip["floatingip"])
self.neutronclient.create_floatingip.assert_called_once_with(
self.floatingip)
def test_create_address_scope(self):
self.patch_object(openstack_utils, "get_net_uuid")
self.get_net_uuid.return_value = self.net_uuid
# Already exists
address_scope = openstack_utils.create_address_scope(
self.neutronclient, self.project_id, self.address_scope_name)
self.assertEqual(address_scope, self.address_scope["address_scope"])
self.neutronclient.create_address_scope.assert_not_called()
# Does not yet exist
self.neutronclient.list_address_scopes.return_value = {
"address_scopes": []}
address_scope_msg = copy.deepcopy(self.address_scope)
address_scope_msg["address_scope"].pop("id")
address_scope = openstack_utils.create_address_scope(
self.neutronclient, self.project_id, self.address_scope_name)
self.assertEqual(address_scope, self.address_scope["address_scope"])
self.neutronclient.create_address_scope.assert_called_once_with(
address_scope_msg)
def test_get_keystone_scope(self):
self.patch_object(openstack_utils, "get_current_os_versions")
# <= Liberty
self.get_current_os_versions.return_value = {"keystone": "liberty"}
self.assertEqual(openstack_utils.get_keystone_scope(), "DOMAIN")
# > Liberty
self.get_current_os_versions.return_value = {"keystone": "mitaka"}
self.assertEqual(openstack_utils.get_keystone_scope(), "PROJECT")
def test_get_overcloud_keystone_session(self):
self.patch_object(openstack_utils, "get_keystone_session")
self.patch_object(openstack_utils, "get_keystone_scope")
self.patch_object(openstack_utils, "get_overcloud_auth")
_auth = "FAKE_AUTH"
_scope = "PROJECT"
self.get_keystone_scope.return_value = _scope
self.get_overcloud_auth.return_value = _auth
openstack_utils.get_overcloud_keystone_session()
self.get_keystone_session.assert_called_once_with(_auth, scope=_scope)
def test_get_undercloud_keystone_session(self):
self.patch_object(openstack_utils, "get_keystone_session")
self.patch_object(openstack_utils, "get_keystone_scope")
self.patch_object(openstack_utils, "get_undercloud_auth")
_auth = "FAKE_AUTH"
_scope = "PROJECT"
self.get_keystone_scope.return_value = _scope
self.get_undercloud_auth.return_value = _auth
openstack_utils.get_undercloud_keystone_session()
self.get_keystone_session.assert_called_once_with(_auth, scope=_scope)
+1 -1
View File
@@ -224,7 +224,7 @@ def deploy(bundle, model, wait=True):
if wait:
logging.info("Waiting for environment to settle")
utils.set_juju_model(model)
juju_wait.wait()
juju_wait.wait(wait_for_workload=True)
def parse_args(args):
@@ -0,0 +1,52 @@
#!/usr/bin/env python3
import os
from zaza.configure import (
network,
bgp_speaker,
)
from zaza.utilities import (
_local_utils,
openstack_utils,
)
DEFAULT_PEER_APPLICATION_NAME = "quagga"
# Find a default network.yaml in tests/bundles/network.yaml of the charm
DEFAULT_TOPOLOGY_FILE = os.path.join("tests", "bundles", "network.yaml")
def setup(net_topology="pool", net_topology_file="network.yaml"):
"""Setup BGP networking
:param net_topology: String name of network topology
:type net_topology: string
:param net_info: Network configuration dictionary
:type net_info: dict
:param keystone_session: Keystone session object for overcloud
:type keystone_session: keystoneauth1.session.Session object
:returns: None
:rtype: None
"""
_local_utils.setup_logging()
# Check for the network.yaml configuration
if (not os.path.exists(net_topology_file) and
os.path.exists(DEFAULT_TOPOLOGY_FILE)):
net_topology_file = DEFAULT_TOPOLOGY_FILE
# Get network configuration from YAML and/or environment variables
net_info = _local_utils.get_net_info(net_topology,
net_topology_file=net_topology_file)
# Get keystone session
keystone_session = openstack_utils.get_overcloud_keystone_session()
# Confugre the network
network.setup_sdn(
net_topology, net_info, keystone_session=keystone_session)
# Configure BGP
bgp_speaker.setup_bgp_speaker(
peer_application_name=DEFAULT_PEER_APPLICATION_NAME,
keystone_session=keystone_session)
+99
View File
@@ -0,0 +1,99 @@
#!/usr/bin/env python3
import argparse
import logging
import time
import sys
from zaza import model
from zaza.charm_lifecycle import utils as lifecycle_utils
from zaza.utilities import (
_local_utils,
openstack_utils,
)
def test_bgp_routes(peer_application_name="quagga", keystone_session=None):
"""Test BGP routes
:param peer_application_name: String name of BGP peer application
:type peer_application_name: string
:param keystone_session: Keystone session object for overcloud
:type keystone_session: keystoneauth1.session.Session object
:raises: AssertionError if expected BGP routes are not found
:returns: None
:rtype: None
"""
# If a session has not been provided, acquire one
if not keystone_session:
keystone_session = openstack_utils.get_overcloud_keystone_session()
# Get authenticated clients
neutron_client = openstack_utils.get_neutron_session_client(
keystone_session)
# Get the peer unit
peer_unit = model.get_units(
lifecycle_utils.get_juju_model(), peer_application_name)[0]
# Get expected advertised routes
private_cidr = neutron_client.list_subnets(
name="private_subnet")["subnets"][0]["cidr"]
floating_ip_cidr = "{}/32".format(
neutron_client.list_floatingips()
["floatingips"][0]["floating_ip_address"])
# This test may run immediately after configuration. It may take time for
# routes to propogate via BGP. Do a binary backoff up to ~2 minutes long.
backoff = 2
max_wait = 129
logging.info("Checking routes on BGP peer {}".format(peer_unit.entity_id))
while backoff < max_wait:
# Run show ip route bgp on BGP peer
routes = _local_utils.remote_run(
peer_unit.entity_id, remote_cmd='vtysh -c "show ip route bgp"')
try:
logging.debug(routes)
assert private_cidr in routes, (
"Private subnet CIDR, {}, not advertised to BGP peer"
.format(private_cidr))
logging.info("Private subnet CIDR, {}, found in routing table"
.format(private_cidr))
break
except AssertionError:
logging.info("Binary backoff waiting {} seconds for bgp "
"routes on peer".format(backoff))
time.sleep(backoff)
backoff = backoff * 2
if backoff > max_wait:
raise
assert floating_ip_cidr in routes, ("Floating IP, {}, not advertised "
"to BGP peer".format(floating_ip_cidr))
logging.info("Floating IP CIDR, {}, found in routing table"
.format(floating_ip_cidr))
def run_from_cli():
"""Run test for BGP routes from CLI
:returns: None
:rtype: None
"""
_local_utils.setup_logging()
parser = argparse.ArgumentParser()
parser.add_argument("--peer-application", "-a",
help="BGP Peer application name. Default: quagga",
default="quagga")
options = parser.parse_args()
peer_application_name = _local_utils.parse_arg(options,
"peer_application")
test_bgp_routes(peer_application_name)
if __name__ == "__main__":
sys.exit(run_from_cli())
+20
View File
@@ -0,0 +1,20 @@
#!/usr/bin/env python3
import unittest
from zaza.utilities import _local_utils
from zaza.charm_tests.dragent import test_dragent
class DRAgentTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
_local_utils.setup_logging()
def test_bgp_routes(self):
test_dragent.test_bgp_routes(peer_application_name="quagga")
if __name__ == "__main__":
unittest.main()
View File
+91
View File
@@ -0,0 +1,91 @@
#!/usr/bin/env python3
import argparse
import logging
import sys
from zaza.utilities import _local_utils
from zaza.utilities import openstack_utils
EXT_NET = "ext_net"
PRIVATE_NET = "private"
FIP_TEST = "FIP TEST"
def setup_bgp_speaker(peer_application_name, keystone_session=None):
"""Setup BGP Speaker
:param peer_application_name: String name of BGP peer application
:type peer_application_name: string
:param keystone_session: Keystone session object for overcloud
:type keystone_session: keystoneauth1.session.Session object
:returns: None
:rtype: None
"""
# If a session has not been provided, acquire one
if not keystone_session:
keystone_session = openstack_utils.get_overcloud_keystone_session()
# Get authenticated clients
neutron_client = openstack_utils.get_neutron_session_client(
keystone_session)
# Create BGP speaker
logging.info("Setting up BGP speaker")
bgp_speaker = openstack_utils.create_bgp_speaker(
neutron_client, local_as=12345)
# Add networks to bgp speaker
logging.info("Advertising BGP routes")
openstack_utils.add_network_to_bgp_speaker(
neutron_client, bgp_speaker, EXT_NET)
openstack_utils.add_network_to_bgp_speaker(
neutron_client, bgp_speaker, PRIVATE_NET)
logging.debug("Advertised routes: {}"
.format(
neutron_client.list_route_advertised_from_bgp_speaker(
bgp_speaker["id"])))
# Create peer
logging.info("Setting up BGP peer")
bgp_peer = openstack_utils.create_bgp_peer(neutron_client,
peer_application_name,
remote_as=10000)
# Add peer to bgp speaker
logging.info("Adding BGP peer to BGP speaker")
openstack_utils.add_peer_to_bgp_speaker(
neutron_client, bgp_speaker, bgp_peer)
# Create Floating IP to advertise
logging.info("Creating floating IP to advertise")
port = openstack_utils.create_port(neutron_client, FIP_TEST, PRIVATE_NET)
floating_ip = openstack_utils.create_floating_ip(neutron_client, EXT_NET,
port=port)
logging.info(
"Advertised floating IP: {}".format(
floating_ip["floating_ip_address"]))
def run_from_cli():
"""Run BGP Speaker setup from CLI
:returns: None
:rtype: None
"""
_local_utils.setup_logging()
parser = argparse.ArgumentParser()
parser.add_argument("--peer-application", "-a",
help="BGP peer application name. Default: quagga",
default="quagga")
options = parser.parse_args()
peer_application_name = _local_utils.parse_arg(options,
"peer_application")
setup_bgp_speaker(peer_application_name)
if __name__ == "__main__":
sys.exit(run_from_cli())
+178
View File
@@ -0,0 +1,178 @@
#!/usr/bin/env python3
import argparse
import logging
import sys
from zaza.utilities import _local_utils
from zaza.utilities import openstack_utils
def setup_sdn(net_topology, net_info, keystone_session=None):
"""Setup Software Defined Network
:param net_topology: String name of network topology
:type net_topology: string
:param net_info: Network configuration dictionary
:type net_info: dict
:param keystone_session: Keystone session object for overcloud
:type keystone_session: keystoneauth1.session.Session object
:returns: None
:rtype: None
"""
# If a session has not been provided, acquire one
if not keystone_session:
keystone_session = openstack_utils.get_overcloud_keystone_session()
# Get authenticated clients
keystone_client = openstack_utils.get_keystone_session_client(
keystone_session)
neutron_client = openstack_utils.get_neutron_session_client(
keystone_session)
# Resolve the project name from the overcloud opentackrc into a project id
project_id = openstack_utils.get_project_id(
keystone_client,
"admin",
)
# Network Setup
subnetpools = False
if net_info.get("subnetpool_prefix"):
subnetpools = True
logging.info("Configuring overcloud network")
# Create the external network
ext_network = openstack_utils.create_external_network(
neutron_client,
project_id,
net_info.get("dvr_enabled", False),
net_info["external_net_name"])
openstack_utils.create_external_subnet(
neutron_client,
project_id,
ext_network,
net_info["default_gateway"],
net_info["external_net_cidr"],
net_info["start_floating_ip"],
net_info["end_floating_ip"],
net_info["external_subnet_name"])
# Should this be --enable_snat = False
provider_router = (
openstack_utils.create_provider_router(neutron_client, project_id))
openstack_utils.plug_extnet_into_router(
neutron_client,
provider_router,
ext_network)
ip_version = net_info.get("ip_version") or 4
subnetpool = None
if subnetpools:
address_scope = openstack_utils.create_address_scope(
neutron_client,
project_id,
net_info.get("address_scope"),
ip_version=ip_version)
subnetpool = openstack_utils.create_subnetpool(
neutron_client,
project_id,
net_info.get("subnetpool_name"),
net_info.get("subnetpool_prefix"),
address_scope)
project_network = openstack_utils.create_project_network(
neutron_client,
project_id,
shared=False,
network_type=net_info["network_type"])
project_subnet = openstack_utils.create_project_subnet(
neutron_client,
project_id,
project_network,
net_info.get("private_net_cidr"),
subnetpool=subnetpool,
ip_version=ip_version)
openstack_utils.update_subnet_dns(
neutron_client,
project_subnet,
net_info["external_dns"])
openstack_utils.plug_subnet_into_router(
neutron_client,
net_info["router_name"],
project_network,
project_subnet)
openstack_utils.add_neutron_secgroup_rules(neutron_client, project_id)
def setup_gateway_ext_port(net_info, keystone_session=None):
"""Setup external port on Neutron Gateway.
For OpenStack on OpenStack scenarios
:param net_info: Network configuration dictionary
:type net_info: dict
:param keystone_session: Keystone session object for undercloud
:type keystone_session: keystoneauth1.session.Session object
:returns: None
:rtype: None
"""
# If a session has not been provided, acquire one
if not keystone_session:
keystone_session = openstack_utils.get_undercloud_keystone_session()
# Get authenticated clients
nova_client = openstack_utils.get_nova_session_client(keystone_session)
neutron_client = openstack_utils.get_neutron_session_client(
keystone_session)
# Add an interface to the neutron-gateway units and tell juju to use it
# as the external port.
if "net_id" in net_info.keys():
net_id = net_info["net_id"]
else:
net_id = None
logging.info("Configuring network for OpenStack undercloud/provider")
openstack_utils.configure_gateway_ext_port(
nova_client,
neutron_client,
dvr_mode=net_info.get("dvr_enabled", False),
net_id=net_id)
def run_from_cli():
"""Run network configurations from CLI
:returns: None
:rtype: None
"""
_local_utils.setup_logging()
parser = argparse.ArgumentParser()
parser.add_argument("net_topology",
help="network topology type, default is GRE",
default="gre", nargs="?")
parser.add_argument("--ignore_env_vars", "-i",
help="do not override using environment variables",
action="store_true",
default=False)
parser.add_argument("--net_topology_file", "-f",
help="Network topology file location",
default="network.yaml")
# Handle CLI options
options = parser.parse_args()
net_topology = _local_utils.parse_arg(options, "net_topology")
net_topology_file = _local_utils.parse_arg(options, "net_topology_file")
ignore_env_vars = _local_utils.parse_arg(options, "ignore_env_vars")
logging.info("Setting up %s network" % (net_topology))
net_info = _local_utils.get_net_info(
net_topology, ignore_env_vars, net_topology_file)
# Handle network for Openstack-on-Openstack scenarios
if _local_utils.get_provider_type() == "openstack":
setup_gateway_ext_port(net_info)
setup_sdn(net_topology, net_info)
if __name__ == "__main__":
sys.exit(run_from_cli())
+11 -6
View File
@@ -112,7 +112,8 @@ def get_yaml_config(config_file):
return yaml.load(open(config_file, 'r').read())
def get_net_info(net_topology, ignore_env_vars=False):
def get_net_info(net_topology, ignore_env_vars=False,
net_topology_file="network.yaml"):
"""Get network info from network.yaml, override the values if specific
environment variables are set.
@@ -124,13 +125,17 @@ def get_net_info(net_topology, ignore_env_vars=False):
:rtype: dict
"""
net_info = get_yaml_config('network.yaml')[net_topology]
if os.path.exists(net_topology_file):
net_info = get_yaml_config(net_topology_file)[net_topology]
else:
raise Exception("Network topology file: {} not found."
.format(net_topology_file))
if not ignore_env_vars:
logging.info('Consuming network environment variables as overrides.')
logging.info("Consuming network environment variables as overrides.")
net_info.update(get_network_env_vars())
logging.info('Network info: {}'.format(dict_to_yaml(net_info)))
logging.info("Network info: {}".format(dict_to_yaml(net_info)))
return net_info
@@ -173,8 +178,8 @@ def remote_run(unit, remote_cmd, timeout=None, fatal=None):
"""
if fatal is None:
fatal = True
result = model.run_on_unit(unit,
lifecycle_utils.get_juju_model(),
result = model.run_on_unit(lifecycle_utils.get_juju_model(),
unit,
remote_cmd,
timeout=timeout)
if result:
+42 -4
View File
@@ -139,6 +139,23 @@ def get_neutron_session_client(session):
return neutronclient.Client(session=session)
def get_keystone_scope():
"""Return Keystone scope based on OpenStack release
:returns: String keystone scope
:rtype: string
"""
os_version = get_current_os_versions("keystone")["keystone"]
# Keystone policy.json shipped the charm with liberty requires a domain
# scoped token. Bug #1649106
if os_version == "liberty":
scope = "DOMAIN"
else:
scope = "PROJECT"
return scope
def get_keystone_session(opentackrc_creds, insecure=True, scope='PROJECT'):
"""Return keystone session
@@ -160,6 +177,28 @@ def get_keystone_session(opentackrc_creds, insecure=True, scope='PROJECT'):
return session.Session(auth=auth, verify=not insecure)
def get_overcloud_keystone_session():
"""Return Over cloud keystone session
:returns keystone_session: keystoneauth1.session.Session object
:rtype: keystoneauth1.session.Session
"""
return get_keystone_session(get_overcloud_auth(),
scope=get_keystone_scope())
def get_undercloud_keystone_session():
"""Return Under cloud keystone session
:returns keystone_session: keystoneauth1.session.Session object
:rtype: keystoneauth1.session.Session
"""
return get_keystone_session(get_undercloud_auth(),
scope=get_keystone_scope())
def get_keystone_session_client(session):
"""Return keystoneclient authenticated by keystone session
@@ -377,7 +416,7 @@ def configure_gateway_ext_port(novaclient, neutronclient,
model.set_application_config(
lifecycle_utils.get_juju_model(), application_name,
configuration={config_key: ext_br_macs_str})
juju_wait.wait()
juju_wait.wait(wait_for_workload=True)
def create_project_network(neutron_client, project_id, net_name='private',
@@ -444,10 +483,9 @@ def create_external_network(neutron_client, project_id, dvr_mode,
'name': net_name,
'router:external': True,
'tenant_id': project_id,
'provider:physical_network': 'physnet1',
'provider:network_type': 'flat',
}
if not deprecated_external_networking(dvr_mode):
network_msg['provider:physical_network'] = 'physnet1'
network_msg['provider:network_type'] = 'flat'
logging.info('Creating new external network definition: %s',
net_name)