Add unit tests

This commit is contained in:
Liam Young
2021-01-25 09:21:00 +00:00
parent 401829f0a7
commit e047150f5b
4 changed files with 190 additions and 25 deletions

View File

@@ -17,6 +17,8 @@ import datetime
import io
import mock
import subprocess
import sys
import unittest
import tenacity
import unit_tests.utils as ut_utils
@@ -191,6 +193,7 @@ class TestOpenStackUtils(ut_utils.BaseTestCase):
self.patch_object(openstack_utils, 'get_application_config_option')
self.patch_object(openstack_utils, 'get_keystone_ip')
self.patch_object(openstack_utils, "get_current_os_versions")
self.patch_object(openstack_utils, "get_remote_ca_cert_file")
self.patch_object(openstack_utils.juju_utils, 'leader_get')
if tls_relation:
self.patch_object(openstack_utils.model, "scp_from_unit")
@@ -204,6 +207,7 @@ class TestOpenStackUtils(ut_utils.BaseTestCase):
self.get_relation_id.return_value = None
self.get_application_config_option.return_value = None
self.leader_get.return_value = 'openstack'
self.get_remote_ca_cert_file.return_value = None
if tls_relation or ssl_cert:
port = 35357
transport = 'https'
@@ -245,7 +249,8 @@ class TestOpenStackUtils(ut_utils.BaseTestCase):
'API_VERSION': 3,
}
if tls_relation:
expect['OS_CACERT'] = openstack_utils.KEYSTONE_LOCAL_CACERT
self.get_remote_ca_cert_file.return_value = '/tmp/a.cert'
expect['OS_CACERT'] = '/tmp/a.cert'
self.assertEqual(openstack_utils.get_overcloud_auth(),
expect)
@@ -1327,3 +1332,153 @@ class TestOpenStackUtils(ut_utils.BaseTestCase):
mock.ANY,
'bridge-interface-mappings',
{'ovn-bridge-mappings': 'physnet1:br-ex'}))
def test_get_cacert(self):
self.patch_object(openstack_utils.os.path, 'exists')
results = {
'tests/vault_juju_ca_cert.crt': True}
self.exists.side_effect = lambda x: results[x]
self.assertEqual(
openstack_utils.get_cacert(),
'tests/vault_juju_ca_cert.crt')
results = {
'tests/vault_juju_ca_cert.crt': False,
'tests/keystone_juju_ca_cert.crt': True}
self.assertEqual(
openstack_utils.get_cacert(),
'tests/keystone_juju_ca_cert.crt')
results = {
'tests/vault_juju_ca_cert.crt': False,
'tests/keystone_juju_ca_cert.crt': False}
self.assertIsNone(openstack_utils.get_cacert())
def test_get_remote_ca_cert_file(self):
self.patch_object(openstack_utils.model, 'get_first_unit_name')
self.patch_object(
openstack_utils,
'_get_remote_ca_cert_file_candidates')
self.patch_object(openstack_utils.model, 'scp_from_unit')
self.patch_object(openstack_utils.os.path, 'exists')
self.patch_object(openstack_utils.shutil, 'move')
self.patch_object(openstack_utils.os, 'chmod')
self.patch_object(openstack_utils.tempfile, 'NamedTemporaryFile')
enter_mock = mock.MagicMock()
enter_mock.__enter__.return_value.name = 'tempfilename'
self.NamedTemporaryFile.return_value = enter_mock
self.get_first_unit_name.return_value = 'neutron-api/0'
self._get_remote_ca_cert_file_candidates.return_value = [
'/tmp/ca1.cert']
self.exists.return_value = True
openstack_utils.get_remote_ca_cert_file('neutron-api')
self.scp_from_unit.assert_called_once_with(
'neutron-api/0',
'/tmp/ca1.cert',
'tempfilename')
self.chmod.assert_called_once_with('tests/ca1.cert', 0o644)
self.move.assert_called_once_with('tempfilename', 'tests/ca1.cert')
class TestAsyncOpenstackUtils(ut_utils.AioTestCase):
def setUp(self):
super(TestAsyncOpenstackUtils, self).setUp()
if sys.version_info < (3, 6, 0):
raise unittest.SkipTest("Can't AsyncMock in py35")
model_mock = mock.MagicMock()
test_mock = mock.MagicMock()
class AsyncContextManagerMock(test_mock):
async def __aenter__(self):
yield model_mock
async def __aexit__(self, *args):
pass
self.model_mock = model_mock
self.patch_object(openstack_utils.zaza.model, "async_block_until")
async def _block_until(f, timeout):
# Store the result of the call to _check_ca_present to validate
# tests
self.result = await f()
self.async_block_until.side_effect = _block_until
self.patch('zaza.model.run_in_model', name='_run_in_model')
self._run_in_model.return_value = AsyncContextManagerMock
self._run_in_model().__aenter__.return_value = self.model_mock
async def test_async_block_until_ca_exists(self):
def _get_action_output(stdout, code, stderr=None):
stderr = stderr or ''
action = mock.MagicMock()
action.data = {
'results': {
'Code': code,
'Stderr': stderr,
'Stdout': stdout}}
return action
results = {
'/tmp/missing.cert': _get_action_output(
'',
'1',
'cat: /tmp/missing.cert: No such file or directory'),
'/tmp/good.cert': _get_action_output('CERTIFICATE', '0')}
async def _run(command, timeout=None):
return results[command.split()[-1]]
self.unit1 = mock.MagicMock()
self.unit2 = mock.MagicMock()
self.unit2.run.side_effect = _run
self.unit1.run.side_effect = _run
self.units = [self.unit1, self.unit2]
_units = mock.MagicMock()
_units.units = self.units
self.model_mock.applications = {
'keystone': _units
}
self.patch_object(
openstack_utils,
"_async_get_remote_ca_cert_file_candidates")
# Test a missing cert then a good cert.
self._async_get_remote_ca_cert_file_candidates.return_value = [
'/tmp/missing.cert',
'/tmp/good.cert']
await openstack_utils.async_block_until_ca_exists(
'keystone',
'CERTIFICATE')
self.assertTrue(self.result)
# Test a single missing
self._async_get_remote_ca_cert_file_candidates.return_value = [
'/tmp/missing.cert']
await openstack_utils.async_block_until_ca_exists(
'keystone',
'CERTIFICATE')
self.assertFalse(self.result)
async def test__async_get_remote_ca_cert_file_candidates(self):
self.patch_object(openstack_utils.zaza.model, "async_get_relation_id")
rel_id_out = {
}
def _get_relation_id(app, cert_app, model_name, remote_interface_name):
return rel_id_out[cert_app]
self.async_get_relation_id.side_effect = _get_relation_id
rel_id_out['vault'] = 'certs:1'
r = await openstack_utils._async_get_remote_ca_cert_file_candidates(
'neutron-api', 'mymodel')
self.assertEqual(
r,
['/usr/local/share/ca-certificates/vault_juju_ca_cert.crt',
'/usr/local/share/ca-certificates/keystone_juju_ca_cert.crt'])
rel_id_out['vault'] = None
r = await openstack_utils._async_get_remote_ca_cert_file_candidates(
'neutron-api', 'mymodel')
self.assertEqual(
r,
['/usr/local/share/ca-certificates/keystone_juju_ca_cert.crt'])

View File

@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import mock
import sys
import unittest
@@ -139,28 +138,7 @@ class Test_ParallelSeriesUpgradeSync(ut_utils.BaseTestCase):
self.assertEqual(expected, config)
class AioTestCase(ut_utils.BaseTestCase):
def __init__(self, methodName='runTest', loop=None):
self.loop = loop or asyncio.get_event_loop()
self._function_cache = {}
super(AioTestCase, self).__init__(methodName=methodName)
def coroutine_function_decorator(self, func):
def wrapper(*args, **kw):
return self.loop.run_until_complete(func(*args, **kw))
return wrapper
def __getattribute__(self, item):
attr = object.__getattribute__(self, item)
if asyncio.iscoroutinefunction(attr) and item.startswith('test_'):
if item not in self._function_cache:
self._function_cache[item] = (
self.coroutine_function_decorator(attr))
return self._function_cache[item]
return attr
class TestParallelSeriesUpgrade(AioTestCase):
class TestParallelSeriesUpgrade(ut_utils.AioTestCase):
def setUp(self):
super(TestParallelSeriesUpgrade, self).setUp()
if sys.version_info < (3, 6, 0):

View File

@@ -19,6 +19,7 @@
"""Module to provide helper for writing unit tests."""
import asyncio
import contextlib
import io
import mock
@@ -96,3 +97,24 @@ class BaseTestCase(unittest.TestCase):
started.return_value = return_value
self._patches_start[name] = started
setattr(self, name, started)
class AioTestCase(BaseTestCase):
def __init__(self, methodName='runTest', loop=None):
self.loop = loop or asyncio.get_event_loop()
self._function_cache = {}
super(AioTestCase, self).__init__(methodName=methodName)
def coroutine_function_decorator(self, func):
def wrapper(*args, **kw):
return self.loop.run_until_complete(func(*args, **kw))
return wrapper
def __getattribute__(self, item):
attr = object.__getattribute__(self, item)
if asyncio.iscoroutinefunction(attr) and item.startswith('test_'):
if item not in self._function_cache:
self._function_cache[item] = (
self.coroutine_function_decorator(attr))
return self._function_cache[item]
return attr

View File

@@ -222,6 +222,7 @@ async def async_block_until_ca_exists(application_name, ca_cert,
except JujuError:
continue
else:
# The CA was found in `ca_file` on all units.
return True
else:
return False
@@ -2012,6 +2013,15 @@ def get_overcloud_auth(address=None, model_name=None):
async def _async_get_remote_ca_cert_file_candidates(application,
model_name=None):
"""Return a list of possible remote CA file names.
:param application: Name of application to examine.
:type application: str
:param model_name: Name of model to query.
:type model_name: str
:returns: List of paths to possible ca files.
:rtype: List[str]
"""
cert_files = []
for _provider in CERT_PROVIDERS:
tls_rid = await model.async_get_relation_id(
@@ -2023,7 +2033,7 @@ async def _async_get_remote_ca_cert_file_candidates(application,
cert_files.append(
REMOTE_CERT_DIR + '/' + CACERT_FILENAME_FORMAT.format(
_provider))
cert_files.append(KEYSTONE_LOCAL_CACERT)
cert_files.append(KEYSTONE_REMOTE_CACERT)
return cert_files
_get_remote_ca_cert_file_candidates = zaza.model.sync_wrapper(