Use async_generator pkg to simplify async calls

Use the async_generator, yield_, asynccontextmanager methods from
async_generator to simplify the creation of sync wrappers around
async libjuju calls
This commit is contained in:
Liam Young
2018-04-18 13:36:13 +00:00
parent 371ded8da1
commit d65dd96f26
3 changed files with 117 additions and 126 deletions
+1
View File
@@ -1,3 +1,4 @@
async_generator
juju
juju_wait
PyYAML
+18 -48
View File
@@ -1,4 +1,3 @@
import functools
import mock
import zaza.model as model
import unit_tests.utils as ut_utils
@@ -13,6 +12,9 @@ class TestModel(ut_utils.BaseTestCase):
async def _scp_to(source, destination, user, proxy, scp_opts):
return
async def _scp_from(source, destination, user, proxy, scp_opts):
return
async def _run(command, timeout=None):
return self.action
@@ -44,6 +46,8 @@ class TestModel(ut_utils.BaseTestCase):
self.unit1.run.side_effect = _run
self.unit1.scp_to.side_effect = _scp_to
self.unit2.scp_to.side_effect = _scp_to
self.unit1.scp_from.side_effect = _scp_from
self.unit2.scp_from.side_effect = _scp_from
self.units = [self.unit1, self.unit2]
_units = mock.MagicMock()
_units.units = self.units
@@ -64,61 +68,28 @@ class TestModel(ut_utils.BaseTestCase):
def test_run_in_model(self):
self.patch_object(model, 'Model')
async def _test_func(arg):
return arg * 2
self.Model.return_value = self.Model_mock
func = functools.partial(_test_func, 'hello')
out = loop.run(
model.run_in_model(
'mymodel',
func,
awaitable=True))
self.assertEqual(out, 'hellohello')
def test_run_in_model_not_awaitable(self):
self.patch_object(model, 'Model')
def _test_func(arg):
return arg * 3
self.Model.return_value = self.Model_mock
func = functools.partial(_test_func, 'hello')
out = loop.run(
model.run_in_model(
'mymodel',
func,
awaitable=False))
self.assertEqual(out, 'hellohellohello')
def test_run_in_model_add_model_arg(self):
self.patch_object(model, 'Model')
def _test_func(arg, model):
return model
self.Model.return_value = self.Model_mock
func = functools.partial(_test_func, 'hello')
out = loop.run(
model.run_in_model(
'mymodel',
func,
add_model_arg=True,
awaitable=False))
self.assertEqual(out, self.Model_mock)
async def _wrapper():
async with model.run_in_model('modelname') as mymodel:
return mymodel
self.assertEqual(loop.run(_wrapper()), self.Model_mock)
self.Model_mock.connect_model.assert_called_once_with('modelname')
self.Model_mock.disconnect.assert_called_once_with()
def test_scp_to_unit(self):
self.patch_object(model, 'Model')
self.patch_object(model, 'get_unit_from_name')
unit_mock = mock.MagicMock()
self.get_unit_from_name.return_value = unit_mock
self.get_unit_from_name.return_value = self.unit1
self.Model.return_value = self.Model_mock
model.scp_to_unit('app/1', 'modelname', '/tmp/src', '/tmp/dest')
unit_mock.scp_to.assert_called_once_with(
model.scp_to_unit('modelname', 'app/1', '/tmp/src', '/tmp/dest')
self.unit1.scp_to.assert_called_once_with(
'/tmp/src', '/tmp/dest', proxy=False, scp_opts='', user='ubuntu')
def test_scp_to_all_units(self):
self.patch_object(model, 'Model')
self.Model.return_value = self.Model_mock
model.scp_to_all_units('app', 'modelname', '/tmp/src', '/tmp/dest')
model.scp_to_all_units('modelname', 'app', '/tmp/src', '/tmp/dest')
self.unit1.scp_to.assert_called_once_with(
'/tmp/src', '/tmp/dest', proxy=False, scp_opts='', user='ubuntu')
self.unit2.scp_to.assert_called_once_with(
@@ -127,11 +98,10 @@ class TestModel(ut_utils.BaseTestCase):
def test_scp_from_unit(self):
self.patch_object(model, 'Model')
self.patch_object(model, 'get_unit_from_name')
unit_mock = mock.MagicMock()
self.get_unit_from_name.return_value = unit_mock
self.get_unit_from_name.return_value = self.unit1
self.Model.return_value = self.Model_mock
model.scp_from_unit('app/1', 'modelname', '/tmp/src', '/tmp/dest')
unit_mock.scp_from.assert_called_once_with(
model.scp_from_unit('modelname', 'app/1', '/tmp/src', '/tmp/dest')
self.unit1.scp_from.assert_called_once_with(
'/tmp/src', '/tmp/dest', proxy=False, scp_opts='', user='ubuntu')
def test_get_units(self):
+98 -78
View File
@@ -1,4 +1,5 @@
import functools
import asyncio
from async_generator import async_generator, yield_, asynccontextmanager
from juju import loop
from juju.model import Model
@@ -39,7 +40,55 @@ def get_unit_from_name(unit_name, model):
return unit
async def run_in_model(model_name, f, add_model_arg=False, awaitable=True):
def run(*steps):
"""Run the given steps in an asyncio loop
:returns: The result of the asyncio.Task
:rtype: obj
"""
if not steps:
return
loop = asyncio.get_event_loop()
for step in steps:
task = loop.create_task(step)
loop.run_until_complete(asyncio.wait([task], loop=loop))
return task.result()
def sync_wrapper(f):
"""Convert the given async function into a sync function
:returns: The de-async'd function
:rtype: function
"""
def _wrapper(*args, **kwargs):
async def _run_it():
return await f(*args, **kwargs)
return run(_run_it())
return _wrapper
@asynccontextmanager
@async_generator
async def run_in_model(model_name):
"""Conext manager for executing code inside a libjuju model
Example of using run_in_model:
async with run_in_model(model_name) as model:
model.do_something()
:param model_name: Name of model to run function in
:type model_name: str
:returns: The juju Model object correcsponding to model_name
:rtype: Iterator[:class:'juju.Model()']
"""
model = Model()
await model.connect_model(model_name)
await yield_(model)
await model.disconnect()
async def run_in_model_old(model_name, f, add_model_arg=False, awaitable=True):
"""Run the given function in the model matching the model_name
:param model_name: Name of model to run function in
@@ -70,8 +119,8 @@ async def run_in_model(model_name, f, add_model_arg=False, awaitable=True):
return output
def scp_to_unit(unit_name, model_name, source, destination, user='ubuntu',
proxy=False, scp_opts=''):
async def async_scp_to_unit(model_name, unit_name, source, destination,
user='ubuntu', proxy=False, scp_opts=''):
"""Transfer files to unit_name in model_name.
:param unit_name: Name of unit to scp to
@@ -89,25 +138,17 @@ def scp_to_unit(unit_name, model_name, source, destination, user='ubuntu',
:param scp_opts: Additional options to the scp command
:type scp_opts: str
"""
async def _scp_to_unit(unit_name, source, destination, user, proxy,
scp_opts, model):
async with run_in_model(model_name) as model:
unit = get_unit_from_name(unit_name, model)
await unit.scp_to(source, destination, user=user, proxy=proxy,
scp_opts=scp_opts)
scp_func = functools.partial(
_scp_to_unit,
unit_name,
source,
destination,
user=user,
proxy=proxy,
scp_opts=scp_opts)
loop.run(
run_in_model(model_name, scp_func, add_model_arg=True, awaitable=True))
scp_to_unit = sync_wrapper(async_scp_to_unit)
def scp_to_all_units(application_name, model_name, source, destination,
user='ubuntu', proxy=False, scp_opts=''):
async def async_scp_to_all_units(model_name, application_name, source,
destination, user='ubuntu', proxy=False,
scp_opts=''):
"""Transfer files from to all units of an application
:param application_name: Name of application to scp file to
@@ -125,25 +166,17 @@ def scp_to_all_units(application_name, model_name, source, destination,
:param scp_opts: Additional options to the scp command
:type scp_opts: str
"""
async def _scp_to_all_units(application_name, source, destination, user,
proxy, scp_opts, model):
async with run_in_model(model_name) as model:
for unit in model.applications[application_name].units:
# FIXME: Should scp in parallel
await unit.scp_to(source, destination, user=user, proxy=proxy,
scp_opts=scp_opts)
scp_func = functools.partial(
_scp_to_all_units,
application_name,
source,
destination,
user=user,
proxy=proxy,
scp_opts=scp_opts)
loop.run(
run_in_model(model_name, scp_func, add_model_arg=True, awaitable=True))
scp_to_all_units = sync_wrapper(async_scp_to_all_units)
def scp_from_unit(unit_name, model_name, source, destination, user='ubuntu',
proxy=False, scp_opts=''):
async def async_scp_from_unit(model_name, unit_name, source, destination,
user='ubuntu', proxy=False, scp_opts=''):
"""Transfer files from to unit_name in model_name.
:param unit_name: Name of unit to scp from
@@ -161,24 +194,16 @@ def scp_from_unit(unit_name, model_name, source, destination, user='ubuntu',
:param scp_opts: Additional options to the scp command
:type scp_opts: str
"""
async def _scp_from_unit(unit_name, source, destination, user, proxy,
scp_opts, model):
async with run_in_model(model_name) as model:
unit = get_unit_from_name(unit_name, model)
await unit.scp_from(source, destination, user=user, proxy=proxy,
scp_opts=scp_opts)
scp_func = functools.partial(
_scp_from_unit,
unit_name,
source,
destination,
user=user,
proxy=proxy,
scp_opts=scp_opts)
loop.run(
run_in_model(model_name, scp_func, add_model_arg=True, awaitable=True))
def run_on_unit(unit_name, model_name, command, timeout=None):
scp_from_unit = sync_wrapper(async_scp_from_unit)
async def async_run_on_unit(model_name, unit_name, command, timeout=None):
"""Juju run on unit
:param unit_name: Name of unit to match
@@ -199,23 +224,18 @@ def run_on_unit(unit_name, model_name, command, timeout=None):
if timeout:
timeout = None
async def _run_on_unit(unit_name, command, model, timeout=None):
async with run_in_model(model_name) as model:
unit = get_unit_from_name(unit_name, model)
action = await unit.run(command, timeout=timeout)
if action.data.get('results'):
return action.data.get('results')
else:
return {}
run_func = functools.partial(
_run_on_unit,
unit_name,
command,
timeout=timeout)
return loop.run(
run_in_model(model_name, run_func, add_model_arg=True, awaitable=True))
run_on_unit = sync_wrapper(async_run_on_unit)
def get_application(model_name, application_name):
async def async_get_application(model_name, application_name):
"""Return an application object
:param model_name: Name of model to query.
@@ -226,13 +246,13 @@ def get_application(model_name, application_name):
:returns: Appliction object
:rtype: object
"""
async def _get_application(application_name, model):
async with run_in_model(model_name) as model:
return model.applications[application_name]
f = functools.partial(_get_application, application_name)
return loop.run(run_in_model(model_name, f, add_model_arg=True))
get_application = sync_wrapper(async_get_application)
def get_units(model_name, application_name):
async def async_get_units(model_name, application_name):
"""Return all the units of a given application
:param model_name: Name of model to query.
@@ -243,13 +263,13 @@ def get_units(model_name, application_name):
:returns: List of juju units
:rtype: [juju.unit.Unit, juju.unit.Unit,...]
"""
async def _get_units(application_name, model):
async with run_in_model(model_name) as model:
return model.applications[application_name].units
f = functools.partial(_get_units, application_name)
return loop.run(run_in_model(model_name, f, add_model_arg=True))
get_units = sync_wrapper(async_get_units)
def get_machines(model_name, application_name):
async def async_get_machines(model_name, application_name):
"""Return all the machines of a given application
:param model_name: Name of model to query.
@@ -260,13 +280,13 @@ def get_machines(model_name, application_name):
:returns: List of juju machines
:rtype: [juju.machine.Machine, juju.machine.Machine,...]
"""
async def _get_machines(application_name, model):
async with run_in_model(model_name) as model:
machines = []
for unit in model.applications[application_name].units:
machines.append(unit.machine)
return machines
f = functools.partial(_get_machines, application_name)
return loop.run(run_in_model(model_name, f, add_model_arg=True))
get_machines = sync_wrapper(async_get_machines)
def get_first_unit_name(model_name, application_name):
@@ -297,7 +317,7 @@ def get_app_ips(model_name, application_name):
return [u.public_address for u in get_units(model_name, application_name)]
def get_application_config(model_name, application_name):
async def async_get_application_config(model_name, application_name):
"""Return application configuration
:param model_name: Name of model to query.
@@ -308,13 +328,14 @@ def get_application_config(model_name, application_name):
:returns: Dictionary of configuration
:rtype: dict
"""
async def _get_config(application_name, model):
async with run_in_model(model_name) as model:
return await model.applications[application_name].get_config()
f = functools.partial(_get_config, application_name)
return loop.run(run_in_model(model_name, f, add_model_arg=True))
get_application_config = sync_wrapper(async_get_application_config)
def set_application_config(model_name, application_name, configuration):
async def async_set_application_config(model_name, application_name,
configuration):
"""Set application configuration
:param model_name: Name of model to query.
@@ -326,15 +347,14 @@ def set_application_config(model_name, application_name, configuration):
:returns: None
:rtype: None
"""
async def _set_config(application_name, model, configuration=None):
async with run_in_model(model_name) as model:
return await (model.applications[application_name]
.set_config(configuration))
f = functools.partial(_set_config, application_name,
configuration=configuration)
return loop.run(run_in_model(model_name, f, add_model_arg=True))
set_application_config = sync_wrapper(async_set_application_config)
def get_status(model_name):
async def async_get_status(model_name):
"""Return full status
:param model_name: Name of model to query.
@@ -343,10 +363,10 @@ def get_status(model_name):
:returns: dictionary of juju status
:rtype: dict
"""
async def _get_status(model):
async with run_in_model(model_name) as model:
return await model.get_status()
f = functools.partial(_get_status)
return loop.run(run_in_model(model_name, f, add_model_arg=True))
get_status = sync_wrapper(async_get_status)
def main():