From 458a7207d848ec72b9f4101badcbb7fa90355d9a Mon Sep 17 00:00:00 2001 From: Liam Young Date: Tue, 8 May 2018 08:48:19 +0000 Subject: [PATCH] Add block_until_file_has_contents helper Add a helper which will block until a string appears in a particular file of an application. --- test-requirements.txt | 1 + unit_tests/test_zaza_model.py | 75 ++++++++++++++++++++++++++++++++-- zaza/model.py | 76 +++++++++++++++++++++++++++++++++++ 3 files changed, 149 insertions(+), 3 deletions(-) diff --git a/test-requirements.txt b/test-requirements.txt index ac542c0..55acbc2 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,3 +1,4 @@ +aiounittest async_generator juju juju_wait diff --git a/unit_tests/test_zaza_model.py b/unit_tests/test_zaza_model.py index 7b78f39..9febe6b 100644 --- a/unit_tests/test_zaza_model.py +++ b/unit_tests/test_zaza_model.py @@ -1,18 +1,24 @@ +import aiounittest +import asyncio.futures import mock -import zaza.model as model + import unit_tests.utils as ut_utils from juju import loop +import zaza.model as model + class TestModel(ut_utils.BaseTestCase): def setUp(self): super(TestModel, self).setUp() - async def _scp_to(source, destination, user, proxy, scp_opts): + async def _scp_to(source, destination, user=None, proxy=None, + scp_opts=None): return - async def _scp_from(source, destination, user, proxy, scp_opts): + async def _scp_from(source, destination, user=None, proxy=None, + scp_opts=None): return async def _run(command, timeout=None): @@ -374,3 +380,66 @@ class TestModel(ut_utils.BaseTestCase): self.patch_object(model, 'Model') self.Model.return_value = self.Model_mock self.assertEqual(model.get_current_model(), self.model_name) + + def test_block_until_file_has_contents(self): + self.patch_object(model, 'Model') + self.Model.return_value = self.Model_mock + self.patch("builtins.open", + new_callable=mock.mock_open(), + name="_open") + _fileobj = mock.MagicMock() + _fileobj.__enter__().read.return_value = "somestring" + self._open.return_value = _fileobj + model.block_until_file_has_contents( + 'modelname', + 'app', + '/tmp/src/myfile.txt', + 'somestring', + timeout=0.1) + self.unit1.scp_from.assert_called_once_with( + '/tmp/src/myfile.txt', mock.ANY) + self.unit2.scp_from.assert_called_once_with( + '/tmp/src/myfile.txt', mock.ANY) + + def test_block_until_file_has_contents_missing(self): + self.patch_object(model, 'Model') + self.Model.return_value = self.Model_mock + self.patch("builtins.open", + new_callable=mock.mock_open(), + name="_open") + _fileobj = mock.MagicMock() + _fileobj.__enter__().read.return_value = "anything else" + self._open.return_value = _fileobj + with self.assertRaises(asyncio.futures.TimeoutError): + model.block_until_file_has_contents( + 'modelname', + 'app', + '/tmp/src/myfile.txt', + 'somestring', + timeout=0.1) + self.unit1.scp_from.assert_called_once_with( + '/tmp/src/myfile.txt', mock.ANY) + + +class AsyncModelTests(aiounittest.AsyncTestCase): + + async def test_async_block_until_timeout(self): + + async def _f(): + return False + + async def _g(): + return True + + with self.assertRaises(asyncio.futures.TimeoutError): + await model.async_block_until(_f, _g, timeout=0.1) + + async def test_async_block_until_pass(self): + + async def _f(): + return True + + async def _g(): + return True + + await model.async_block_until(_f, _g, timeout=0.1) diff --git a/zaza/model.py b/zaza/model.py index b0eceac..de227be 100644 --- a/zaza/model.py +++ b/zaza/model.py @@ -1,10 +1,13 @@ import asyncio from async_generator import async_generator, yield_, asynccontextmanager import logging +import os import subprocess +import tempfile import yaml from juju import loop +from juju.errors import JujuError from juju.model import Model @@ -581,6 +584,79 @@ async def async_get_current_model(): get_current_model = sync_wrapper(async_get_current_model) +async def async_block_until(*conditions, timeout=None, wait_period=0.5, + loop=None): + """Return only after all async conditions are true. + + Based on juju.utils.block_until which currently does not support + async methods as conditions. + + :param conditions: Functions to evaluate. + :type conditions: functions + :param timeout: Timeout in secounds + :type timeout: int + :param wait_period: Time to wait between re-assing conditions. + :type wait_period: float + :param loop: The evnt loop to use + :type loop: An event loop + """ + + async def _block(): + while True: + evaluated = [] + for c in conditions: + result = await c() + evaluated.append(result) + if all(evaluated): + return + else: + await asyncio.sleep(wait_period, loop=loop) + await asyncio.wait_for(_block(), timeout, loop=loop) + + +async def async_block_until_file_has_contents(model_name, application_name, + remote_file, expected_contents, + timeout=2700): + """Block until all the units of a given application have the expected + contents in the given file. + + :param model_name: Name of model to query. + :type model_name: str + :param application_name: Name of application + :type application_name: str + :param remote_file: Remote path of file(s) to transfer + :type remote_file: str + :param expected_contents: Contents expected to be found in file + :type expected_contents,: str + :param timeout: Time to wait for contents to appear in file + :type timeout: int + """ + async def _check_file(): + file_name = os.path.basename(remote_file) + units = model.applications[application_name].units + with tempfile.TemporaryDirectory() as tmpdir: + for unit in units: + try: + await unit.scp_from(remote_file, tmpdir) + with open(os.path.join(tmpdir, file_name), 'r') as lf: + contents = lf.read() + if expected_contents not in contents: + return False + # libjuju throws a generic error for scp failure. So we cannot + # differentiate between a connectivity issue and a target file + # not existing error. For now just assume the latter. + except JujuError as e: + return False + else: + return True + + async with run_in_model(model_name) as model: + await async_block_until(_check_file, timeout=timeout) + +block_until_file_has_contents = sync_wrapper( + async_block_until_file_has_contents) + + def main(): # Run the deploy coroutine in an asyncio event loop, using a helper # that abstracts loop creation and teardown.