Another batch of zaza.model helpers

This pr adds a number of new fuctions to zaza.model. As discussed
zaza.model needs to be broken up as it has become too large. I would
like to continue the discussion around that and submit a subsequent
patch for that at a later date.
This commit is contained in:
Liam Young
2018-05-22 15:28:28 +01:00
parent 41b234e1f6
commit cbc14e32c6
2 changed files with 543 additions and 2 deletions

View File

@@ -441,6 +441,322 @@ class TestModel(ut_utils.BaseTestCase):
self.unit1.scp_from.assert_called_once_with(
'/tmp/src/myfile.txt', mock.ANY)
def test_async_block_until_all_units_idle(self):
async def _block_until(f, timeout=None):
if not f():
raise asyncio.futures.TimeoutError
def _all_units_idle():
return True
self.patch_object(model, 'Model')
self.Model.return_value = self.Model_mock
self.Model_mock.all_units_idle.side_effect = _all_units_idle
self.Model_mock.block_until.side_effect = _block_until
# Check exception is not raised:
model.block_until_all_units_idle('modelname')
def test_async_block_until_all_units_idle_false(self):
async def _block_until(f, timeout=None):
if not f():
raise asyncio.futures.TimeoutError
def _all_units_idle():
return False
self.Model_mock.all_units_idle.side_effect = _all_units_idle
self.patch_object(model, 'Model')
self.Model.return_value = self.Model_mock
self.Model_mock.block_until.side_effect = _block_until
# Confirm exception is raised:
with self.assertRaises(asyncio.futures.TimeoutError):
model.block_until_all_units_idle('modelname')
def block_until_service_status_base(self, rou_return):
async def _block_until(f, timeout=None):
rc = await f()
if not rc:
raise asyncio.futures.TimeoutError
async def _run_on_unit(model_name, unit_name, cmd, timeout=None):
return rou_return
self.patch_object(model, 'async_run_on_unit')
self.async_run_on_unit.side_effect = _run_on_unit
self.patch_object(model, 'Model')
self.Model.return_value = self.Model_mock
self.patch_object(model, 'async_block_until')
self.async_block_until.side_effect = _block_until
def test_block_until_service_status_check_running(self):
self.block_until_service_status_base({'Stdout': '152 409 54'})
model.block_until_service_status(
'modelname',
'app/2',
['test_svc'],
'running')
def test_block_until_service_status_check_running_fail(self):
self.block_until_service_status_base({'Stdout': ''})
with self.assertRaises(asyncio.futures.TimeoutError):
model.block_until_service_status(
'modelname',
'app/2',
['test_svc'],
'running')
def test_block_until_service_status_check_stopped(self):
self.block_until_service_status_base({'Stdout': ''})
model.block_until_service_status(
'modelname',
'app/2',
['test_svc'],
'stopped')
def test_block_until_service_status_check_stopped_fail(self):
self.block_until_service_status_base({'Stdout': '152 409 54'})
with self.assertRaises(asyncio.futures.TimeoutError):
model.block_until_service_status(
'modelname',
'app/2',
['test_svc'],
'stopped')
def test_get_unit_time(self):
async def _run_on_unit(model_name, unit_name, cmd, timeout=None):
return {'Stdout': '1524409654'}
self.patch_object(model, 'async_run_on_unit')
self.async_run_on_unit.side_effect = _run_on_unit
self.assertEqual(
model.get_unit_time('mymodel', 'app/2'),
1524409654)
def test_get_unit_service_start_time(self):
async def _run_on_unit(model_name, unit_name, cmd, timeout=None):
return {'Stdout': '1524409654'}
self.patch_object(model, 'async_run_on_unit')
self.async_run_on_unit.side_effect = _run_on_unit
self.assertEqual(
model.get_unit_service_start_time('mymodel', 'app/2', 'mysvc1'),
1524409654)
def test_get_unit_service_start_time_not_running(self):
async def _run_on_unit(model_name, unit_name, cmd, timeout=None):
return {'Stdout': ''}
self.patch_object(model, 'async_run_on_unit')
self.async_run_on_unit.side_effect = _run_on_unit
with self.assertRaises(model.ServiceNotRunning):
model.get_unit_service_start_time('mymodel', 'app/2', 'mysvc1')
def block_until_oslo_config_entries_match_base(self, file_contents,
expected_contents):
async def _scp_from(remote_file, tmpdir):
with open('{}/myfile.txt'.format(tmpdir), 'w') as f:
f.write(file_contents)
self.patch_object(model, 'Model')
self.Model.return_value = self.Model_mock
self.unit1.scp_from.side_effect = _scp_from
self.unit2.scp_from.side_effect = _scp_from
model.block_until_oslo_config_entries_match(
'modelname',
'app',
'/tmp/src/myfile.txt',
expected_contents,
timeout=0.1)
def test_block_until_oslo_config_entries_match(self):
file_contents = """
[DEFAULT]
verbose = False
use_syslog = False
debug = False
workers = 4
bind_host = 0.0.0.0
[glance_store]
filesystem_store_datadir = /var/lib/glance/images/
stores = glance.store.filesystem.Store,glance.store.http.Store
default_store = file
[image_format]
disk_formats = ami,ari,aki,vhd,vmdk,raw,qcow2,vdi,iso,root-tar
"""
expected_contents = {
'DEFAULT': {
'debug': ['False']},
'glance_store': {
'filesystem_store_datadir': ['/var/lib/glance/images/'],
'default_store': ['file']}}
self.block_until_oslo_config_entries_match_base(
file_contents,
expected_contents)
self.unit1.scp_from.assert_called_once_with(
'/tmp/src/myfile.txt', mock.ANY)
self.unit2.scp_from.assert_called_once_with(
'/tmp/src/myfile.txt', mock.ANY)
def test_block_until_oslo_config_entries_match_fail(self):
file_contents = """
[DEFAULT]
verbose = False
use_syslog = False
debug = True
workers = 4
bind_host = 0.0.0.0
[glance_store]
filesystem_store_datadir = /var/lib/glance/images/
stores = glance.store.filesystem.Store,glance.store.http.Store
default_store = file
[image_format]
disk_formats = ami,ari,aki,vhd,vmdk,raw,qcow2,vdi,iso,root-tar
"""
expected_contents = {
'DEFAULT': {
'debug': ['False']},
'glance_store': {
'filesystem_store_datadir': ['/var/lib/glance/images/'],
'default_store': ['file']}}
with self.assertRaises(asyncio.futures.TimeoutError):
self.block_until_oslo_config_entries_match_base(
file_contents,
expected_contents)
self.unit1.scp_from.assert_called_once_with(
'/tmp/src/myfile.txt', mock.ANY)
def test_block_until_oslo_config_entries_match_missing_entry(self):
file_contents = """
[DEFAULT]
verbose = False
use_syslog = False
workers = 4
bind_host = 0.0.0.0
[glance_store]
filesystem_store_datadir = /var/lib/glance/images/
stores = glance.store.filesystem.Store,glance.store.http.Store
default_store = file
[image_format]
disk_formats = ami,ari,aki,vhd,vmdk,raw,qcow2,vdi,iso,root-tar
"""
expected_contents = {
'DEFAULT': {
'debug': ['False']},
'glance_store': {
'filesystem_store_datadir': ['/var/lib/glance/images/'],
'default_store': ['file']}}
with self.assertRaises(asyncio.futures.TimeoutError):
self.block_until_oslo_config_entries_match_base(
file_contents,
expected_contents)
self.unit1.scp_from.assert_called_once_with(
'/tmp/src/myfile.txt', mock.ANY)
def test_block_until_oslo_config_entries_match_missing_section(self):
file_contents = """
[DEFAULT]
verbose = False
use_syslog = False
workers = 4
bind_host = 0.0.0.0
[image_format]
disk_formats = ami,ari,aki,vhd,vmdk,raw,qcow2,vdi,iso,root-tar
"""
expected_contents = {
'DEFAULT': {
'debug': ['False']},
'glance_store': {
'filesystem_store_datadir': ['/var/lib/glance/images/'],
'default_store': ['file']}}
with self.assertRaises(asyncio.futures.TimeoutError):
self.block_until_oslo_config_entries_match_base(
file_contents,
expected_contents)
self.unit1.scp_from.assert_called_once_with(
'/tmp/src/myfile.txt', mock.ANY)
def block_until_services_restarted_base(self, gu_return=None,
gu_raise_exception=False):
async def _block_until(f, timeout=None):
rc = await f()
if not rc:
raise asyncio.futures.TimeoutError
self.patch_object(model, 'async_block_until')
self.async_block_until.side_effect = _block_until
async def _async_get_unit_service_start_time(model_name, unit, svc):
if gu_raise_exception:
raise model.ServiceNotRunning('sv1')
else:
return gu_return
self.patch_object(model, 'async_get_unit_service_start_time')
self.async_get_unit_service_start_time.side_effect = \
_async_get_unit_service_start_time
self.patch_object(model, 'Model')
self.Model.return_value = self.Model_mock
def test_block_until_services_restarted(self):
self.block_until_services_restarted_base(gu_return=10)
model.block_until_services_restarted(
'modelname',
'app',
8,
['svc1', 'svc2'])
def test_block_until_services_restarted_fail(self):
self.block_until_services_restarted_base(gu_return=10)
with self.assertRaises(asyncio.futures.TimeoutError):
model.block_until_services_restarted(
'modelname',
'app',
12,
['svc1', 'svc2'])
def test_block_until_services_restarted_not_running(self):
self.block_until_services_restarted_base(gu_raise_exception=True)
with self.assertRaises(asyncio.futures.TimeoutError):
model.block_until_services_restarted(
'modelname',
'app',
12,
['svc1', 'svc2'])
def test_block_until_unit_wl_status(self):
async def _block_until(f, timeout=None):
if not f():
raise asyncio.futures.TimeoutError
self.patch_object(model, 'Model')
self.Model.return_value = self.Model_mock
self.Model_mock.block_until.side_effect = _block_until
self.patch_object(model, 'get_unit_from_name')
self.get_unit_from_name.return_value = mock.MagicMock(
workload_status='active')
model.block_until_unit_wl_status(
'modelname',
'app/2',
'active',
timeout=0.1)
def test_block_until_unit_wl_status_fail(self):
async def _block_until(f, timeout=None):
if not f():
raise asyncio.futures.TimeoutError
self.patch_object(model, 'Model')
self.Model.return_value = self.Model_mock
self.Model_mock.block_until.side_effect = _block_until
self.patch_object(model, 'get_unit_from_name')
self.get_unit_from_name.return_value = mock.MagicMock(
workload_status='maintenance')
with self.assertRaises(asyncio.futures.TimeoutError):
model.block_until_unit_wl_status(
'modelname',
'app/2',
'active',
timeout=0.1)
class AsyncModelTests(aiounittest.AsyncTestCase):

View File

@@ -5,6 +5,7 @@ import os
import subprocess
import tempfile
import yaml
from oslo_config import cfg
from juju import loop
from juju.errors import JujuError
@@ -183,6 +184,54 @@ async def async_run_on_unit(model_name, unit_name, command, timeout=None):
run_on_unit = sync_wrapper(async_run_on_unit)
async def async_get_unit_time(model_name, unit_name, timeout=None):
""" Get the current time (in seconds since Epoch) on the given unit
:param model_name: Name of model to query.
:type model_name: str
:param unit_name: Name of unit to run action on
:type unit_name: str
:returns: time in seconds since Epoch on unit
:rtype: int
"""
out = await async_run_on_unit(
model_name,
unit_name,
"date +'%s'",
timeout=timeout)
return int(out['Stdout'])
get_unit_time = sync_wrapper(async_get_unit_time)
async def async_get_unit_service_start_time(model_name, unit_name, service,
timeout=None):
"""Return the time (in seconds since Epoch) that the given service was
started on the given unit. If the service is not running return 0
:param model_name: Name of model to query.
:type model_name: str
:param unit_name: Name of unit to run action on
:type unit_name: str
:param service: Name of service to check is running
:type service: str
:param timeout: Time to wait for status to be achieved
:type timeout: int
:returns: time in seconds since Epoch on unit
:rtype: int
:raises: ServiceNotRunning
"""
cmd = "stat -c %Y /proc/$(pidof -x {} | cut -f1 -d ' ')".format(service)
out = await async_run_on_unit(model_name, unit_name, cmd, timeout=timeout)
out = out['Stdout'].strip()
if out:
return int(out)
else:
raise ServiceNotRunning(service)
get_unit_service_start_time = sync_wrapper(async_get_unit_service_start_time)
async def async_get_application(model_name, application_name):
"""Return an application object
@@ -370,7 +419,6 @@ run_action_on_leader = sync_wrapper(async_run_action_on_leader)
class UnitError(Exception):
"""Exception raised for units in error state
"""
def __init__(self, units):
@@ -379,6 +427,15 @@ class UnitError(Exception):
super(UnitError, self).__init__(message)
class ServiceNotRunning(Exception):
"""Exception raised when service not running
"""
def __init__(self, service):
message = "Service {} not running".format(service)
super(ServiceNotRunning, self).__init__(message)
def units_with_wl_status_state(model, state):
"""Return a list of unit which have a matching workload status
@@ -475,7 +532,7 @@ async def async_wait_for_application_states(model_name, states=None,
:param model_name: Name of model to query.
:type model_name: str
:param states: Staes to look for
:param states: States to look for
:type states: dict
:param timeout: Time to wait for status to be achieved
:type timeout: int
@@ -521,6 +578,55 @@ async def async_wait_for_application_states(model_name, states=None,
wait_for_application_states = sync_wrapper(async_wait_for_application_states)
async def async_block_until_all_units_idle(model_name, timeout=2700):
"""Block until all units in the given model are idle
:param model_name: Name of model to query.
:type model_name: str
:param timeout: Time to wait for status to be achieved
:type timeout: int
"""
async with run_in_model(model_name) as model:
await model.block_until(
lambda: model.all_units_idle(), timeout=timeout)
block_until_all_units_idle = sync_wrapper(async_block_until_all_units_idle)
async def async_block_until_service_status(model_name, unit_name, services,
target_status, timeout=2700):
"""Block until all services on the unit are in the desired state (stopped
or running)
:param model_name: Name of model to query.
:type model_name: str
:param unit_name: Name of unit to run action on
:type unit_name: str
:param services: List of services to check
:type services: []
:param target_status: State services should be in (stopped or running)
:type target_status: str
:param timeout: Time to wait for status to be achieved
:type timeout: int
"""
async def _check_service():
for service in services:
out = await async_run_on_unit(
model_name,
unit_name,
"pidof -x {}".format(service),
timeout=timeout)
if target_status == "running" and len(out['Stdout'].strip()) == 0:
return False
elif target_status == "stopped" and len(out['Stdout'].strip()) > 0:
return False
return True
async with run_in_model(model_name):
await async_block_until(_check_service, timeout=timeout)
block_until_service_status = sync_wrapper(async_block_until_service_status)
def get_actions(model_name, application_name):
"""Get the actions an applications supports
@@ -654,6 +760,125 @@ block_until_file_has_contents = sync_wrapper(
async_block_until_file_has_contents)
async def async_block_until_oslo_config_entries_match(model_name,
application_name,
remote_file,
expected_contents,
timeout=2700):
"""Block until the expected_contents are in the given file on all units of
the application
:param model_name: Name of model to query.
:type model_name: str
:param application_name: Name of application
:type application_name: str
:param remote_file: Remote path of file to transfer
:type remote_file: str
:param expected_contents: The key values pairs in their corresponding
sections to be looked for in the remote_file
:type expected_contents: {}
:param timeout: Time to wait for contents to appear in file
:type timeout: float
For example to check for,
[DEFAULT]
debug = False
[glance_store]
filesystem_store_datadir = /var/lib/glance/images/
default_store = file
use:
expected_contents = {
'DEFAULT': {
'debug': ['False']},
'glance_store': {
'filesystem_store_datadir': ['/var/lib/glance/images/'],
'default_store': ['file']}}
"""
def f(x):
# Writing out the file that was just read is suboptimal
with tempfile.NamedTemporaryFile(mode='w', delete=True) as fp:
fp.write(x)
fp.seek(0)
sections = {}
parser = cfg.ConfigParser(fp.name, sections)
parser.parse()
for section, entries in expected_contents.items():
for key, value in entries.items():
if sections.get(section, {}).get(key) != value:
return False
return True
return await async_block_until_file_ready(model_name, application_name,
remote_file, f, timeout)
block_until_oslo_config_entries_match = sync_wrapper(
async_block_until_oslo_config_entries_match)
async def async_block_until_services_restarted(model_name, application_name,
mtime, services, timeout=2700):
"""Block until the given services have a start time later then mtime
:param model_name: Name of model to query.
:type model_name: str
:param application_name: Name of application
:type application_name: str
:param mtime: Time in seconds since Epoch to check against
:type mtime: int
:param services: Listr of services to check restart time of
:type services: []
:param timeout: Time to wait for services to be restarted
:type timeout: float
"""
async def _check_service():
units = model.applications[application_name].units
for unit in units:
for service in services:
try:
svc_mtime = await async_get_unit_service_start_time(
model_name,
unit.entity_id,
service)
except ServiceNotRunning:
return False
if svc_mtime < mtime:
return False
return True
async with run_in_model(model_name) as model:
await async_block_until(_check_service, timeout=timeout)
block_until_services_restarted = sync_wrapper(
async_block_until_services_restarted)
async def async_block_until_unit_wl_status(model_name, unit_name, status,
timeout=2700):
"""Block until the given unit has the desired workload status
:param model_name: Name of model to query.
:type model_name: str
:param unit_name: Name of unit to run action on
:type unit_name: str
:param status: Status to wait for (active, maintenance etc)
:type status: str
:param timeout: Time to wait for unit to achieved desired status
:type timeout: float
"""
async with run_in_model(model_name) as model:
unit = get_unit_from_name(unit_name, model)
print(unit.workload_status)
await model.block_until(
lambda: unit.workload_status == status,
timeout=timeout
)
block_until_unit_wl_status = sync_wrapper(
async_block_until_unit_wl_status)
async def async_get_relation_id(model_name, application_name,
remote_application_name,
remote_interface_name=None):