From cbc14e32c625f2228dd0416f2c4de398e0f9f92e Mon Sep 17 00:00:00 2001 From: Liam Young Date: Tue, 22 May 2018 15:28:28 +0100 Subject: [PATCH 1/3] Another batch of zaza.model helpers This pr adds a number of new fuctions to zaza.model. As discussed zaza.model needs to be broken up as it has become too large. I would like to continue the discussion around that and submit a subsequent patch for that at a later date. --- unit_tests/test_zaza_model.py | 316 ++++++++++++++++++++++++++++++++++ zaza/model.py | 229 +++++++++++++++++++++++- 2 files changed, 543 insertions(+), 2 deletions(-) diff --git a/unit_tests/test_zaza_model.py b/unit_tests/test_zaza_model.py index 397613c..17d2635 100644 --- a/unit_tests/test_zaza_model.py +++ b/unit_tests/test_zaza_model.py @@ -441,6 +441,322 @@ class TestModel(ut_utils.BaseTestCase): self.unit1.scp_from.assert_called_once_with( '/tmp/src/myfile.txt', mock.ANY) + def test_async_block_until_all_units_idle(self): + + async def _block_until(f, timeout=None): + if not f(): + raise asyncio.futures.TimeoutError + + def _all_units_idle(): + return True + self.patch_object(model, 'Model') + self.Model.return_value = self.Model_mock + self.Model_mock.all_units_idle.side_effect = _all_units_idle + self.Model_mock.block_until.side_effect = _block_until + # Check exception is not raised: + model.block_until_all_units_idle('modelname') + + def test_async_block_until_all_units_idle_false(self): + + async def _block_until(f, timeout=None): + if not f(): + raise asyncio.futures.TimeoutError + + def _all_units_idle(): + return False + self.Model_mock.all_units_idle.side_effect = _all_units_idle + self.patch_object(model, 'Model') + self.Model.return_value = self.Model_mock + self.Model_mock.block_until.side_effect = _block_until + # Confirm exception is raised: + with self.assertRaises(asyncio.futures.TimeoutError): + model.block_until_all_units_idle('modelname') + + def block_until_service_status_base(self, rou_return): + + async def _block_until(f, timeout=None): + rc = await f() + if not rc: + raise asyncio.futures.TimeoutError + + async def _run_on_unit(model_name, unit_name, cmd, timeout=None): + return rou_return + self.patch_object(model, 'async_run_on_unit') + self.async_run_on_unit.side_effect = _run_on_unit + self.patch_object(model, 'Model') + self.Model.return_value = self.Model_mock + self.patch_object(model, 'async_block_until') + self.async_block_until.side_effect = _block_until + + def test_block_until_service_status_check_running(self): + self.block_until_service_status_base({'Stdout': '152 409 54'}) + model.block_until_service_status( + 'modelname', + 'app/2', + ['test_svc'], + 'running') + + def test_block_until_service_status_check_running_fail(self): + self.block_until_service_status_base({'Stdout': ''}) + with self.assertRaises(asyncio.futures.TimeoutError): + model.block_until_service_status( + 'modelname', + 'app/2', + ['test_svc'], + 'running') + + def test_block_until_service_status_check_stopped(self): + self.block_until_service_status_base({'Stdout': ''}) + model.block_until_service_status( + 'modelname', + 'app/2', + ['test_svc'], + 'stopped') + + def test_block_until_service_status_check_stopped_fail(self): + self.block_until_service_status_base({'Stdout': '152 409 54'}) + with self.assertRaises(asyncio.futures.TimeoutError): + model.block_until_service_status( + 'modelname', + 'app/2', + ['test_svc'], + 'stopped') + + def test_get_unit_time(self): + async def _run_on_unit(model_name, unit_name, cmd, timeout=None): + return {'Stdout': '1524409654'} + self.patch_object(model, 'async_run_on_unit') + self.async_run_on_unit.side_effect = _run_on_unit + self.assertEqual( + model.get_unit_time('mymodel', 'app/2'), + 1524409654) + + def test_get_unit_service_start_time(self): + async def _run_on_unit(model_name, unit_name, cmd, timeout=None): + return {'Stdout': '1524409654'} + self.patch_object(model, 'async_run_on_unit') + self.async_run_on_unit.side_effect = _run_on_unit + self.assertEqual( + model.get_unit_service_start_time('mymodel', 'app/2', 'mysvc1'), + 1524409654) + + def test_get_unit_service_start_time_not_running(self): + async def _run_on_unit(model_name, unit_name, cmd, timeout=None): + return {'Stdout': ''} + self.patch_object(model, 'async_run_on_unit') + self.async_run_on_unit.side_effect = _run_on_unit + with self.assertRaises(model.ServiceNotRunning): + model.get_unit_service_start_time('mymodel', 'app/2', 'mysvc1') + + def block_until_oslo_config_entries_match_base(self, file_contents, + expected_contents): + async def _scp_from(remote_file, tmpdir): + with open('{}/myfile.txt'.format(tmpdir), 'w') as f: + f.write(file_contents) + self.patch_object(model, 'Model') + self.Model.return_value = self.Model_mock + self.unit1.scp_from.side_effect = _scp_from + self.unit2.scp_from.side_effect = _scp_from + model.block_until_oslo_config_entries_match( + 'modelname', + 'app', + '/tmp/src/myfile.txt', + expected_contents, + timeout=0.1) + + def test_block_until_oslo_config_entries_match(self): + file_contents = """ +[DEFAULT] +verbose = False +use_syslog = False +debug = False +workers = 4 +bind_host = 0.0.0.0 + +[glance_store] +filesystem_store_datadir = /var/lib/glance/images/ +stores = glance.store.filesystem.Store,glance.store.http.Store +default_store = file + +[image_format] +disk_formats = ami,ari,aki,vhd,vmdk,raw,qcow2,vdi,iso,root-tar +""" + expected_contents = { + 'DEFAULT': { + 'debug': ['False']}, + 'glance_store': { + 'filesystem_store_datadir': ['/var/lib/glance/images/'], + 'default_store': ['file']}} + self.block_until_oslo_config_entries_match_base( + file_contents, + expected_contents) + self.unit1.scp_from.assert_called_once_with( + '/tmp/src/myfile.txt', mock.ANY) + self.unit2.scp_from.assert_called_once_with( + '/tmp/src/myfile.txt', mock.ANY) + + def test_block_until_oslo_config_entries_match_fail(self): + file_contents = """ +[DEFAULT] +verbose = False +use_syslog = False +debug = True +workers = 4 +bind_host = 0.0.0.0 + +[glance_store] +filesystem_store_datadir = /var/lib/glance/images/ +stores = glance.store.filesystem.Store,glance.store.http.Store +default_store = file + +[image_format] +disk_formats = ami,ari,aki,vhd,vmdk,raw,qcow2,vdi,iso,root-tar +""" + expected_contents = { + 'DEFAULT': { + 'debug': ['False']}, + 'glance_store': { + 'filesystem_store_datadir': ['/var/lib/glance/images/'], + 'default_store': ['file']}} + with self.assertRaises(asyncio.futures.TimeoutError): + self.block_until_oslo_config_entries_match_base( + file_contents, + expected_contents) + self.unit1.scp_from.assert_called_once_with( + '/tmp/src/myfile.txt', mock.ANY) + + def test_block_until_oslo_config_entries_match_missing_entry(self): + file_contents = """ +[DEFAULT] +verbose = False +use_syslog = False +workers = 4 +bind_host = 0.0.0.0 + +[glance_store] +filesystem_store_datadir = /var/lib/glance/images/ +stores = glance.store.filesystem.Store,glance.store.http.Store +default_store = file + +[image_format] +disk_formats = ami,ari,aki,vhd,vmdk,raw,qcow2,vdi,iso,root-tar +""" + expected_contents = { + 'DEFAULT': { + 'debug': ['False']}, + 'glance_store': { + 'filesystem_store_datadir': ['/var/lib/glance/images/'], + 'default_store': ['file']}} + with self.assertRaises(asyncio.futures.TimeoutError): + self.block_until_oslo_config_entries_match_base( + file_contents, + expected_contents) + self.unit1.scp_from.assert_called_once_with( + '/tmp/src/myfile.txt', mock.ANY) + + def test_block_until_oslo_config_entries_match_missing_section(self): + file_contents = """ +[DEFAULT] +verbose = False +use_syslog = False +workers = 4 +bind_host = 0.0.0.0 + +[image_format] +disk_formats = ami,ari,aki,vhd,vmdk,raw,qcow2,vdi,iso,root-tar +""" + expected_contents = { + 'DEFAULT': { + 'debug': ['False']}, + 'glance_store': { + 'filesystem_store_datadir': ['/var/lib/glance/images/'], + 'default_store': ['file']}} + with self.assertRaises(asyncio.futures.TimeoutError): + self.block_until_oslo_config_entries_match_base( + file_contents, + expected_contents) + self.unit1.scp_from.assert_called_once_with( + '/tmp/src/myfile.txt', mock.ANY) + + def block_until_services_restarted_base(self, gu_return=None, + gu_raise_exception=False): + async def _block_until(f, timeout=None): + rc = await f() + if not rc: + raise asyncio.futures.TimeoutError + self.patch_object(model, 'async_block_until') + self.async_block_until.side_effect = _block_until + + async def _async_get_unit_service_start_time(model_name, unit, svc): + if gu_raise_exception: + raise model.ServiceNotRunning('sv1') + else: + return gu_return + self.patch_object(model, 'async_get_unit_service_start_time') + self.async_get_unit_service_start_time.side_effect = \ + _async_get_unit_service_start_time + self.patch_object(model, 'Model') + self.Model.return_value = self.Model_mock + + def test_block_until_services_restarted(self): + self.block_until_services_restarted_base(gu_return=10) + model.block_until_services_restarted( + 'modelname', + 'app', + 8, + ['svc1', 'svc2']) + + def test_block_until_services_restarted_fail(self): + self.block_until_services_restarted_base(gu_return=10) + with self.assertRaises(asyncio.futures.TimeoutError): + model.block_until_services_restarted( + 'modelname', + 'app', + 12, + ['svc1', 'svc2']) + + def test_block_until_services_restarted_not_running(self): + self.block_until_services_restarted_base(gu_raise_exception=True) + with self.assertRaises(asyncio.futures.TimeoutError): + model.block_until_services_restarted( + 'modelname', + 'app', + 12, + ['svc1', 'svc2']) + + def test_block_until_unit_wl_status(self): + async def _block_until(f, timeout=None): + if not f(): + raise asyncio.futures.TimeoutError + self.patch_object(model, 'Model') + self.Model.return_value = self.Model_mock + self.Model_mock.block_until.side_effect = _block_until + self.patch_object(model, 'get_unit_from_name') + self.get_unit_from_name.return_value = mock.MagicMock( + workload_status='active') + model.block_until_unit_wl_status( + 'modelname', + 'app/2', + 'active', + timeout=0.1) + + def test_block_until_unit_wl_status_fail(self): + async def _block_until(f, timeout=None): + if not f(): + raise asyncio.futures.TimeoutError + self.patch_object(model, 'Model') + self.Model.return_value = self.Model_mock + self.Model_mock.block_until.side_effect = _block_until + self.patch_object(model, 'get_unit_from_name') + self.get_unit_from_name.return_value = mock.MagicMock( + workload_status='maintenance') + with self.assertRaises(asyncio.futures.TimeoutError): + model.block_until_unit_wl_status( + 'modelname', + 'app/2', + 'active', + timeout=0.1) + class AsyncModelTests(aiounittest.AsyncTestCase): diff --git a/zaza/model.py b/zaza/model.py index 9b4b304..678573b 100644 --- a/zaza/model.py +++ b/zaza/model.py @@ -5,6 +5,7 @@ import os import subprocess import tempfile import yaml +from oslo_config import cfg from juju import loop from juju.errors import JujuError @@ -183,6 +184,54 @@ async def async_run_on_unit(model_name, unit_name, command, timeout=None): run_on_unit = sync_wrapper(async_run_on_unit) +async def async_get_unit_time(model_name, unit_name, timeout=None): + """ Get the current time (in seconds since Epoch) on the given unit + + :param model_name: Name of model to query. + :type model_name: str + :param unit_name: Name of unit to run action on + :type unit_name: str + :returns: time in seconds since Epoch on unit + :rtype: int + """ + out = await async_run_on_unit( + model_name, + unit_name, + "date +'%s'", + timeout=timeout) + return int(out['Stdout']) + +get_unit_time = sync_wrapper(async_get_unit_time) + + +async def async_get_unit_service_start_time(model_name, unit_name, service, + timeout=None): + """Return the time (in seconds since Epoch) that the given service was + started on the given unit. If the service is not running return 0 + + :param model_name: Name of model to query. + :type model_name: str + :param unit_name: Name of unit to run action on + :type unit_name: str + :param service: Name of service to check is running + :type service: str + :param timeout: Time to wait for status to be achieved + :type timeout: int + :returns: time in seconds since Epoch on unit + :rtype: int + :raises: ServiceNotRunning + """ + cmd = "stat -c %Y /proc/$(pidof -x {} | cut -f1 -d ' ')".format(service) + out = await async_run_on_unit(model_name, unit_name, cmd, timeout=timeout) + out = out['Stdout'].strip() + if out: + return int(out) + else: + raise ServiceNotRunning(service) + +get_unit_service_start_time = sync_wrapper(async_get_unit_service_start_time) + + async def async_get_application(model_name, application_name): """Return an application object @@ -370,7 +419,6 @@ run_action_on_leader = sync_wrapper(async_run_action_on_leader) class UnitError(Exception): """Exception raised for units in error state - """ def __init__(self, units): @@ -379,6 +427,15 @@ class UnitError(Exception): super(UnitError, self).__init__(message) +class ServiceNotRunning(Exception): + """Exception raised when service not running + """ + + def __init__(self, service): + message = "Service {} not running".format(service) + super(ServiceNotRunning, self).__init__(message) + + def units_with_wl_status_state(model, state): """Return a list of unit which have a matching workload status @@ -475,7 +532,7 @@ async def async_wait_for_application_states(model_name, states=None, :param model_name: Name of model to query. :type model_name: str - :param states: Staes to look for + :param states: States to look for :type states: dict :param timeout: Time to wait for status to be achieved :type timeout: int @@ -521,6 +578,55 @@ async def async_wait_for_application_states(model_name, states=None, wait_for_application_states = sync_wrapper(async_wait_for_application_states) +async def async_block_until_all_units_idle(model_name, timeout=2700): + """Block until all units in the given model are idle + + :param model_name: Name of model to query. + :type model_name: str + :param timeout: Time to wait for status to be achieved + :type timeout: int + """ + async with run_in_model(model_name) as model: + await model.block_until( + lambda: model.all_units_idle(), timeout=timeout) + +block_until_all_units_idle = sync_wrapper(async_block_until_all_units_idle) + + +async def async_block_until_service_status(model_name, unit_name, services, + target_status, timeout=2700): + """Block until all services on the unit are in the desired state (stopped + or running) + + :param model_name: Name of model to query. + :type model_name: str + :param unit_name: Name of unit to run action on + :type unit_name: str + :param services: List of services to check + :type services: [] + :param target_status: State services should be in (stopped or running) + :type target_status: str + :param timeout: Time to wait for status to be achieved + :type timeout: int + """ + async def _check_service(): + for service in services: + out = await async_run_on_unit( + model_name, + unit_name, + "pidof -x {}".format(service), + timeout=timeout) + if target_status == "running" and len(out['Stdout'].strip()) == 0: + return False + elif target_status == "stopped" and len(out['Stdout'].strip()) > 0: + return False + return True + async with run_in_model(model_name): + await async_block_until(_check_service, timeout=timeout) + +block_until_service_status = sync_wrapper(async_block_until_service_status) + + def get_actions(model_name, application_name): """Get the actions an applications supports @@ -654,6 +760,125 @@ block_until_file_has_contents = sync_wrapper( async_block_until_file_has_contents) +async def async_block_until_oslo_config_entries_match(model_name, + application_name, + remote_file, + expected_contents, + timeout=2700): + """Block until the expected_contents are in the given file on all units of + the application + + :param model_name: Name of model to query. + :type model_name: str + :param application_name: Name of application + :type application_name: str + :param remote_file: Remote path of file to transfer + :type remote_file: str + :param expected_contents: The key values pairs in their corresponding + sections to be looked for in the remote_file + :type expected_contents: {} + :param timeout: Time to wait for contents to appear in file + :type timeout: float + + For example to check for, + + [DEFAULT] + debug = False + + [glance_store] + filesystem_store_datadir = /var/lib/glance/images/ + default_store = file + + use: + + expected_contents = { + 'DEFAULT': { + 'debug': ['False']}, + 'glance_store': { + 'filesystem_store_datadir': ['/var/lib/glance/images/'], + 'default_store': ['file']}} + """ + def f(x): + # Writing out the file that was just read is suboptimal + with tempfile.NamedTemporaryFile(mode='w', delete=True) as fp: + fp.write(x) + fp.seek(0) + sections = {} + parser = cfg.ConfigParser(fp.name, sections) + parser.parse() + for section, entries in expected_contents.items(): + for key, value in entries.items(): + if sections.get(section, {}).get(key) != value: + return False + return True + return await async_block_until_file_ready(model_name, application_name, + remote_file, f, timeout) + + +block_until_oslo_config_entries_match = sync_wrapper( + async_block_until_oslo_config_entries_match) + + +async def async_block_until_services_restarted(model_name, application_name, + mtime, services, timeout=2700): + """Block until the given services have a start time later then mtime + :param model_name: Name of model to query. + :type model_name: str + :param application_name: Name of application + :type application_name: str + :param mtime: Time in seconds since Epoch to check against + :type mtime: int + :param services: Listr of services to check restart time of + :type services: [] + :param timeout: Time to wait for services to be restarted + :type timeout: float + """ + async def _check_service(): + units = model.applications[application_name].units + for unit in units: + for service in services: + try: + svc_mtime = await async_get_unit_service_start_time( + model_name, + unit.entity_id, + service) + except ServiceNotRunning: + return False + if svc_mtime < mtime: + return False + return True + async with run_in_model(model_name) as model: + await async_block_until(_check_service, timeout=timeout) + + +block_until_services_restarted = sync_wrapper( + async_block_until_services_restarted) + + +async def async_block_until_unit_wl_status(model_name, unit_name, status, + timeout=2700): + """Block until the given unit has the desired workload status + :param model_name: Name of model to query. + :type model_name: str + :param unit_name: Name of unit to run action on + :type unit_name: str + :param status: Status to wait for (active, maintenance etc) + :type status: str + :param timeout: Time to wait for unit to achieved desired status + :type timeout: float + """ + async with run_in_model(model_name) as model: + unit = get_unit_from_name(unit_name, model) + print(unit.workload_status) + await model.block_until( + lambda: unit.workload_status == status, + timeout=timeout + ) + +block_until_unit_wl_status = sync_wrapper( + async_block_until_unit_wl_status) + + async def async_get_relation_id(model_name, application_name, remote_application_name, remote_interface_name=None): From 86d641e3331dcdb425a85e2f61e1a6f4838332b2 Mon Sep 17 00:00:00 2001 From: Liam Young Date: Tue, 22 May 2018 15:33:34 +0100 Subject: [PATCH 2/3] Minor tidy --- zaza/model.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/zaza/model.py b/zaza/model.py index 678573b..ba5d593 100644 --- a/zaza/model.py +++ b/zaza/model.py @@ -616,9 +616,10 @@ async def async_block_until_service_status(model_name, unit_name, services, unit_name, "pidof -x {}".format(service), timeout=timeout) - if target_status == "running" and len(out['Stdout'].strip()) == 0: + response_size = len(out['Stdout'].strip()) + if target_status == "running" and response_size == 0: return False - elif target_status == "stopped" and len(out['Stdout'].strip()) > 0: + elif target_status == "stopped" and response_size > 0: return False return True async with run_in_model(model_name): From 313f7bee0277914c023b0568202f78469c84ec47 Mon Sep 17 00:00:00 2001 From: Liam Young Date: Tue, 22 May 2018 15:34:58 +0100 Subject: [PATCH 3/3] Remove debug print --- zaza/model.py | 1 - 1 file changed, 1 deletion(-) diff --git a/zaza/model.py b/zaza/model.py index ba5d593..1ca7e86 100644 --- a/zaza/model.py +++ b/zaza/model.py @@ -870,7 +870,6 @@ async def async_block_until_unit_wl_status(model_name, unit_name, status, """ async with run_in_model(model_name) as model: unit = get_unit_from_name(unit_name, model) - print(unit.workload_status) await model.block_until( lambda: unit.workload_status == status, timeout=timeout