105 lines
3.6 KiB
Python
105 lines
3.6 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# Copyright 2018 Canonical Ltd.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Run BGP tests."""
|
|
|
|
import argparse
|
|
import logging
|
|
import sys
|
|
import tenacity
|
|
|
|
from zaza import model
|
|
from zaza.openstack.utilities import (
|
|
cli as cli_utils,
|
|
juju as juju_utils,
|
|
openstack as openstack_utils,
|
|
)
|
|
|
|
|
|
def test_bgp_routes(peer_application_name="quagga", keystone_session=None):
|
|
"""Test BGP routes.
|
|
|
|
:param peer_application_name: String name of BGP peer application
|
|
:type peer_application_name: string
|
|
:param keystone_session: Keystone session object for overcloud
|
|
:type keystone_session: keystoneauth1.session.Session object
|
|
:raises: AssertionError if expected BGP routes are not found
|
|
:returns: None
|
|
:rtype: None
|
|
"""
|
|
# If a session has not been provided, acquire one
|
|
if not keystone_session:
|
|
keystone_session = openstack_utils.get_overcloud_keystone_session()
|
|
|
|
# Get authenticated clients
|
|
neutron_client = openstack_utils.get_neutron_session_client(
|
|
keystone_session)
|
|
|
|
# Get the peer unit
|
|
peer_unit = model.get_units(peer_application_name)[0].entity_id
|
|
|
|
# Get expected advertised routes
|
|
private_cidr = neutron_client.list_subnets(
|
|
name="private_subnet")["subnets"][0]["cidr"]
|
|
floating_ip_cidr = "{}/32".format(
|
|
neutron_client.list_floatingips()
|
|
["floatingips"][0]["floating_ip_address"])
|
|
|
|
# This test may run immediately after configuration. It may take time for
|
|
# routes to propogate via BGP. Do a binary backoff.
|
|
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=1, max=60),
|
|
reraise=True, stop=tenacity.stop_after_attempt(10))
|
|
def _assert_cidr_in_peer_routing_table(peer_unit, cidr):
|
|
logging.debug("Checking for {} on BGP peer {}"
|
|
.format(cidr, peer_unit))
|
|
# Run show ip route bgp on BGP peer
|
|
routes = juju_utils.remote_run(
|
|
peer_unit, remote_cmd='vtysh -c "show ip route bgp"')
|
|
logging.info(routes)
|
|
assert cidr in routes, (
|
|
"CIDR, {}, not found in BGP peer's routing table: {}"
|
|
.format(cidr, routes))
|
|
|
|
_assert_cidr_in_peer_routing_table(peer_unit, private_cidr)
|
|
logging.info("Private subnet CIDR, {}, found in routing table"
|
|
.format(private_cidr))
|
|
_assert_cidr_in_peer_routing_table(peer_unit, floating_ip_cidr)
|
|
logging.info("Floating IP CIDR, {}, found in routing table"
|
|
.format(floating_ip_cidr))
|
|
|
|
|
|
def run_from_cli():
|
|
"""Run test for BGP routes from CLI.
|
|
|
|
:returns: None
|
|
:rtype: None
|
|
"""
|
|
cli_utils.setup_logging()
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--peer-application", "-a",
|
|
help="BGP Peer application name. Default: quagga",
|
|
default="quagga")
|
|
options = parser.parse_args()
|
|
|
|
peer_application_name = cli_utils.parse_arg(options,
|
|
"peer_application")
|
|
|
|
test_bgp_routes(peer_application_name)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(run_from_cli())
|