diff --git a/unit_tests/utilities/test_zaza_utilities_file_assertions.py b/unit_tests/utilities/test_zaza_utilities_file_assertions.py new file mode 100644 index 0000000..b40276c --- /dev/null +++ b/unit_tests/utilities/test_zaza_utilities_file_assertions.py @@ -0,0 +1,64 @@ +# Copyright 2018 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +import unit_tests.utils as ut_utils +import zaza.utilities.file_assertions as file_assertions + + +class TestFileAssertionUtils(ut_utils.BaseTestCase): + def setUp(self): + super(TestFileAssertionUtils, self).setUp() + # Patch all run_on_unit calls + self.patch( + 'zaza.utilities.file_assertions.model.run_on_unit', + new_callable=mock.MagicMock(), + name='run_on_unit' + ) + self._assert = mock.MagicMock() + self._assert.assertEqual = mock.MagicMock() + + def test_path_glob(self): + self.run_on_unit.return_value = { + 'Stdout': 'file-name root root 600' + } + file_details = {'path': '*'} + file_assertions.assert_path_glob( + self._assert, 'test/0', file_details) + self.run_on_unit.assert_called_once_with( + 'test/0', 'bash -c "shopt -s -q globstar;' + ' stat -c "%n %U %G %a" *"') + + def test_single_path(self): + self.run_on_unit.return_value = { + 'Stdout': 'root root 600' + } + file_details = {'path': 'test'} + file_assertions.assert_single_file( + self._assert, 'test/0', file_details) + self.run_on_unit.assert_called_once_with( + 'test/0', 'stat -c "%U %G %a" test') + + def test_error_message_glob(self): + message = file_assertions._error_message( + "Owner", "test/0", "root", "/path/to/something") + self.assertEqual( + message, + "Owner is incorrect for /path/to/something on test/0: root") + + def test_error_message_single(self): + + message = file_assertions._error_message( + "Owner", "test/0", "root") + self.assertEqual(message, "Owner is incorrect on test/0: root") diff --git a/zaza/charm_tests/neutron/setup.py b/zaza/charm_tests/neutron/setup.py index d4fcd63..be9ae72 100644 --- a/zaza/charm_tests/neutron/setup.py +++ b/zaza/charm_tests/neutron/setup.py @@ -25,6 +25,8 @@ from zaza.utilities import ( juju as juju_utils, openstack as openstack_utils, ) +import zaza.model as model + # The overcloud network configuration settings are declared. # These are the network configuration settings under test. @@ -72,6 +74,10 @@ def basic_overcloud_network(): network_config.update(DEFAULT_UNDERCLOUD_NETWORK_CONFIG) # Environment specific settings network_config.update(generic_utils.get_undercloud_env_vars()) + # Deployed model settings + if (model.get_application_config('neutron-api') + .get('enable-dvr').get('value')): + network_config.update({"dvr_enabled": True}) # Get keystone session keystone_session = openstack_utils.get_overcloud_keystone_session() diff --git a/zaza/charm_tests/security/tests.py b/zaza/charm_tests/security/tests.py index 51c685e..bc203f6 100644 --- a/zaza/charm_tests/security/tests.py +++ b/zaza/charm_tests/security/tests.py @@ -20,35 +20,36 @@ import unittest import zaza.model as model import zaza.charm_lifecycle.utils as utils +from zaza.utilities.file_assertions import ( + assert_path_glob, + assert_single_file, +) -def _make_test_function(application, file_details): +def _make_test_function(application, file_details, paths=None): + """Generate a test function given the specified inputs. + + :param application: Application name to assert file ownership on + :type application: str + :param file_details: Dictionary of file details to test + :type file_details: dict + :param paths: List of paths to test in this application + :type paths: Optional[list(str)] + :returns: Test function + :rtype: unittest.TestCase + """ def test(self): - expected_owner = file_details.get("owner", "root") - expected_group = file_details.get("group", "root") - expected_mode = file_details.get("mode", "600") for unit in model.get_units(application): unit = unit.entity_id - result = model.run_on_unit( - unit, 'stat -c "%U %G %a" {}'.format(file_details['path'])) - ownership = result['Stdout'] - owner, group, mode = ownership.split() - self.assertEqual(expected_owner, - owner, - "Owner is incorrect for {}: {}" - .format(unit, owner)) - self.assertEqual(expected_group, - group, - "Group is incorrect for {}: {}" - .format(unit, group)) - self.assertEqual(expected_mode, - mode, - "Mode is incorrect for {}: {}" - .format(unit, mode)) + if '*' in file_details['path']: + assert_path_glob(self, unit, file_details, paths) + else: + assert_single_file(self, unit, file_details) return test def _add_tests(): + """Add tests to the unittest.TestCase.""" def class_decorator(cls): """Add tests based on input yaml to `cls`.""" files = utils.get_charm_config('./file-assertions.yaml') @@ -56,8 +57,13 @@ def _add_tests(): for name, attributes in files.items(): # Lets make sure to only add tests for deployed applications if name in deployed_applications: + paths = [ + file['path'] for + file in attributes['files'] + if "*" not in file["path"] + ] for file in attributes['files']: - test_func = _make_test_function(name, file) + test_func = _make_test_function(name, file, paths=paths) setattr( cls, 'test_{}_{}'.format(name, file['path']), diff --git a/zaza/utilities/file_assertions.py b/zaza/utilities/file_assertions.py new file mode 100644 index 0000000..ae8a33e --- /dev/null +++ b/zaza/utilities/file_assertions.py @@ -0,0 +1,123 @@ +# Copyright 2018 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Module of helpers for Zaza file assertions.""" + +import zaza.model as model + + +def assert_path_glob(test_case, unit, file_details, paths=None): + """Verify all files in a given directory. + + :param test_case: Test case that we are asserting in + :type test_case: unittest.TestCase + :param unit: Unit name to operate on + :type unit: str + :param file_details: Dictionary with details of the file + :type file_details: dict + :param paths: list of paths that are explicitly tested + :type paths: list + :returns: Nothing + :rtype: None + """ + if not paths: + paths = [] + result = model.run_on_unit( + unit, 'bash -c "' + 'shopt -s -q globstar; ' + 'stat -c "%n %U %G %a" {}"'.format(file_details['path'])) + files = result['Stdout'] + for file in files.splitlines(): + file, owner, group, mode = file.split() + if file not in paths and file not in ['.', '..']: + _verify_file(test_case, + unit, + file_details, + owner, + group, + mode, + path=file) + + +def assert_single_file(test_case, unit, file_details): + """Verify ownership of a single file. + + :param test_case: Test case that we are asserting in + :type test_case: unittest.TestCase + :param unit: Unit name to operate on + :type unit: str + :param file_details: Dictionary with details of the file + :type file_details: dict + :returns: Nothing + :rtype: None + """ + result = model.run_on_unit( + unit, 'stat -c "%U %G %a" {}'.format(file_details['path'])) + ownership = result['Stdout'] + owner, group, mode = ownership.split() + _verify_file(test_case, unit, file_details, owner, group, mode) + + +def _verify_file(test_case, unit, file_details, + actual_owner, actual_group, actual_mode, path=None): + """Assert file has correct permissions. + + :param test_case: Test case that we are asserting in + :type test_case: unittest.TestCase + :param unit: Unit name to operate on + :type unit: str + :param file_details: Dictionary with details of the file + :type file_details: dict + :param actual_owner: Owner of the file + :type actual_owner str + :param actual_group: Group of the file + :type actual_group str + :param actual_mode: Mode of the file + :type actual_mode str + :returns: Nothing + :rtype: None + """ + expected_owner = file_details.get("owner", "root") + expected_group = file_details.get("group", "root") + expected_mode = file_details.get("mode", "600") + test_case.assertEqual(expected_owner, + actual_owner, + _error_message("Owner", unit, actual_owner, path)) + test_case.assertEqual(expected_group, + actual_group, + _error_message("Group", unit, actual_group, path)) + test_case.assertEqual(expected_mode, + actual_mode, + _error_message("Mode", unit, actual_mode, path)) + + +def _error_message(thing, unit, value, path=None): + """Format assertion error based on presence of path. + + :param thing: Ownership type + :type thing: str + :param unit: Unit tested + :type unit: str + :param value: Actual value from test + :type value: str + :param path: Path tested + :type path: Optional[str] + :returns: Erorr Message + :rtype: str + """ + if path: + return "{} is incorrect for {} on {}: {}".format( + thing, path, unit, value) + else: + return "{} is incorrect on {}: {}".format(thing, unit, value) diff --git a/zaza/utilities/openstack.py b/zaza/utilities/openstack.py index 139afae..2a71157 100644 --- a/zaza/utilities/openstack.py +++ b/zaza/utilities/openstack.py @@ -238,15 +238,17 @@ def get_octavia_session_client(session, service_type='load-balancer', endpoint=endpoint.url) -def get_cinder_session_client(session): +def get_cinder_session_client(session, version=2): """Return cinderclient authenticated by keystone session. :param session: Keystone session object :type session: keystoneauth1.session.Session object + :param version: Cinder API version + :type version: int :returns: Authenticated cinderclient :rtype: cinderclient.Client object """ - return cinderclient.Client(session=session) + return cinderclient.Client(session=session, version=version) def get_keystone_scope():