First pass at batched parallel series upgrade

This commit is contained in:
Chris MacNaughton
2020-04-08 16:10:38 +02:00
parent f383064b17
commit cf1ea4c71b
4 changed files with 737 additions and 3 deletions

View File

@@ -90,6 +90,7 @@ class TestUpgradeUtils(ut_utils.BaseTestCase):
def test_get_upgrade_groups(self):
expected = collections.OrderedDict([
('Stateful Services', []),
('Core Identity', []),
('Control Plane', ['cinder']),
('Data Plane', ['nova-compute']),
@@ -103,10 +104,11 @@ class TestUpgradeUtils(ut_utils.BaseTestCase):
def test_get_series_upgrade_groups(self):
expected = collections.OrderedDict([
('Stateful Services', ['mydb']),
('Core Identity', []),
('Control Plane', ['cinder']),
('Data Plane', ['nova-compute']),
('sweep_up', ['mydb', 'ntp'])])
('sweep_up', ['ntp'])])
actual = openstack_upgrade.get_series_upgrade_groups()
pprint.pprint(expected)
pprint.pprint(actual)

View File

@@ -0,0 +1,202 @@
#!/usr/bin/env python3
# Copyright 2018 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Define class for Series Upgrade."""
import asyncio
import logging
import os
import sys
import unittest
from zaza import model
from zaza.openstack.utilities import (
cli as cli_utils,
upgrade_utils as upgrade_utils,
)
from zaza.openstack.charm_tests.nova.tests import LTSGuestCreateTest
from zaza.openstack.utilities.parallel_series_upgrade import (
parallel_series_upgrade,
)
def _filter_easyrsa(app, app_config, model_name=None):
charm_name = upgrade_utils.extract_charm_name_from_url(app_config['charm'])
if "easyrsa" in charm_name:
logging.warn("Skipping series upgrade of easyrsa Bug #1850121")
return True
return False
def _filter_etcd(app, app_config, model_name=None):
charm_name = upgrade_utils.extract_charm_name_from_url(app_config['charm'])
if "etcd" in charm_name:
logging.warn("Skipping series upgrade of easyrsa Bug #1850124")
return True
return False
class ParallelSeriesUpgradeTest(unittest.TestCase):
"""Class to encapsulate Series Upgrade Tests."""
@classmethod
def setUpClass(cls):
"""Run setup for Series Upgrades."""
cli_utils.setup_logging()
cls.from_series = None
cls.to_series = None
cls.workaround_script = None
cls.files = []
def test_200_run_series_upgrade(self):
"""Run series upgrade."""
# Set Feature Flag
os.environ["JUJU_DEV_FEATURE_FLAGS"] = "upgrade-series"
upgrade_groups = upgrade_utils.get_series_upgrade_groups(
extra_filters=[_filter_etcd, _filter_easyrsa])
from_series = self.from_series
to_series = self.to_series
completed_machines = []
workaround_script = None
files = []
applications = model.get_status().applications
for group_name, apps in upgrade_groups.items():
logging.info("About to upgrade {} from {} to {}".format(
group_name, from_series, to_series))
upgrade_functions = []
if group_name in ["Stateful Services", "Data Plane", "sweep_up"]:
logging.info("Going to upgrade {} unit by unit".format(apps))
upgrade_function = \
parallel_series_upgrade.serial_series_upgrade
else:
logging.info("Going to upgrade {} all at once".format(apps))
upgrade_function = \
parallel_series_upgrade.parallel_series_upgrade
for charm_name in apps:
charm = applications[charm_name]['charm']
name = upgrade_utils.extract_charm_name_from_url(charm)
upgrade_config = parallel_series_upgrade.app_config(name)
upgrade_functions.append(
upgrade_function(
charm_name,
**upgrade_config,
from_series=from_series,
to_series=to_series,
completed_machines=completed_machines,
workaround_script=workaround_script,
files=files))
asyncio.get_event_loop().run_until_complete(
asyncio.gather(*upgrade_functions))
if "rabbitmq-server" in apps:
logging.info(
"Running complete-cluster-series-upgrade action on leader")
model.run_action_on_leader(
'rabbitmq-server',
'complete-cluster-series-upgrade',
action_params={})
if "percona-cluster" in apps:
logging.info(
"Running complete-cluster-series-upgrade action on leader")
model.run_action_on_leader(
'mysql',
'complete-cluster-series-upgrade',
action_params={})
model.block_until_all_units_idle()
logging.info("Finished {}".format(group_name))
logging.info("Done!")
class OpenStackParallelSeriesUpgrade(ParallelSeriesUpgradeTest):
"""OpenStack Series Upgrade.
Full OpenStack series upgrade with VM launch before and after the series
upgrade.
This test requires a full OpenStack including at least: keystone, glance,
nova-cloud-controller, nova-compute, neutron-gateway, neutron-api and
neutron-openvswitch.
"""
@classmethod
def setUpClass(cls):
"""Run setup for Series Upgrades."""
super(OpenStackParallelSeriesUpgrade, cls).setUpClass()
cls.lts = LTSGuestCreateTest()
cls.lts.setUpClass()
def test_100_validate_pre_series_upgrade_cloud(self):
"""Validate pre series upgrade."""
logging.info("Validate pre-series-upgrade: Spin up LTS instance")
self.lts.test_launch_small_instance()
def test_500_validate_series_upgraded_cloud(self):
"""Validate post series upgrade."""
logging.info("Validate post-series-upgrade: Spin up LTS instance")
self.lts.test_launch_small_instance()
class TrustyXenialSeriesUpgrade(OpenStackParallelSeriesUpgrade):
"""OpenStack Trusty to Xenial Series Upgrade."""
@classmethod
def setUpClass(cls):
"""Run setup for Trusty to Xenial Series Upgrades."""
super(TrustyXenialSeriesUpgrade, cls).setUpClass()
cls.from_series = "trusty"
cls.to_series = "xenial"
class XenialBionicSeriesUpgrade(OpenStackParallelSeriesUpgrade):
"""OpenStack Xenial to Bionic Series Upgrade."""
@classmethod
def setUpClass(cls):
"""Run setup for Xenial to Bionic Series Upgrades."""
super(XenialBionicSeriesUpgrade, cls).setUpClass()
cls.from_series = "xenial"
cls.to_series = "bionic"
class BionicFocalSeriesUpgrade(OpenStackParallelSeriesUpgrade):
"""OpenStack Bionic to FocalSeries Upgrade."""
@classmethod
def setUpClass(cls):
"""Run setup for Xenial to Bionic Series Upgrades."""
super(BionicFocalSeriesUpgrade, cls).setUpClass()
cls.from_series = "bionic"
cls.to_series = "focal"
if __name__ == "__main__":
from_series = os.environ.get("FROM_SERIES")
if from_series == "trusty":
to_series = "xenial"
series_upgrade_test = TrustyXenialSeriesUpgrade()
elif from_series == "xenial":
to_series = "bionic"
series_upgrade_test = XenialBionicSeriesUpgrade()
elif from_series == "bionic":
to_series = "focal"
series_upgrade_test = BionicFocalSeriesUpgrade()
else:
raise Exception("FROM_SERIES is not set to a vailid LTS series")
series_upgrade_test.setUpClass()
sys.exit(series_upgrade_test.test_200_run_series_upgrade())

View File

@@ -0,0 +1,528 @@
#!/usr/bin/env python3
# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Collection of functions for testing series upgrade in parallel."""
import asyncio
import collections
import copy
import logging
import subprocess
from zaza import model
import zaza.openstack.utilities.generic as os_utils
import zaza.openstack.utilities.series_upgrade as series_upgrade_utils
from zaza.openstack.utilities.series_upgrade import (
SUBORDINATE_PAUSE_RESUME_BLACKLIST,
)
def app_config(charm_name):
"""Return a dict with the upgrade config for an application.
:param charm_name: Name of the charm about to upgrade
:type charm_name: str
:param async: Whether the upgreade functions should be async
:type async: bool
:returns: A dicitonary of the upgrade config for the application
:rtype: Dict
"""
default = {
'origin': 'openstack-origin',
'pause_non_leader_subordinate': True,
'pause_non_leader_primary': True,
'post_upgrade_functions': [],
'follower_first': False, }
_app_settings = collections.defaultdict(lambda: default)
ceph = {
'origin': "source",
'pause_non_leader_primary': False,
'pause_non_leader_subordinate': False,
}
exceptions = {
'rabbitmq-server': {
'origin': 'source',
'pause_non_leader_subordinate': False, },
'percona-cluster': {'origin': 'source', },
'nova-compute': {
'pause_non_leader_primary': False,
'pause_non_leader_subordinate': False, },
'ceph': ceph,
'ceph-mon': ceph,
'ceph-osd': ceph,
'designate-bind': {'origin': None, },
'tempest': {'origin': None, },
'memcached': {
'origin': None,
'pause_non_leader_primary': False,
'pause_non_leader_subordinate': False,
},
'vault': {
'origin': None,
'pause_non_leader_primary': False,
'pause_non_leader_subordinate': True,
'post_upgrade_functions': [
('zaza.openstack.charm_tests.vault.setup.'
'mojo_unseal_by_unit')]
},
'mongodb': {
'origin': None,
'follower_first': True,
}
}
for key, value in exceptions.items():
_app_settings[key] = copy.deepcopy(default)
_app_settings[key].update(value)
return _app_settings[charm_name]
def upgrade_ubuntu_lite(from_series='xenial', to_series='bionic'):
"""Validate that we can upgrade the ubuntu-lite charm.
:param from_series: What series are we upgrading from
:type from_series: str
:param to_series: What series are we upgrading to
:type to_series: str
"""
completed_machines = []
asyncio.get_event_loop().run_until_complete(
parallel_series_upgrade(
'ubuntu-lite', pause_non_leader_primary=False,
pause_non_leader_subordinate=False,
completed_machines=completed_machines, origin=None)
)
async def parallel_series_upgrade(
application, from_series='xenial', to_series='bionic',
origin='openstack-origin', pause_non_leader_primary=True,
pause_non_leader_subordinate=True, post_upgrade_functions=None,
completed_machines=None, files=None, workaround_script=None,
follower_first=False):
"""Perform series upgrade on an application in parallel.
:param unit_name: Unit Name
:type unit_name: str
:param machine_num: Machine number
:type machine_num: str
:param from_series: The series from which to upgrade
:type from_series: str
:param to_series: The series to which to upgrade
:type to_series: str
:param origin: The configuration setting variable name for changing origin
source. (openstack-origin or source)
:type origin: str
:param files: Workaround files to scp to unit under upgrade
:type files: list
:param workaround_script: Workaround script to run during series upgrade
:type workaround_script: str
:param follower_first: Should the follower(s) be upgraded first
:type follower_first: bool
:returns: None
:rtype: None
"""
if completed_machines is None:
completed_machines = []
if files is None:
files = []
if post_upgrade_functions is None:
post_upgrade_functions = []
if follower_first:
logging.error("leader_first is ignored for parallel upgrade")
logging.info(
"About to upgrade the units of {} in parallel (follower first: {})"
.format(application, follower_first))
# return
status = (await model.async_get_status()).applications[application]
leader, non_leaders = await get_leader_and_non_leaders(application)
for leader_name, leader_unit in leader.items():
leader_machine = leader_unit["machine"]
leader = leader_name
machines = [
unit["machine"] for name, unit
in non_leaders.items()
if unit['machine'] not in completed_machines]
await maybe_pause_things(
status,
non_leaders,
pause_non_leader_subordinate,
pause_non_leader_primary)
await series_upgrade_utils.async_set_series(
application, to_series=to_series)
prepare_group = [
prepare_series_upgrade(machine, to_series=to_series)
for machine in machines]
asyncio.gather(*prepare_group)
await prepare_series_upgrade(leader_machine, to_series=to_series)
if leader_machine not in completed_machines:
machines.append(leader_machine)
# do the dist upgrade
await dist_upgrades(machines)
# do a do-release-upgrade
await do_release_upgrades(machines)
# do a reboot
await reboots(machines)
await complete_series_upgrade(machines, to_series)
if origin:
await os_utils.async_set_origin(application, origin)
async def serial_series_upgrade(
application, from_series='xenial', to_series='bionic',
origin='openstack-origin', pause_non_leader_primary=True,
pause_non_leader_subordinate=True, post_upgrade_functions=None,
completed_machines=None, files=None, workaround_script=None,
follower_first=False,):
"""Perform series upgrade on an application in series.
:param unit_name: Unit Name
:type unit_name: str
:param machine_num: Machine number
:type machine_num: str
:param from_series: The series from which to upgrade
:type from_series: str
:param to_series: The series to which to upgrade
:type to_series: str
:param origin: The configuration setting variable name for changing origin
source. (openstack-origin or source)
:type origin: str
:param files: Workaround files to scp to unit under upgrade
:type files: list
:param workaround_script: Workaround script to run during series upgrade
:type workaround_script: str
:param follower_first: Should the follower(s) be upgraded first
:type follower_first: bool
:returns: None
:rtype: None
"""
if completed_machines is None:
completed_machines = []
if files is None:
files = []
if post_upgrade_functions is None:
post_upgrade_functions = []
logging.info(
"About to upgrade the units of {} in serial (follower first: {})"
.format(application, follower_first))
# return
status = (await model.async_get_status()).applications[application]
leader, non_leaders = await get_leader_and_non_leaders(application)
for leader_name, leader_unit in leader.items():
leader_machine = leader_unit["machine"]
leader = leader_name
machines = [
unit["machine"] for name, unit
in non_leaders.items()
if unit['machine'] not in completed_machines]
await maybe_pause_things(
status,
non_leaders,
pause_non_leader_subordinate,
pause_non_leader_primary)
await series_upgrade_utils.async_set_series(
application, to_series=to_series)
if not follower_first and leader_machine not in completed_machines:
await prepare_series_upgrade(leader_machine, to_series)
logging.info(
"About to dist-upgrade the leader ({})".format(leader_machine))
# upgrade the leader
await dist_upgrades([leader_machine])
# do a do-release-upgrade
await do_release_upgrades([leader_machine])
# do a reboot
await reboots([leader_machine])
await complete_series_upgrade([leader_machine], to_series)
for machine in machines:
await prepare_series_upgrade(machine, to_series)
logging.debug(
"About to dist-upgrade follower {}".format(machine))
await dist_upgrades([machine])
# do a do-release-upgrade
await do_release_upgrades([machine])
# do a reboot
await reboots([machine])
await complete_series_upgrade([machine], to_series)
if follower_first and leader_machine not in completed_machines:
await prepare_series_upgrade(leader_machine, to_series)
logging.info(
"About to dist-upgrade the leader ({})".format(leader_machine))
# upgrade the leader
await dist_upgrades([leader_machine])
# do a do-release-upgrade
await do_release_upgrades([leader_machine])
# do a reboot
await reboots([leader_machine])
await complete_series_upgrade([leader_machine], to_series)
if origin:
await os_utils.async_set_origin(application, origin)
async def maybe_pause_things(
status, units, pause_non_leader_subordinate=True,
pause_non_leader_primary=True):
"""Pause the non-leaders, based on the run configuration.
:param status: Juju status for an application
:type status: juju.applications
:param units: List of units to paybe pause
:type units: LIst[str]
:param pause_non_leader_subordinate: Should the non leader
subordinate be paused
:type pause_non_leader_subordinate: bool
:param pause_non_leader_primary: Should the non leaders be paused
:type pause_non_leader_primary: bool
:returns: Nothing
:trype: None
"""
subordinate_pauses = []
leader_pauses = []
for unit in units:
if pause_non_leader_subordinate:
if status["units"][unit].get("subordinates"):
for subordinate in status["units"][unit]["subordinates"]:
_app = subordinate.split('/')[0]
if _app in SUBORDINATE_PAUSE_RESUME_BLACKLIST:
logging.info("Skipping pausing {} - blacklisted"
.format(subordinate))
else:
logging.info("Pausing {}".format(subordinate))
subordinate_pauses.append(model.async_run_action(
subordinate, "pause", action_params={}))
if pause_non_leader_primary:
logging.info("Pausing {}".format(unit))
leader_pauses.append(
model.async_run_action(unit, "pause", action_params={}))
await asyncio.gather(*subordinate_pauses)
await asyncio.gather(*leader_pauses)
async def dist_upgrades(machines):
"""Run dist-upgrade on unit after update package db.
:param machines: List of machines to upgrade
:type machines: List[str]
:returns: None
:rtype: None
"""
upgrade_group = []
for machine in machines:
upgrade_group.append(async_dist_upgrade(machine))
logging.info(
"About to await the dist upgrades of {}".format(machines))
await asyncio.gather(*upgrade_group)
async def do_release_upgrades(machines):
"""Run do-release-upgrade noninteractive.
:param machines: List of machines to upgrade
:type machines: List[str]
:returns: None
:rtype: None
"""
upgrade_group = []
for machine in machines:
upgrade_group.append(async_do_release_upgrade(machine))
logging.info(
"About to await the do release upgrades of {}".format(machines))
await asyncio.gather(*upgrade_group)
async def reboots(machines):
"""Reboot all of the listed machines.
:param machines: A list of machines, ex: ['1', '2']
:type machines: List[str]
:returns: None
:rtype: None
"""
upgrade_group = []
for machine in machines:
upgrade_group.append(reboot(machine))
logging.info("About to await the reboots of {}".format(machines))
await asyncio.gather(*upgrade_group)
async def get_units(application):
"""Get all units for the named application.
:param application: The application to get units of
:type application: str
:returns: The units for a specified application
:rtype: Dict[str, juju.Unit]
"""
status = (await model.async_get_status()).applications[application]
return status["units"]
async def get_leader_and_non_leaders(application):
"""Get the leader and non-leader Juju units.
This function returns a tuple that looks like:
({
'unit/1': juju.Unit,
},
{
'unit/0': juju.Unit,
'unit/2': juju.unit,
})
The first entry of this tuple is the leader, and the second is
all non-leader units.
:param application: Application to fetch details for
:type application: str
:returns: A tuple of dicts identifying leader and non-leaders
:rtype: Dict[str, List[juju.Unit]]
"""
logging.info(
"Configuring leader / non leaders for {}".format(application))
# if completed_machines is None:
# completed_machines = []
leader = None
non_leaders = {}
for name, unit in (await get_units(application)).items():
if unit.get("leader"):
leader = {name: unit}
# leader = status["units"][unit]["machine"]
else:
non_leaders[name] = unit
# machine = status["units"][unit]["machine"]
# if machine not in completed_machines:
# non_leaders.append(machine)
return (leader, non_leaders)
async def prepare_series_upgrade(machine, to_series):
"""Execute juju series-upgrade prepare on machine.
NOTE: This is a new feature in juju behind a feature flag and not yet in
libjuju.
export JUJU_DEV_FEATURE_FLAGS=upgrade-series
:param machine_num: Machine number
:type machine_num: str
:param to_series: The series to which to upgrade
:type to_series: str
:returns: None
:rtype: None
"""
logging.debug("Preparing series upgrade for: {}".format(machine))
await series_upgrade_utils.async_prepare_series_upgrade(
machine, to_series=to_series)
async def reboot(unit):
"""Reboot the named machine.
:param unit: Machine to reboot
:type unit: str
:returns: Nothing
:rtype: None
"""
try:
await run_on_machine(unit, 'shutdown --reboot now & exit')
# await run_on_machine(unit, "sudo reboot && exit")
except subprocess.CalledProcessError as e:
logging.warn("Error doing reboot: {}".format(e))
pass
async def complete_series_upgrade(machines, to_series):
"""Execute juju series-upgrade complete on machine.
NOTE: This is a new feature in juju behind a feature flag and not yet in
libjuju.
export JUJU_DEV_FEATURE_FLAGS=upgrade-series
:param machine_num: Machine number
:type machine_num: str
:returns: None
:rtype: None
"""
logging.info("Completing series upgrade for {}".format(machines))
group = []
for machine in machines:
# This can fail on the non-leaders if the leader goes first?
group.append(
series_upgrade_utils.async_complete_series_upgrade(machine))
await asyncio.gather(*group)
async def run_on_machine(machine, command, model_name=None, timeout=None):
"""Juju run on unit.
:param model_name: Name of model unit is in
:type model_name: str
:param unit_name: Name of unit to match
:type unit: str
:param command: Command to execute
:type command: str
:param timeout: How long in seconds to wait for command to complete
:type timeout: int
:returns: action.data['results'] {'Code': '', 'Stderr': '', 'Stdout': ''}
:rtype: dict
"""
cmd = ['juju', 'run', '--machine={}'.format(machine)]
if model_name:
cmd.append('--model={}'.format(model_name))
if timeout:
cmd.append('--timeout={}'.format(timeout))
cmd.append(command)
logging.debug("About to call '{}'".format(cmd))
await os_utils.check_call(cmd)
async def async_dist_upgrade(machine):
"""Run dist-upgrade on unit after update package db.
:param machine: Machine Number
:type machine: str
:returns: None
:rtype: None
"""
logging.info('Updating package db ' + machine)
update_cmd = 'sudo apt update'
await run_on_machine(machine, update_cmd)
logging.info('Updating existing packages ' + machine)
dist_upgrade_cmd = (
"""yes | sudo DEBIAN_FRONTEND=noninteractive apt --assume-yes """
"""-o "Dpkg::Options::=--force-confdef" """
"""-o "Dpkg::Options::=--force-confold" dist-upgrade""")
await run_on_machine(machine, dist_upgrade_cmd)
async def async_do_release_upgrade(machine):
"""Run do-release-upgrade noninteractive.
:param machine: Machine Name
:type machine: str
:returns: None
:rtype: None
"""
logging.info('Upgrading ' + machine)
do_release_upgrade_cmd = (
'yes | sudo DEBIAN_FRONTEND=noninteractive '
'do-release-upgrade -f DistUpgradeViewNonInteractive')
await run_on_machine(machine, do_release_upgrade_cmd, timeout="120m")

View File

@@ -20,15 +20,17 @@ import zaza.model
SERVICE_GROUPS = collections.OrderedDict([
('Stateful Services', ['percona-cluster', 'rabbitmq-server']),
('Core Identity', ['keystone']),
('Control Plane', [
'aodh', 'barbican', 'ceilometer', 'ceph-mon', 'ceph-fs',
'aodh', 'barbican', 'ceilometer', 'ceph-fs',
'ceph-radosgw', 'cinder', 'designate',
'designate-bind', 'glance', 'gnocchi', 'heat', 'manila',
'manila-generic', 'neutron-api', 'neutron-gateway', 'placement',
'nova-cloud-controller', 'openstack-dashboard']),
('Data Plane', [
'nova-compute', 'ceph-osd', 'swift-proxy', 'swift-storage'])
'nova-compute', 'ceph-mon', 'ceph-osd',
'swift-proxy', 'swift-storage'])
])
UPGRADE_EXCLUDE_LIST = ['rabbitmq-server', 'percona-cluster']