From d65dd96f26b2666b6a8bbedc70094e52e3f8b390 Mon Sep 17 00:00:00 2001 From: Liam Young Date: Wed, 18 Apr 2018 13:36:13 +0000 Subject: [PATCH] Use async_generator pkg to simplify async calls Use the async_generator, yield_, asynccontextmanager methods from async_generator to simplify the creation of sync wrappers around async libjuju calls --- test-requirements.txt | 1 + unit_tests/test_zaza_model.py | 66 ++++--------- zaza/model.py | 176 +++++++++++++++++++--------------- 3 files changed, 117 insertions(+), 126 deletions(-) diff --git a/test-requirements.txt b/test-requirements.txt index 05d0180..19c644c 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,3 +1,4 @@ +async_generator juju juju_wait PyYAML diff --git a/unit_tests/test_zaza_model.py b/unit_tests/test_zaza_model.py index 4333443..65c8ff1 100644 --- a/unit_tests/test_zaza_model.py +++ b/unit_tests/test_zaza_model.py @@ -1,4 +1,3 @@ -import functools import mock import zaza.model as model import unit_tests.utils as ut_utils @@ -13,6 +12,9 @@ class TestModel(ut_utils.BaseTestCase): async def _scp_to(source, destination, user, proxy, scp_opts): return + async def _scp_from(source, destination, user, proxy, scp_opts): + return + async def _run(command, timeout=None): return self.action @@ -44,6 +46,8 @@ class TestModel(ut_utils.BaseTestCase): self.unit1.run.side_effect = _run self.unit1.scp_to.side_effect = _scp_to self.unit2.scp_to.side_effect = _scp_to + self.unit1.scp_from.side_effect = _scp_from + self.unit2.scp_from.side_effect = _scp_from self.units = [self.unit1, self.unit2] _units = mock.MagicMock() _units.units = self.units @@ -64,61 +68,28 @@ class TestModel(ut_utils.BaseTestCase): def test_run_in_model(self): self.patch_object(model, 'Model') - - async def _test_func(arg): - return arg * 2 self.Model.return_value = self.Model_mock - func = functools.partial(_test_func, 'hello') - out = loop.run( - model.run_in_model( - 'mymodel', - func, - awaitable=True)) - self.assertEqual(out, 'hellohello') - def test_run_in_model_not_awaitable(self): - self.patch_object(model, 'Model') - - def _test_func(arg): - return arg * 3 - self.Model.return_value = self.Model_mock - func = functools.partial(_test_func, 'hello') - out = loop.run( - model.run_in_model( - 'mymodel', - func, - awaitable=False)) - self.assertEqual(out, 'hellohellohello') - - def test_run_in_model_add_model_arg(self): - self.patch_object(model, 'Model') - - def _test_func(arg, model): - return model - self.Model.return_value = self.Model_mock - func = functools.partial(_test_func, 'hello') - out = loop.run( - model.run_in_model( - 'mymodel', - func, - add_model_arg=True, - awaitable=False)) - self.assertEqual(out, self.Model_mock) + async def _wrapper(): + async with model.run_in_model('modelname') as mymodel: + return mymodel + self.assertEqual(loop.run(_wrapper()), self.Model_mock) + self.Model_mock.connect_model.assert_called_once_with('modelname') + self.Model_mock.disconnect.assert_called_once_with() def test_scp_to_unit(self): self.patch_object(model, 'Model') self.patch_object(model, 'get_unit_from_name') - unit_mock = mock.MagicMock() - self.get_unit_from_name.return_value = unit_mock + self.get_unit_from_name.return_value = self.unit1 self.Model.return_value = self.Model_mock - model.scp_to_unit('app/1', 'modelname', '/tmp/src', '/tmp/dest') - unit_mock.scp_to.assert_called_once_with( + model.scp_to_unit('modelname', 'app/1', '/tmp/src', '/tmp/dest') + self.unit1.scp_to.assert_called_once_with( '/tmp/src', '/tmp/dest', proxy=False, scp_opts='', user='ubuntu') def test_scp_to_all_units(self): self.patch_object(model, 'Model') self.Model.return_value = self.Model_mock - model.scp_to_all_units('app', 'modelname', '/tmp/src', '/tmp/dest') + model.scp_to_all_units('modelname', 'app', '/tmp/src', '/tmp/dest') self.unit1.scp_to.assert_called_once_with( '/tmp/src', '/tmp/dest', proxy=False, scp_opts='', user='ubuntu') self.unit2.scp_to.assert_called_once_with( @@ -127,11 +98,10 @@ class TestModel(ut_utils.BaseTestCase): def test_scp_from_unit(self): self.patch_object(model, 'Model') self.patch_object(model, 'get_unit_from_name') - unit_mock = mock.MagicMock() - self.get_unit_from_name.return_value = unit_mock + self.get_unit_from_name.return_value = self.unit1 self.Model.return_value = self.Model_mock - model.scp_from_unit('app/1', 'modelname', '/tmp/src', '/tmp/dest') - unit_mock.scp_from.assert_called_once_with( + model.scp_from_unit('modelname', 'app/1', '/tmp/src', '/tmp/dest') + self.unit1.scp_from.assert_called_once_with( '/tmp/src', '/tmp/dest', proxy=False, scp_opts='', user='ubuntu') def test_get_units(self): diff --git a/zaza/model.py b/zaza/model.py index 40157da..2a6809e 100644 --- a/zaza/model.py +++ b/zaza/model.py @@ -1,4 +1,5 @@ -import functools +import asyncio +from async_generator import async_generator, yield_, asynccontextmanager from juju import loop from juju.model import Model @@ -39,7 +40,55 @@ def get_unit_from_name(unit_name, model): return unit -async def run_in_model(model_name, f, add_model_arg=False, awaitable=True): +def run(*steps): + """Run the given steps in an asyncio loop + + :returns: The result of the asyncio.Task + :rtype: obj + """ + if not steps: + return + loop = asyncio.get_event_loop() + + for step in steps: + task = loop.create_task(step) + loop.run_until_complete(asyncio.wait([task], loop=loop)) + return task.result() + + +def sync_wrapper(f): + """Convert the given async function into a sync function + + :returns: The de-async'd function + :rtype: function + """ + def _wrapper(*args, **kwargs): + async def _run_it(): + return await f(*args, **kwargs) + return run(_run_it()) + return _wrapper + + +@asynccontextmanager +@async_generator +async def run_in_model(model_name): + """Conext manager for executing code inside a libjuju model + Example of using run_in_model: + async with run_in_model(model_name) as model: + model.do_something() + + :param model_name: Name of model to run function in + :type model_name: str + :returns: The juju Model object correcsponding to model_name + :rtype: Iterator[:class:'juju.Model()'] + """ + model = Model() + await model.connect_model(model_name) + await yield_(model) + await model.disconnect() + + +async def run_in_model_old(model_name, f, add_model_arg=False, awaitable=True): """Run the given function in the model matching the model_name :param model_name: Name of model to run function in @@ -70,8 +119,8 @@ async def run_in_model(model_name, f, add_model_arg=False, awaitable=True): return output -def scp_to_unit(unit_name, model_name, source, destination, user='ubuntu', - proxy=False, scp_opts=''): +async def async_scp_to_unit(model_name, unit_name, source, destination, + user='ubuntu', proxy=False, scp_opts=''): """Transfer files to unit_name in model_name. :param unit_name: Name of unit to scp to @@ -89,25 +138,17 @@ def scp_to_unit(unit_name, model_name, source, destination, user='ubuntu', :param scp_opts: Additional options to the scp command :type scp_opts: str """ - async def _scp_to_unit(unit_name, source, destination, user, proxy, - scp_opts, model): + async with run_in_model(model_name) as model: unit = get_unit_from_name(unit_name, model) await unit.scp_to(source, destination, user=user, proxy=proxy, scp_opts=scp_opts) - scp_func = functools.partial( - _scp_to_unit, - unit_name, - source, - destination, - user=user, - proxy=proxy, - scp_opts=scp_opts) - loop.run( - run_in_model(model_name, scp_func, add_model_arg=True, awaitable=True)) + +scp_to_unit = sync_wrapper(async_scp_to_unit) -def scp_to_all_units(application_name, model_name, source, destination, - user='ubuntu', proxy=False, scp_opts=''): +async def async_scp_to_all_units(model_name, application_name, source, + destination, user='ubuntu', proxy=False, + scp_opts=''): """Transfer files from to all units of an application :param application_name: Name of application to scp file to @@ -125,25 +166,17 @@ def scp_to_all_units(application_name, model_name, source, destination, :param scp_opts: Additional options to the scp command :type scp_opts: str """ - async def _scp_to_all_units(application_name, source, destination, user, - proxy, scp_opts, model): + async with run_in_model(model_name) as model: for unit in model.applications[application_name].units: + # FIXME: Should scp in parallel await unit.scp_to(source, destination, user=user, proxy=proxy, scp_opts=scp_opts) - scp_func = functools.partial( - _scp_to_all_units, - application_name, - source, - destination, - user=user, - proxy=proxy, - scp_opts=scp_opts) - loop.run( - run_in_model(model_name, scp_func, add_model_arg=True, awaitable=True)) + +scp_to_all_units = sync_wrapper(async_scp_to_all_units) -def scp_from_unit(unit_name, model_name, source, destination, user='ubuntu', - proxy=False, scp_opts=''): +async def async_scp_from_unit(model_name, unit_name, source, destination, + user='ubuntu', proxy=False, scp_opts=''): """Transfer files from to unit_name in model_name. :param unit_name: Name of unit to scp from @@ -161,24 +194,16 @@ def scp_from_unit(unit_name, model_name, source, destination, user='ubuntu', :param scp_opts: Additional options to the scp command :type scp_opts: str """ - async def _scp_from_unit(unit_name, source, destination, user, proxy, - scp_opts, model): + async with run_in_model(model_name) as model: unit = get_unit_from_name(unit_name, model) await unit.scp_from(source, destination, user=user, proxy=proxy, scp_opts=scp_opts) - scp_func = functools.partial( - _scp_from_unit, - unit_name, - source, - destination, - user=user, - proxy=proxy, - scp_opts=scp_opts) - loop.run( - run_in_model(model_name, scp_func, add_model_arg=True, awaitable=True)) -def run_on_unit(unit_name, model_name, command, timeout=None): +scp_from_unit = sync_wrapper(async_scp_from_unit) + + +async def async_run_on_unit(model_name, unit_name, command, timeout=None): """Juju run on unit :param unit_name: Name of unit to match @@ -199,23 +224,18 @@ def run_on_unit(unit_name, model_name, command, timeout=None): if timeout: timeout = None - async def _run_on_unit(unit_name, command, model, timeout=None): + async with run_in_model(model_name) as model: unit = get_unit_from_name(unit_name, model) action = await unit.run(command, timeout=timeout) if action.data.get('results'): return action.data.get('results') else: return {} - run_func = functools.partial( - _run_on_unit, - unit_name, - command, - timeout=timeout) - return loop.run( - run_in_model(model_name, run_func, add_model_arg=True, awaitable=True)) + +run_on_unit = sync_wrapper(async_run_on_unit) -def get_application(model_name, application_name): +async def async_get_application(model_name, application_name): """Return an application object :param model_name: Name of model to query. @@ -226,13 +246,13 @@ def get_application(model_name, application_name): :returns: Appliction object :rtype: object """ - async def _get_application(application_name, model): + async with run_in_model(model_name) as model: return model.applications[application_name] - f = functools.partial(_get_application, application_name) - return loop.run(run_in_model(model_name, f, add_model_arg=True)) + +get_application = sync_wrapper(async_get_application) -def get_units(model_name, application_name): +async def async_get_units(model_name, application_name): """Return all the units of a given application :param model_name: Name of model to query. @@ -243,13 +263,13 @@ def get_units(model_name, application_name): :returns: List of juju units :rtype: [juju.unit.Unit, juju.unit.Unit,...] """ - async def _get_units(application_name, model): + async with run_in_model(model_name) as model: return model.applications[application_name].units - f = functools.partial(_get_units, application_name) - return loop.run(run_in_model(model_name, f, add_model_arg=True)) + +get_units = sync_wrapper(async_get_units) -def get_machines(model_name, application_name): +async def async_get_machines(model_name, application_name): """Return all the machines of a given application :param model_name: Name of model to query. @@ -260,13 +280,13 @@ def get_machines(model_name, application_name): :returns: List of juju machines :rtype: [juju.machine.Machine, juju.machine.Machine,...] """ - async def _get_machines(application_name, model): + async with run_in_model(model_name) as model: machines = [] for unit in model.applications[application_name].units: machines.append(unit.machine) return machines - f = functools.partial(_get_machines, application_name) - return loop.run(run_in_model(model_name, f, add_model_arg=True)) + +get_machines = sync_wrapper(async_get_machines) def get_first_unit_name(model_name, application_name): @@ -297,7 +317,7 @@ def get_app_ips(model_name, application_name): return [u.public_address for u in get_units(model_name, application_name)] -def get_application_config(model_name, application_name): +async def async_get_application_config(model_name, application_name): """Return application configuration :param model_name: Name of model to query. @@ -308,13 +328,14 @@ def get_application_config(model_name, application_name): :returns: Dictionary of configuration :rtype: dict """ - async def _get_config(application_name, model): + async with run_in_model(model_name) as model: return await model.applications[application_name].get_config() - f = functools.partial(_get_config, application_name) - return loop.run(run_in_model(model_name, f, add_model_arg=True)) + +get_application_config = sync_wrapper(async_get_application_config) -def set_application_config(model_name, application_name, configuration): +async def async_set_application_config(model_name, application_name, + configuration): """Set application configuration :param model_name: Name of model to query. @@ -326,15 +347,14 @@ def set_application_config(model_name, application_name, configuration): :returns: None :rtype: None """ - async def _set_config(application_name, model, configuration=None): + async with run_in_model(model_name) as model: return await (model.applications[application_name] .set_config(configuration)) - f = functools.partial(_set_config, application_name, - configuration=configuration) - return loop.run(run_in_model(model_name, f, add_model_arg=True)) + +set_application_config = sync_wrapper(async_set_application_config) -def get_status(model_name): +async def async_get_status(model_name): """Return full status :param model_name: Name of model to query. @@ -343,10 +363,10 @@ def get_status(model_name): :returns: dictionary of juju status :rtype: dict """ - async def _get_status(model): + async with run_in_model(model_name) as model: return await model.get_status() - f = functools.partial(_get_status) - return loop.run(run_in_model(model_name, f, add_model_arg=True)) + +get_status = sync_wrapper(async_get_status) def main():