diff --git a/unit_tests/utilities/test_zaza_utilities_parallel_series_upgrade.py b/unit_tests/utilities/test_zaza_utilities_parallel_series_upgrade.py index 457ce1b..0231dbf 100644 --- a/unit_tests/utilities/test_zaza_utilities_parallel_series_upgrade.py +++ b/unit_tests/utilities/test_zaza_utilities_parallel_series_upgrade.py @@ -182,6 +182,8 @@ class TestParallelSeriesUpgrade(AioTestCase): self.model.async_wait_for_unit_idle = mock.AsyncMock() self.async_run_on_machine = mock.AsyncMock() self.model.async_run_on_machine = self.async_run_on_machine + self.model.async_block_until_units_on_machine_are_idle = \ + mock.AsyncMock() @mock.patch.object(upgrade_utils.cl_utils, 'get_class') async def test_run_post_application_upgrade_functions( @@ -492,7 +494,13 @@ class TestParallelSeriesUpgrade(AioTestCase): mock_remove_confdef_file.assert_called_once_with('1') mock_add_confdef_file.assert_called_once_with('1') - async def test_maybe_pause_things_primary(self): + @mock.patch("asyncio.gather") + async def test_maybe_pause_things_primary(self, mock_gather): + async def _gather(*args): + for f in args: + await f + + mock_gather.side_effect = _gather await upgrade_utils.maybe_pause_things( FAKE_STATUS, ['app/1', 'app/2'], @@ -503,7 +511,13 @@ class TestParallelSeriesUpgrade(AioTestCase): mock.call('app/2', "pause", action_params={}), ]) - async def test_maybe_pause_things_subordinates(self): + @mock.patch("asyncio.gather") + async def test_maybe_pause_things_subordinates(self, mock_gather): + async def _gather(*args): + for f in args: + await f + + mock_gather.side_effect = _gather await upgrade_utils.maybe_pause_things( FAKE_STATUS, ['app/1', 'app/2'], @@ -514,7 +528,13 @@ class TestParallelSeriesUpgrade(AioTestCase): mock.call('app-hacluster/2', "pause", action_params={}), ]) - async def test_maybe_pause_things_all(self): + @mock.patch("asyncio.gather") + async def test_maybe_pause_things_all(self, mock_gather): + async def _gather(*args): + for f in args: + await f + + mock_gather.side_effect = _gather await upgrade_utils.maybe_pause_things( FAKE_STATUS, ['app/1', 'app/2'], diff --git a/zaza/openstack/charm_tests/charm_upgrade/tests.py b/zaza/openstack/charm_tests/charm_upgrade/tests.py index f5f301a..f55caf0 100644 --- a/zaza/openstack/charm_tests/charm_upgrade/tests.py +++ b/zaza/openstack/charm_tests/charm_upgrade/tests.py @@ -35,6 +35,7 @@ class FullCloudCharmUpgradeTest(unittest.TestCase): """Run setup for Charm Upgrades.""" cli_utils.setup_logging() cls.lts = LTSGuestCreateTest() + cls.lts.setUpClass() cls.target_charm_namespace = '~openstack-charmers-next' def get_upgrade_url(self, charm_url): diff --git a/zaza/openstack/charm_tests/mysql/utils.py b/zaza/openstack/charm_tests/mysql/utils.py index 1fe5114..75da5d9 100644 --- a/zaza/openstack/charm_tests/mysql/utils.py +++ b/zaza/openstack/charm_tests/mysql/utils.py @@ -19,8 +19,16 @@ import zaza.model as model async def complete_cluster_series_upgrade(): """Run the complete-cluster-series-upgrade action on the lead unit.""" - # TODO: Make this work across either mysql or percona-cluster names - await model.async_run_action_on_leader( - 'mysql', - 'complete-cluster-series-upgrade', - action_params={}) + # Note that some models use mysql as the application name, and other's use + # percona-cluster. Try mysql first, and if it doesn't exist, then try + # percona-cluster instead. + try: + await model.async_run_action_on_leader( + 'mysql', + 'complete-cluster-series-upgrade', + action_params={}) + except KeyError: + await model.async_run_action_on_leader( + 'percona-cluster', + 'complete-cluster-series-upgrade', + action_params={}) diff --git a/zaza/openstack/charm_tests/series_upgrade/parallel_tests.py b/zaza/openstack/charm_tests/series_upgrade/parallel_tests.py index a49eae6..e524fa2 100644 --- a/zaza/openstack/charm_tests/series_upgrade/parallel_tests.py +++ b/zaza/openstack/charm_tests/series_upgrade/parallel_tests.py @@ -21,6 +21,7 @@ import logging import os import sys import unittest +import juju from zaza import model from zaza.openstack.utilities import ( @@ -55,6 +56,12 @@ class ParallelSeriesUpgradeTest(unittest.TestCase): @classmethod def setUpClass(cls): """Run setup for Series Upgrades.""" + # NOTE(ajkavanagh): Set the jujulib Connection frame size to 4GB to + # cope with all the outputs from series upgrade; long term, don't send + # that output back, which will require that the upgrade function in the + # charm doesn't capture the output of the upgrade in the action, but + # instead puts it somewhere that can by "juju scp"ed. + juju.client.connection.Connection.MAX_FRAME_SIZE = 2**32 cli_utils.setup_logging() cls.from_series = None cls.to_series = None @@ -89,19 +96,25 @@ class ParallelSeriesUpgradeTest(unittest.TestCase): upgrade_function = \ parallel_series_upgrade.parallel_series_upgrade + # allow up to 4 parallel upgrades at a time. This is to limit the + # amount of data/calls that asyncio is handling as it's gets + # unstable if all the applications are done at the same time. + sem = asyncio.Semaphore(4) for charm_name in apps: charm = applications[charm_name]['charm'] name = upgrade_utils.extract_charm_name_from_url(charm) upgrade_config = parallel_series_upgrade.app_config(name) upgrade_functions.append( - upgrade_function( - charm_name, - **upgrade_config, - from_series=from_series, - to_series=to_series, - completed_machines=completed_machines, - workaround_script=workaround_script, - files=files)) + wrap_coroutine_with_sem( + sem, + upgrade_function( + charm_name, + **upgrade_config, + from_series=from_series, + to_series=to_series, + completed_machines=completed_machines, + workaround_script=workaround_script, + files=files))) asyncio.get_event_loop().run_until_complete( asyncio.gather(*upgrade_functions)) model.block_until_all_units_idle() @@ -109,6 +122,18 @@ class ParallelSeriesUpgradeTest(unittest.TestCase): logging.info("Done!") +async def wrap_coroutine_with_sem(sem, coroutine): + """Wrap a coroutine with a semaphore to limit concurrency. + + :param sem: The semaphore to limit concurrency + :type sem: asyncio.Semaphore + :param coroutine: the corouting to limit concurrency + :type coroutine: types.CoroutineType + """ + async with sem: + await coroutine + + class OpenStackParallelSeriesUpgrade(ParallelSeriesUpgradeTest): """OpenStack Series Upgrade. diff --git a/zaza/openstack/charm_tests/test_utils.py b/zaza/openstack/charm_tests/test_utils.py index 14ee0f3..5a3e6c1 100644 --- a/zaza/openstack/charm_tests/test_utils.py +++ b/zaza/openstack/charm_tests/test_utils.py @@ -120,7 +120,23 @@ class BaseCharmTest(unittest.TestCase): @classmethod def setUpClass(cls, application_name=None, model_alias=None): - """Run setup for test class to create common resources.""" + """Run setup for test class to create common resources. + + Note: the derived class may not use the application_name; if it's set + to None then this setUpClass() method will attempt to extract the + application name from the charm_config (essentially the test.yaml) + using the key 'charm_name' in the test_config. If that isn't present, + then there will be no application_name set, and this is considered a + generic scenario of a whole model rather than a particular charm under + test. + + :param application_name: the name of the applications that the derived + class is testing. If None, then it's a generic test not connected + to any single charm. + :type application_name: Optional[str] + :param model_alias: the alias to use if needed. + :type model_alias: Optional[str] + """ cls.model_aliases = model.get_juju_model_aliases() if model_alias: cls.model_name = cls.model_aliases[model_alias] @@ -131,7 +147,13 @@ class BaseCharmTest(unittest.TestCase): if application_name: cls.application_name = application_name else: - charm_under_test_name = cls.test_config['charm_name'] + try: + charm_under_test_name = cls.test_config['charm_name'] + except KeyError: + logging.warning("No application_name and no charm config so " + "not setting the application_name. Likely a " + "scenario test.") + return deployed_app_names = model.sync_deployed(model_name=cls.model_name) if charm_under_test_name in deployed_app_names: # There is an application named like the charm under test. diff --git a/zaza/openstack/charm_tests/vault/setup.py b/zaza/openstack/charm_tests/vault/setup.py index 4db90b5..adfb92d 100644 --- a/zaza/openstack/charm_tests/vault/setup.py +++ b/zaza/openstack/charm_tests/vault/setup.py @@ -26,6 +26,7 @@ import zaza.model import zaza.openstack.utilities.cert import zaza.openstack.utilities.openstack import zaza.openstack.utilities.generic +import zaza.openstack.utilities.exceptions as zaza_exceptions import zaza.utilities.juju as juju_utils @@ -73,9 +74,27 @@ def basic_setup_and_unseal(cacert=None): zaza.model.run_on_unit(unit.name, './hooks/update-status') +async def mojo_or_default_unseal_by_unit(): + """Unseal any units reported as sealed using a cacert. + + The mojo cacert is tried first, and if that doesn't exist, then the default + zaza located cacert is used. + """ + try: + await mojo_unseal_by_unit() + except zaza_exceptions.CACERTNotFound: + await unseal_by_unit() + + def mojo_unseal_by_unit(): """Unseal any units reported as sealed using mojo cacert.""" cacert = zaza.openstack.utilities.generic.get_mojo_cacert_path() + unseal_by_unit(cacert) + + +def unseal_by_unit(cacert=None): + """Unseal any units reported as sealed using mojo cacert.""" + cacert = cacert or get_cacert_file() vault_creds = vault_utils.get_credentails() for client in vault_utils.get_clients(cacert=cacert): if client.hvac_client.is_sealed(): @@ -86,9 +105,27 @@ def mojo_unseal_by_unit(): zaza.model.run_on_unit(unit_name, './hooks/update-status') +async def async_mojo_or_default_unseal_by_unit(): + """Unseal any units reported as sealed using a cacert. + + The mojo cacert is tried first, and if that doesn't exist, then the default + zaza located cacert is used. + """ + try: + await async_mojo_unseal_by_unit() + except zaza_exceptions.CACERTNotFound: + await async_unseal_by_unit() + + async def async_mojo_unseal_by_unit(): """Unseal any units reported as sealed using mojo cacert.""" cacert = zaza.openstack.utilities.generic.get_mojo_cacert_path() + await async_unseal_by_unit(cacert) + + +async def async_unseal_by_unit(cacert=None): + """Unseal any units reported as sealed using vault cacert.""" + cacert = cacert or get_cacert_file() vault_creds = vault_utils.get_credentails() for client in vault_utils.get_clients(cacert=cacert): if client.hvac_client.is_sealed(): diff --git a/zaza/openstack/utilities/parallel_series_upgrade.py b/zaza/openstack/utilities/parallel_series_upgrade.py index aa6ab0e..610496d 100755 --- a/zaza/openstack/utilities/parallel_series_upgrade.py +++ b/zaza/openstack/utilities/parallel_series_upgrade.py @@ -58,7 +58,9 @@ def app_config(charm_name): } exceptions = { 'rabbitmq-server': { - 'origin': 'source', + # NOTE: AJK disable config-changed on rabbitmq-server due to bug: + # #1896520 + 'origin': None, 'pause_non_leader_subordinate': False, 'post_application_upgrade_functions': [ ('zaza.openstack.charm_tests.rabbitmq_server.utils.' @@ -94,7 +96,7 @@ def app_config(charm_name): 'pause_non_leader_subordinate': True, 'post_upgrade_functions': [ ('zaza.openstack.charm_tests.vault.setup.' - 'async_mojo_unseal_by_unit')] + 'async_mojo_or_default_unseal_by_unit')] }, 'mongodb': { 'origin': None, @@ -191,49 +193,64 @@ async def parallel_series_upgrade( status = (await model.async_get_status()).applications[application] logging.info( "Configuring leader / non leaders for {}".format(application)) - leader, non_leaders = get_leader_and_non_leaders(status) - for leader_name, leader_unit in leader.items(): + leaders, non_leaders = get_leader_and_non_leaders(status) + for leader_unit in leaders.values(): leader_machine = leader_unit["machine"] - leader = leader_name - machines = [ - unit["machine"] for name, unit - in non_leaders.items() - if unit['machine'] not in completed_machines] + machines = [unit["machine"] for name, unit in non_leaders.items() + if unit['machine'] not in completed_machines] await maybe_pause_things( status, non_leaders, pause_non_leader_subordinate, pause_non_leader_primary) - await series_upgrade_utils.async_set_series( - application, to_series=to_series) - app_idle = [ + # wait for the entire application set to be idle before starting upgrades + await asyncio.gather(*[ model.async_wait_for_unit_idle(unit, include_subordinates=True) - for unit in status["units"] - ] - await asyncio.gather(*app_idle) + for unit in status["units"]]) await prepare_series_upgrade(leader_machine, to_series=to_series) - prepare_group = [ - prepare_series_upgrade(machine, to_series=to_series) - for machine in machines] - await asyncio.gather(*prepare_group) + await asyncio.gather(*[ + wait_for_idle_then_prepare_series_upgrade( + machine, to_series=to_series) + for machine in machines]) if leader_machine not in completed_machines: machines.append(leader_machine) - upgrade_group = [ + await asyncio.gather(*[ series_upgrade_machine( machine, origin=origin, application=application, files=files, workaround_script=workaround_script, post_upgrade_functions=post_upgrade_functions) - for machine in machines - ] - await asyncio.gather(*upgrade_group) + for machine in machines]) completed_machines.extend(machines) + await series_upgrade_utils.async_set_series( + application, to_series=to_series) await run_post_application_upgrade_functions( post_application_upgrade_functions) +async def wait_for_idle_then_prepare_series_upgrade( + machine, to_series, model_name=None): + """Wait for the units to idle the do prepare_series_upgrade. + + We need to be sure that all the units are idle prior to actually calling + prepare_series_upgrade() as otherwise the call will fail. It has to be + checked because when the leader is paused it may kick off relation hooks in + the other units in an HA group. + + :param machine: the machine that is going to be prepared + :type machine: str + :param to_series: The series to which to upgrade + :type to_series: str + :param model_name: Name of model to query. + :type model_name: str + """ + await model.async_block_until_units_on_machine_are_idle( + machine, model_name=model_name) + await prepare_series_upgrade(machine, to_series=to_series) + + async def serial_series_upgrade( application, from_series='xenial', @@ -307,8 +324,10 @@ async def serial_series_upgrade( non_leaders, pause_non_leader_subordinate, pause_non_leader_primary) + logging.info("Finishing pausing application: {}".format(application)) await series_upgrade_utils.async_set_series( application, to_series=to_series) + logging.info("Finished set series for application: {}".format(application)) if not follower_first and leader_machine not in completed_machines: await model.async_wait_for_unit_idle(leader, include_subordinates=True) await prepare_series_upgrade(leader_machine, to_series=to_series) @@ -321,6 +340,8 @@ async def serial_series_upgrade( files=files, workaround_script=workaround_script, post_upgrade_functions=post_upgrade_functions) completed_machines.append(leader_machine) + logging.info("Finished upgrading of leader for application: {}" + .format(application)) # for machine in machines: for unit_name, unit in non_leaders.items(): @@ -339,6 +360,8 @@ async def serial_series_upgrade( files=files, workaround_script=workaround_script, post_upgrade_functions=post_upgrade_functions) completed_machines.append(machine) + logging.info("Finished upgrading non leaders for application: {}" + .format(application)) if follower_first and leader_machine not in completed_machines: await model.async_wait_for_unit_idle(leader, include_subordinates=True) @@ -354,6 +377,7 @@ async def serial_series_upgrade( completed_machines.append(leader_machine) await run_post_application_upgrade_functions( post_application_upgrade_functions) + logging.info("Done series upgrade for: {}".format(application)) async def series_upgrade_machine( @@ -381,17 +405,16 @@ async def series_upgrade_machine( :returns: None :rtype: None """ - logging.info( - "About to series-upgrade ({})".format(machine)) + logging.info("About to series-upgrade ({})".format(machine)) await run_pre_upgrade_functions(machine, pre_upgrade_functions) await add_confdef_file(machine) await async_dist_upgrade(machine) await async_do_release_upgrade(machine) await remove_confdef_file(machine) await reboot(machine) + await series_upgrade_utils.async_complete_series_upgrade(machine) if origin: await os_utils.async_set_origin(application, origin) - await series_upgrade_utils.async_complete_series_upgrade(machine) await run_post_upgrade_functions(post_upgrade_functions) @@ -484,8 +507,7 @@ async def maybe_pause_things( :returns: Nothing :trype: None """ - subordinate_pauses = [] - leader_pauses = [] + unit_pauses = [] for unit in units: if pause_non_leader_subordinate: if status["units"][unit].get("subordinates"): @@ -495,15 +517,19 @@ async def maybe_pause_things( logging.info("Skipping pausing {} - blacklisted" .format(subordinate)) else: - logging.info("Pausing {}".format(subordinate)) - subordinate_pauses.append(model.async_run_action( - subordinate, "pause", action_params={})) + unit_pauses.append( + _pause_helper("subordinate", subordinate)) if pause_non_leader_primary: - logging.info("Pausing {}".format(unit)) - leader_pauses.append( - model.async_run_action(unit, "pause", action_params={})) - await asyncio.gather(*leader_pauses) - await asyncio.gather(*subordinate_pauses) + unit_pauses.append(_pause_helper("leader", unit)) + if unit_pauses: + await asyncio.gather(*unit_pauses) + + +async def _pause_helper(_type, unit): + """Pause helper to ensure that the log happens nearer to the action.""" + logging.info("Pausing ({}) {}".format(_type, unit)) + await model.async_run_action(unit, "pause", action_params={}) + logging.info("Finished Pausing ({}) {}".format(_type, unit)) def get_leader_and_non_leaders(status): @@ -541,14 +567,14 @@ async def prepare_series_upgrade(machine, to_series): NOTE: This is a new feature in juju behind a feature flag and not yet in libjuju. export JUJU_DEV_FEATURE_FLAGS=upgrade-series - :param machine_num: Machine number - :type machine_num: str + :param machine: Machine number + :type machine: str :param to_series: The series to which to upgrade :type to_series: str :returns: None :rtype: None """ - logging.info("Preparing series upgrade for: {}".format(machine)) + logging.info("Preparing series upgrade for: %s", machine) await series_upgrade_utils.async_prepare_series_upgrade( machine, to_series=to_series) @@ -564,9 +590,8 @@ async def reboot(machine): try: await model.async_run_on_machine(machine, 'sudo init 6 & exit') # await run_on_machine(unit, "sudo reboot && exit") - except subprocess.CalledProcessError as e: - logging.warn("Error doing reboot: {}".format(e)) - pass + except subprocess.CalledProcessError as error: + logging.warning("Error doing reboot: %s", error) async def async_dist_upgrade(machine): @@ -577,16 +602,31 @@ async def async_dist_upgrade(machine): :returns: None :rtype: None """ - logging.info('Updating package db ' + machine) + logging.info('Updating package db %s', machine) update_cmd = 'sudo apt-get update' await model.async_run_on_machine(machine, update_cmd) - logging.info('Updating existing packages ' + machine) + logging.info('Updating existing packages %s', machine) dist_upgrade_cmd = ( """yes | sudo DEBIAN_FRONTEND=noninteractive apt-get --assume-yes """ """-o "Dpkg::Options::=--force-confdef" """ """-o "Dpkg::Options::=--force-confold" dist-upgrade""") await model.async_run_on_machine(machine, dist_upgrade_cmd) + rdict = await model.async_run_on_machine( + machine, + "cat /var/run/reboot-required || true") + if "Stdout" in rdict and "restart" in rdict["Stdout"].lower(): + logging.info("dist-upgrade required reboot machine: %s", machine) + await reboot(machine) + logging.info("Waiting for machine to come back afer reboot: %s", + machine) + await model.async_block_until_file_missing_on_machine( + machine, "/var/run/reboot-required") + logging.info("Waiting for machine idleness on %s", machine) + await asyncio.sleep(5.0) + await model.async_block_until_units_on_machine_are_idle(machine) + # TODO: change this to wait on units on the machine + # await model.async_block_until_all_units_idle() async def async_do_release_upgrade(machine): @@ -597,7 +637,7 @@ async def async_do_release_upgrade(machine): :returns: None :rtype: None """ - logging.info('Upgrading ' + machine) + logging.info('Upgrading %s', machine) do_release_upgrade_cmd = ( 'yes | sudo DEBIAN_FRONTEND=noninteractive ' 'do-release-upgrade -f DistUpgradeViewNonInteractive') diff --git a/zaza/openstack/utilities/series_upgrade.py b/zaza/openstack/utilities/series_upgrade.py index 97ba153..42683f6 100644 --- a/zaza/openstack/utilities/series_upgrade.py +++ b/zaza/openstack/utilities/series_upgrade.py @@ -884,7 +884,8 @@ def dist_upgrade(unit_name): """-o "Dpkg::Options::=--force-confdef" """ """-o "Dpkg::Options::=--force-confold" dist-upgrade""") model.run_on_unit(unit_name, dist_upgrade_cmd) - rdict = model.run_on_unit(unit_name, "cat /var/run/reboot-required") + rdict = model.run_on_unit(unit_name, + "cat /var/run/reboot-required || true") if "Stdout" in rdict and "restart" in rdict["Stdout"].lower(): logging.info("dist-upgrade required reboot {}".format(unit_name)) os_utils.reboot(unit_name) @@ -919,8 +920,8 @@ async def async_dist_upgrade(unit_name): """-o "Dpkg::Options::=--force-confdef" """ """-o "Dpkg::Options::=--force-confold" dist-upgrade""") await model.async_run_on_unit(unit_name, dist_upgrade_cmd) - rdict = await model.async_run_on_unit(unit_name, - "cat /var/run/reboot-required") + rdict = await model.async_run_on_unit( + unit_name, "cat /var/run/reboot-required || true") if "Stdout" in rdict and "restart" in rdict["Stdout"].lower(): logging.info("dist-upgrade required reboot {}".format(unit_name)) await os_utils.async_reboot(unit_name)