Concurrent series upgrade updates (#466)

* Updates to concurrent series upgrade

Updates to make it run more in parallel and spend less time waiting on
the whole model when updating machines.

* Make the concurrent series upgrade tests work

This is a number of changes to get the concurrent (here called
'parallel' historically) series upgrade tests to work.  A number of
changes were required which included limiting the number of concurrent
async co-routines (futures) that could be run as with large models it
hits the limits of the Py3 runtime.

* Fix the tests and change pause order in maybe_pause_things

Due to an additional model helper call, an additional model AsyncMock is
required.  Also the pause order had changed, and this is restored to
ensure the original design is retained (for pause order).

Clean up some commented out code and sort out a few PEP8 errors.

* Update comment to reflect code (3 -> 4)

* Fix tests that fail on bionic but pass on focal

Essentially, asyncio.gather has different behaviour on bionic that
focal.  Although this doesn't affect testing, it does affect the unit
tests.  These changes are simply to normalise the behaviour of unit
tests on focal and bionic.
This commit is contained in:
Alex Kavanagh
2020-12-02 10:22:00 +00:00
committed by GitHub
parent 7f45d461e7
commit 7ced54b382
8 changed files with 220 additions and 66 deletions
@@ -182,6 +182,8 @@ class TestParallelSeriesUpgrade(AioTestCase):
self.model.async_wait_for_unit_idle = mock.AsyncMock()
self.async_run_on_machine = mock.AsyncMock()
self.model.async_run_on_machine = self.async_run_on_machine
self.model.async_block_until_units_on_machine_are_idle = \
mock.AsyncMock()
@mock.patch.object(upgrade_utils.cl_utils, 'get_class')
async def test_run_post_application_upgrade_functions(
@@ -492,7 +494,13 @@ class TestParallelSeriesUpgrade(AioTestCase):
mock_remove_confdef_file.assert_called_once_with('1')
mock_add_confdef_file.assert_called_once_with('1')
async def test_maybe_pause_things_primary(self):
@mock.patch("asyncio.gather")
async def test_maybe_pause_things_primary(self, mock_gather):
async def _gather(*args):
for f in args:
await f
mock_gather.side_effect = _gather
await upgrade_utils.maybe_pause_things(
FAKE_STATUS,
['app/1', 'app/2'],
@@ -503,7 +511,13 @@ class TestParallelSeriesUpgrade(AioTestCase):
mock.call('app/2', "pause", action_params={}),
])
async def test_maybe_pause_things_subordinates(self):
@mock.patch("asyncio.gather")
async def test_maybe_pause_things_subordinates(self, mock_gather):
async def _gather(*args):
for f in args:
await f
mock_gather.side_effect = _gather
await upgrade_utils.maybe_pause_things(
FAKE_STATUS,
['app/1', 'app/2'],
@@ -514,7 +528,13 @@ class TestParallelSeriesUpgrade(AioTestCase):
mock.call('app-hacluster/2', "pause", action_params={}),
])
async def test_maybe_pause_things_all(self):
@mock.patch("asyncio.gather")
async def test_maybe_pause_things_all(self, mock_gather):
async def _gather(*args):
for f in args:
await f
mock_gather.side_effect = _gather
await upgrade_utils.maybe_pause_things(
FAKE_STATUS,
['app/1', 'app/2'],
@@ -35,6 +35,7 @@ class FullCloudCharmUpgradeTest(unittest.TestCase):
"""Run setup for Charm Upgrades."""
cli_utils.setup_logging()
cls.lts = LTSGuestCreateTest()
cls.lts.setUpClass()
cls.target_charm_namespace = '~openstack-charmers-next'
def get_upgrade_url(self, charm_url):
+13 -5
View File
@@ -19,8 +19,16 @@ import zaza.model as model
async def complete_cluster_series_upgrade():
"""Run the complete-cluster-series-upgrade action on the lead unit."""
# TODO: Make this work across either mysql or percona-cluster names
await model.async_run_action_on_leader(
'mysql',
'complete-cluster-series-upgrade',
action_params={})
# Note that some models use mysql as the application name, and other's use
# percona-cluster. Try mysql first, and if it doesn't exist, then try
# percona-cluster instead.
try:
await model.async_run_action_on_leader(
'mysql',
'complete-cluster-series-upgrade',
action_params={})
except KeyError:
await model.async_run_action_on_leader(
'percona-cluster',
'complete-cluster-series-upgrade',
action_params={})
@@ -21,6 +21,7 @@ import logging
import os
import sys
import unittest
import juju
from zaza import model
from zaza.openstack.utilities import (
@@ -55,6 +56,12 @@ class ParallelSeriesUpgradeTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""Run setup for Series Upgrades."""
# NOTE(ajkavanagh): Set the jujulib Connection frame size to 4GB to
# cope with all the outputs from series upgrade; long term, don't send
# that output back, which will require that the upgrade function in the
# charm doesn't capture the output of the upgrade in the action, but
# instead puts it somewhere that can by "juju scp"ed.
juju.client.connection.Connection.MAX_FRAME_SIZE = 2**32
cli_utils.setup_logging()
cls.from_series = None
cls.to_series = None
@@ -89,19 +96,25 @@ class ParallelSeriesUpgradeTest(unittest.TestCase):
upgrade_function = \
parallel_series_upgrade.parallel_series_upgrade
# allow up to 4 parallel upgrades at a time. This is to limit the
# amount of data/calls that asyncio is handling as it's gets
# unstable if all the applications are done at the same time.
sem = asyncio.Semaphore(4)
for charm_name in apps:
charm = applications[charm_name]['charm']
name = upgrade_utils.extract_charm_name_from_url(charm)
upgrade_config = parallel_series_upgrade.app_config(name)
upgrade_functions.append(
upgrade_function(
charm_name,
**upgrade_config,
from_series=from_series,
to_series=to_series,
completed_machines=completed_machines,
workaround_script=workaround_script,
files=files))
wrap_coroutine_with_sem(
sem,
upgrade_function(
charm_name,
**upgrade_config,
from_series=from_series,
to_series=to_series,
completed_machines=completed_machines,
workaround_script=workaround_script,
files=files)))
asyncio.get_event_loop().run_until_complete(
asyncio.gather(*upgrade_functions))
model.block_until_all_units_idle()
@@ -109,6 +122,18 @@ class ParallelSeriesUpgradeTest(unittest.TestCase):
logging.info("Done!")
async def wrap_coroutine_with_sem(sem, coroutine):
"""Wrap a coroutine with a semaphore to limit concurrency.
:param sem: The semaphore to limit concurrency
:type sem: asyncio.Semaphore
:param coroutine: the corouting to limit concurrency
:type coroutine: types.CoroutineType
"""
async with sem:
await coroutine
class OpenStackParallelSeriesUpgrade(ParallelSeriesUpgradeTest):
"""OpenStack Series Upgrade.
+24 -2
View File
@@ -120,7 +120,23 @@ class BaseCharmTest(unittest.TestCase):
@classmethod
def setUpClass(cls, application_name=None, model_alias=None):
"""Run setup for test class to create common resources."""
"""Run setup for test class to create common resources.
Note: the derived class may not use the application_name; if it's set
to None then this setUpClass() method will attempt to extract the
application name from the charm_config (essentially the test.yaml)
using the key 'charm_name' in the test_config. If that isn't present,
then there will be no application_name set, and this is considered a
generic scenario of a whole model rather than a particular charm under
test.
:param application_name: the name of the applications that the derived
class is testing. If None, then it's a generic test not connected
to any single charm.
:type application_name: Optional[str]
:param model_alias: the alias to use if needed.
:type model_alias: Optional[str]
"""
cls.model_aliases = model.get_juju_model_aliases()
if model_alias:
cls.model_name = cls.model_aliases[model_alias]
@@ -131,7 +147,13 @@ class BaseCharmTest(unittest.TestCase):
if application_name:
cls.application_name = application_name
else:
charm_under_test_name = cls.test_config['charm_name']
try:
charm_under_test_name = cls.test_config['charm_name']
except KeyError:
logging.warning("No application_name and no charm config so "
"not setting the application_name. Likely a "
"scenario test.")
return
deployed_app_names = model.sync_deployed(model_name=cls.model_name)
if charm_under_test_name in deployed_app_names:
# There is an application named like the charm under test.
+37
View File
@@ -26,6 +26,7 @@ import zaza.model
import zaza.openstack.utilities.cert
import zaza.openstack.utilities.openstack
import zaza.openstack.utilities.generic
import zaza.openstack.utilities.exceptions as zaza_exceptions
import zaza.utilities.juju as juju_utils
@@ -73,9 +74,27 @@ def basic_setup_and_unseal(cacert=None):
zaza.model.run_on_unit(unit.name, './hooks/update-status')
async def mojo_or_default_unseal_by_unit():
"""Unseal any units reported as sealed using a cacert.
The mojo cacert is tried first, and if that doesn't exist, then the default
zaza located cacert is used.
"""
try:
await mojo_unseal_by_unit()
except zaza_exceptions.CACERTNotFound:
await unseal_by_unit()
def mojo_unseal_by_unit():
"""Unseal any units reported as sealed using mojo cacert."""
cacert = zaza.openstack.utilities.generic.get_mojo_cacert_path()
unseal_by_unit(cacert)
def unseal_by_unit(cacert=None):
"""Unseal any units reported as sealed using mojo cacert."""
cacert = cacert or get_cacert_file()
vault_creds = vault_utils.get_credentails()
for client in vault_utils.get_clients(cacert=cacert):
if client.hvac_client.is_sealed():
@@ -86,9 +105,27 @@ def mojo_unseal_by_unit():
zaza.model.run_on_unit(unit_name, './hooks/update-status')
async def async_mojo_or_default_unseal_by_unit():
"""Unseal any units reported as sealed using a cacert.
The mojo cacert is tried first, and if that doesn't exist, then the default
zaza located cacert is used.
"""
try:
await async_mojo_unseal_by_unit()
except zaza_exceptions.CACERTNotFound:
await async_unseal_by_unit()
async def async_mojo_unseal_by_unit():
"""Unseal any units reported as sealed using mojo cacert."""
cacert = zaza.openstack.utilities.generic.get_mojo_cacert_path()
await async_unseal_by_unit(cacert)
async def async_unseal_by_unit(cacert=None):
"""Unseal any units reported as sealed using vault cacert."""
cacert = cacert or get_cacert_file()
vault_creds = vault_utils.get_credentails()
for client in vault_utils.get_clients(cacert=cacert):
if client.hvac_client.is_sealed():
@@ -58,7 +58,9 @@ def app_config(charm_name):
}
exceptions = {
'rabbitmq-server': {
'origin': 'source',
# NOTE: AJK disable config-changed on rabbitmq-server due to bug:
# #1896520
'origin': None,
'pause_non_leader_subordinate': False,
'post_application_upgrade_functions': [
('zaza.openstack.charm_tests.rabbitmq_server.utils.'
@@ -94,7 +96,7 @@ def app_config(charm_name):
'pause_non_leader_subordinate': True,
'post_upgrade_functions': [
('zaza.openstack.charm_tests.vault.setup.'
'async_mojo_unseal_by_unit')]
'async_mojo_or_default_unseal_by_unit')]
},
'mongodb': {
'origin': None,
@@ -191,49 +193,64 @@ async def parallel_series_upgrade(
status = (await model.async_get_status()).applications[application]
logging.info(
"Configuring leader / non leaders for {}".format(application))
leader, non_leaders = get_leader_and_non_leaders(status)
for leader_name, leader_unit in leader.items():
leaders, non_leaders = get_leader_and_non_leaders(status)
for leader_unit in leaders.values():
leader_machine = leader_unit["machine"]
leader = leader_name
machines = [
unit["machine"] for name, unit
in non_leaders.items()
if unit['machine'] not in completed_machines]
machines = [unit["machine"] for name, unit in non_leaders.items()
if unit['machine'] not in completed_machines]
await maybe_pause_things(
status,
non_leaders,
pause_non_leader_subordinate,
pause_non_leader_primary)
await series_upgrade_utils.async_set_series(
application, to_series=to_series)
app_idle = [
# wait for the entire application set to be idle before starting upgrades
await asyncio.gather(*[
model.async_wait_for_unit_idle(unit, include_subordinates=True)
for unit in status["units"]
]
await asyncio.gather(*app_idle)
for unit in status["units"]])
await prepare_series_upgrade(leader_machine, to_series=to_series)
prepare_group = [
prepare_series_upgrade(machine, to_series=to_series)
for machine in machines]
await asyncio.gather(*prepare_group)
await asyncio.gather(*[
wait_for_idle_then_prepare_series_upgrade(
machine, to_series=to_series)
for machine in machines])
if leader_machine not in completed_machines:
machines.append(leader_machine)
upgrade_group = [
await asyncio.gather(*[
series_upgrade_machine(
machine,
origin=origin,
application=application,
files=files, workaround_script=workaround_script,
post_upgrade_functions=post_upgrade_functions)
for machine in machines
]
await asyncio.gather(*upgrade_group)
for machine in machines])
completed_machines.extend(machines)
await series_upgrade_utils.async_set_series(
application, to_series=to_series)
await run_post_application_upgrade_functions(
post_application_upgrade_functions)
async def wait_for_idle_then_prepare_series_upgrade(
machine, to_series, model_name=None):
"""Wait for the units to idle the do prepare_series_upgrade.
We need to be sure that all the units are idle prior to actually calling
prepare_series_upgrade() as otherwise the call will fail. It has to be
checked because when the leader is paused it may kick off relation hooks in
the other units in an HA group.
:param machine: the machine that is going to be prepared
:type machine: str
:param to_series: The series to which to upgrade
:type to_series: str
:param model_name: Name of model to query.
:type model_name: str
"""
await model.async_block_until_units_on_machine_are_idle(
machine, model_name=model_name)
await prepare_series_upgrade(machine, to_series=to_series)
async def serial_series_upgrade(
application,
from_series='xenial',
@@ -307,8 +324,10 @@ async def serial_series_upgrade(
non_leaders,
pause_non_leader_subordinate,
pause_non_leader_primary)
logging.info("Finishing pausing application: {}".format(application))
await series_upgrade_utils.async_set_series(
application, to_series=to_series)
logging.info("Finished set series for application: {}".format(application))
if not follower_first and leader_machine not in completed_machines:
await model.async_wait_for_unit_idle(leader, include_subordinates=True)
await prepare_series_upgrade(leader_machine, to_series=to_series)
@@ -321,6 +340,8 @@ async def serial_series_upgrade(
files=files, workaround_script=workaround_script,
post_upgrade_functions=post_upgrade_functions)
completed_machines.append(leader_machine)
logging.info("Finished upgrading of leader for application: {}"
.format(application))
# for machine in machines:
for unit_name, unit in non_leaders.items():
@@ -339,6 +360,8 @@ async def serial_series_upgrade(
files=files, workaround_script=workaround_script,
post_upgrade_functions=post_upgrade_functions)
completed_machines.append(machine)
logging.info("Finished upgrading non leaders for application: {}"
.format(application))
if follower_first and leader_machine not in completed_machines:
await model.async_wait_for_unit_idle(leader, include_subordinates=True)
@@ -354,6 +377,7 @@ async def serial_series_upgrade(
completed_machines.append(leader_machine)
await run_post_application_upgrade_functions(
post_application_upgrade_functions)
logging.info("Done series upgrade for: {}".format(application))
async def series_upgrade_machine(
@@ -381,17 +405,16 @@ async def series_upgrade_machine(
:returns: None
:rtype: None
"""
logging.info(
"About to series-upgrade ({})".format(machine))
logging.info("About to series-upgrade ({})".format(machine))
await run_pre_upgrade_functions(machine, pre_upgrade_functions)
await add_confdef_file(machine)
await async_dist_upgrade(machine)
await async_do_release_upgrade(machine)
await remove_confdef_file(machine)
await reboot(machine)
await series_upgrade_utils.async_complete_series_upgrade(machine)
if origin:
await os_utils.async_set_origin(application, origin)
await series_upgrade_utils.async_complete_series_upgrade(machine)
await run_post_upgrade_functions(post_upgrade_functions)
@@ -484,8 +507,7 @@ async def maybe_pause_things(
:returns: Nothing
:trype: None
"""
subordinate_pauses = []
leader_pauses = []
unit_pauses = []
for unit in units:
if pause_non_leader_subordinate:
if status["units"][unit].get("subordinates"):
@@ -495,15 +517,19 @@ async def maybe_pause_things(
logging.info("Skipping pausing {} - blacklisted"
.format(subordinate))
else:
logging.info("Pausing {}".format(subordinate))
subordinate_pauses.append(model.async_run_action(
subordinate, "pause", action_params={}))
unit_pauses.append(
_pause_helper("subordinate", subordinate))
if pause_non_leader_primary:
logging.info("Pausing {}".format(unit))
leader_pauses.append(
model.async_run_action(unit, "pause", action_params={}))
await asyncio.gather(*leader_pauses)
await asyncio.gather(*subordinate_pauses)
unit_pauses.append(_pause_helper("leader", unit))
if unit_pauses:
await asyncio.gather(*unit_pauses)
async def _pause_helper(_type, unit):
"""Pause helper to ensure that the log happens nearer to the action."""
logging.info("Pausing ({}) {}".format(_type, unit))
await model.async_run_action(unit, "pause", action_params={})
logging.info("Finished Pausing ({}) {}".format(_type, unit))
def get_leader_and_non_leaders(status):
@@ -541,14 +567,14 @@ async def prepare_series_upgrade(machine, to_series):
NOTE: This is a new feature in juju behind a feature flag and not yet in
libjuju.
export JUJU_DEV_FEATURE_FLAGS=upgrade-series
:param machine_num: Machine number
:type machine_num: str
:param machine: Machine number
:type machine: str
:param to_series: The series to which to upgrade
:type to_series: str
:returns: None
:rtype: None
"""
logging.info("Preparing series upgrade for: {}".format(machine))
logging.info("Preparing series upgrade for: %s", machine)
await series_upgrade_utils.async_prepare_series_upgrade(
machine, to_series=to_series)
@@ -564,9 +590,8 @@ async def reboot(machine):
try:
await model.async_run_on_machine(machine, 'sudo init 6 & exit')
# await run_on_machine(unit, "sudo reboot && exit")
except subprocess.CalledProcessError as e:
logging.warn("Error doing reboot: {}".format(e))
pass
except subprocess.CalledProcessError as error:
logging.warning("Error doing reboot: %s", error)
async def async_dist_upgrade(machine):
@@ -577,16 +602,31 @@ async def async_dist_upgrade(machine):
:returns: None
:rtype: None
"""
logging.info('Updating package db ' + machine)
logging.info('Updating package db %s', machine)
update_cmd = 'sudo apt-get update'
await model.async_run_on_machine(machine, update_cmd)
logging.info('Updating existing packages ' + machine)
logging.info('Updating existing packages %s', machine)
dist_upgrade_cmd = (
"""yes | sudo DEBIAN_FRONTEND=noninteractive apt-get --assume-yes """
"""-o "Dpkg::Options::=--force-confdef" """
"""-o "Dpkg::Options::=--force-confold" dist-upgrade""")
await model.async_run_on_machine(machine, dist_upgrade_cmd)
rdict = await model.async_run_on_machine(
machine,
"cat /var/run/reboot-required || true")
if "Stdout" in rdict and "restart" in rdict["Stdout"].lower():
logging.info("dist-upgrade required reboot machine: %s", machine)
await reboot(machine)
logging.info("Waiting for machine to come back afer reboot: %s",
machine)
await model.async_block_until_file_missing_on_machine(
machine, "/var/run/reboot-required")
logging.info("Waiting for machine idleness on %s", machine)
await asyncio.sleep(5.0)
await model.async_block_until_units_on_machine_are_idle(machine)
# TODO: change this to wait on units on the machine
# await model.async_block_until_all_units_idle()
async def async_do_release_upgrade(machine):
@@ -597,7 +637,7 @@ async def async_do_release_upgrade(machine):
:returns: None
:rtype: None
"""
logging.info('Upgrading ' + machine)
logging.info('Upgrading %s', machine)
do_release_upgrade_cmd = (
'yes | sudo DEBIAN_FRONTEND=noninteractive '
'do-release-upgrade -f DistUpgradeViewNonInteractive')
+4 -3
View File
@@ -884,7 +884,8 @@ def dist_upgrade(unit_name):
"""-o "Dpkg::Options::=--force-confdef" """
"""-o "Dpkg::Options::=--force-confold" dist-upgrade""")
model.run_on_unit(unit_name, dist_upgrade_cmd)
rdict = model.run_on_unit(unit_name, "cat /var/run/reboot-required")
rdict = model.run_on_unit(unit_name,
"cat /var/run/reboot-required || true")
if "Stdout" in rdict and "restart" in rdict["Stdout"].lower():
logging.info("dist-upgrade required reboot {}".format(unit_name))
os_utils.reboot(unit_name)
@@ -919,8 +920,8 @@ async def async_dist_upgrade(unit_name):
"""-o "Dpkg::Options::=--force-confdef" """
"""-o "Dpkg::Options::=--force-confold" dist-upgrade""")
await model.async_run_on_unit(unit_name, dist_upgrade_cmd)
rdict = await model.async_run_on_unit(unit_name,
"cat /var/run/reboot-required")
rdict = await model.async_run_on_unit(
unit_name, "cat /var/run/reboot-required || true")
if "Stdout" in rdict and "restart" in rdict["Stdout"].lower():
logging.info("dist-upgrade required reboot {}".format(unit_name))
await os_utils.async_reboot(unit_name)