Add neutron-api charm policyd test
This commit is contained in:
@@ -25,22 +25,6 @@ The Policyd Tests test the following:
|
||||
verify that the charm has implemented policy overrides and ensured that the
|
||||
service actually picks them up).
|
||||
|
||||
In order to use the generic tests, just include them in the specific test
|
||||
class. The KeystonePolicydTest as an example does:
|
||||
|
||||
class KeystonePolicydTest(PolicydTest,
|
||||
ch_keystone.BaseKeystoneTest,
|
||||
test_utils.OpenStackBaseTest):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls, application_name=None):
|
||||
super(KeystonePolicydTest, cls).setUpClass(application_name)
|
||||
|
||||
Note that the generic test class (PolicyDTest) comes first, and then the
|
||||
ch_keystone.BaseKeystoneTest, followed by the test_utils.OpenStackBaseTest.
|
||||
This is to get the order of super().setUpClass(...) calls to work with
|
||||
application_name.
|
||||
|
||||
If a charm doesn't require a specific test, then the GenericPolicydTest class
|
||||
can be used that just includes the two generic tests. The config in the
|
||||
tests.yaml would stil be required. See the PolicydTest class docstring for
|
||||
@@ -54,6 +38,7 @@ import tempfile
|
||||
import zipfile
|
||||
|
||||
import keystoneauth1
|
||||
import neutronclient.exceptions
|
||||
|
||||
import zaza.model as zaza_model
|
||||
|
||||
@@ -127,47 +112,6 @@ class PolicydTest(object):
|
||||
zaza_model.block_until_wl_status_info_starts_with(
|
||||
self.application_name, "PO:", negate_match=False)
|
||||
|
||||
def test_002_policyd_bad_yaml(self):
|
||||
"""Test bad yaml file in the zip file is handled."""
|
||||
bad = {
|
||||
"file2.yaml": "{'rule': '!}"
|
||||
}
|
||||
bad_zip_path = self._make_zip_file_from('bad.zip', bad)
|
||||
logging.info("Attaching bad zip file as a resource")
|
||||
zaza_model.attach_resource(self.application_name,
|
||||
'policyd-override',
|
||||
bad_zip_path)
|
||||
zaza_model.block_until_all_units_idle()
|
||||
logging.debug("Now setting config to true")
|
||||
self._set_config(True)
|
||||
# ensure that the workload status info line starts with PO (broken):
|
||||
# to show that it didn't work
|
||||
logging.info(
|
||||
"Checking for workload status line starts with PO (broken):")
|
||||
zaza_model.block_until_wl_status_info_starts_with(
|
||||
self.application_name, "PO (broken):")
|
||||
logging.debug("App status is valid for broken yaml file")
|
||||
zaza_model.block_until_all_units_idle()
|
||||
# now verify that no file got landed on the machine
|
||||
path = os.path.join(
|
||||
"/etc", self._service_name, "policy.d", 'file2.yaml')
|
||||
logging.info("Now checking that file {} is not present.".format(path))
|
||||
zaza_model.block_until_file_missing(self.application_name, path)
|
||||
self._set_config(False)
|
||||
zaza_model.block_until_all_units_idle()
|
||||
logging.info("OK")
|
||||
|
||||
|
||||
class GenericPolicydTest(PolicydTest, test_utils.OpenStackBaseTest):
|
||||
"""Generic policyd test for any charm without a specific test.
|
||||
|
||||
It includes a single additional test to test_002_policyd_bad_yaml() in the
|
||||
base class PolicydTest which checks that a good yaml file is placed into
|
||||
the correct directory. This additional test (test_001_policyd_good_yaml)
|
||||
is not needed in the specialised tests as the feature is implicitly tested
|
||||
when the specialised test fails with the rule overrided.
|
||||
"""
|
||||
|
||||
def test_001_policyd_good_yaml(self):
|
||||
"""Test that the policyd with a good zipped yaml file."""
|
||||
good = {
|
||||
@@ -215,6 +159,42 @@ class GenericPolicydTest(PolicydTest, test_utils.OpenStackBaseTest):
|
||||
|
||||
logging.info("OK")
|
||||
|
||||
def test_002_policyd_bad_yaml(self):
|
||||
"""Test bad yaml file in the zip file is handled."""
|
||||
bad = {
|
||||
"file2.yaml": "{'rule': '!}"
|
||||
}
|
||||
bad_zip_path = self._make_zip_file_from('bad.zip', bad)
|
||||
logging.info("Attaching bad zip file as a resource")
|
||||
zaza_model.attach_resource(self.application_name,
|
||||
'policyd-override',
|
||||
bad_zip_path)
|
||||
zaza_model.block_until_all_units_idle()
|
||||
logging.debug("Now setting config to true")
|
||||
self._set_config(True)
|
||||
# ensure that the workload status info line starts with PO (broken):
|
||||
# to show that it didn't work
|
||||
logging.info(
|
||||
"Checking for workload status line starts with PO (broken):")
|
||||
zaza_model.block_until_wl_status_info_starts_with(
|
||||
self.application_name, "PO (broken):")
|
||||
logging.debug("App status is valid for broken yaml file")
|
||||
zaza_model.block_until_all_units_idle()
|
||||
# now verify that no file got landed on the machine
|
||||
path = os.path.join(
|
||||
"/etc", self._service_name, "policy.d", 'file2.yaml')
|
||||
logging.info("Now checking that file {} is not present.".format(path))
|
||||
zaza_model.block_until_file_missing(self.application_name, path)
|
||||
self._set_config(False)
|
||||
zaza_model.block_until_all_units_idle()
|
||||
logging.info("OK")
|
||||
|
||||
|
||||
class GenericPolicydTest(PolicydTest, test_utils.OpenStackBaseTest):
|
||||
"""Generic policyd test for any charm without a specific test."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class PolicydOperationFailedException(Exception):
|
||||
"""This is raised by the get_client_and_attempt_operation() method.
|
||||
@@ -248,14 +228,22 @@ class BasePolicydSpecialization(PolicydTest,
|
||||
The test will fail if the first call fails for any reason, or if the 2nd
|
||||
call doesn't raise PolicydOperationFailedException or raises any other
|
||||
exception.
|
||||
"""
|
||||
|
||||
To use this class, follow the keystone example:
|
||||
|
||||
class KeystonePolicydTest(BasePolicydSpecialization):
|
||||
|
||||
_rule = "{'identity:list_services': '!'}"
|
||||
|
||||
def get_client_and_attempt_operation(self, keystone_session):
|
||||
... etc.
|
||||
"""
|
||||
# this needs to be defined as the rule that gets placed into a yaml policy
|
||||
# override. It is a string of the form: 'some-rule: "!"'
|
||||
# i.e. disable some policy and then try and test it.
|
||||
_rule = None
|
||||
|
||||
# the name to log at the beginning of the test
|
||||
# Optional: the name to log at the beginning of the test
|
||||
_test_name = None
|
||||
|
||||
@classmethod
|
||||
@@ -275,7 +263,7 @@ class BasePolicydSpecialization(PolicydTest,
|
||||
|
||||
:param keystone_session: the keystone session to use to obtain the
|
||||
client necessary for the test.
|
||||
:type keystone_session: ???
|
||||
:type keystone_session: keystoneauth1.session.Session
|
||||
:raises: PolicydOperationFailedException if operation fails.
|
||||
"""
|
||||
raise NotImplementedError("This method must be overridden")
|
||||
@@ -286,7 +274,7 @@ class BasePolicydSpecialization(PolicydTest,
|
||||
:param ip: the IP address to get the session against.
|
||||
:type ip: str
|
||||
:returns: a keystone session to the IP address
|
||||
:rtype: ???
|
||||
:rtype: keystoneauth1.session.Session
|
||||
"""
|
||||
logging.info('keystone IP {}'.format(ip))
|
||||
openrc = {
|
||||
@@ -374,14 +362,14 @@ class KeystonePolicydTest(BasePolicydSpecialization):
|
||||
_rule = "{'identity:list_services': '!'}"
|
||||
|
||||
def get_client_and_attempt_operation(self, keystone_session):
|
||||
"""Override this method to perform the operation.
|
||||
"""Attempt to list services. If it fails, raise an exception.
|
||||
|
||||
This operation should pass normally for the demo_user, and fail when
|
||||
the rule has been overriden (see the `rule` class variable.
|
||||
|
||||
:param keystone_session: the keystone session to use to obtain the
|
||||
client necessary for the test.
|
||||
:type keystone_session: ???
|
||||
:type keystone_session: keystoneauth1.session.Session
|
||||
:raises: PolicydOperationFailedException if operation fails.
|
||||
"""
|
||||
keystone_client = openstack_utils.get_keystone_session_client(
|
||||
@@ -392,117 +380,25 @@ class KeystonePolicydTest(BasePolicydSpecialization):
|
||||
raise PolicydOperationFailedException()
|
||||
|
||||
|
||||
class OldKeystonePolicydTest(PolicydTest,
|
||||
ch_keystone.BaseKeystoneTest,
|
||||
test_utils.OpenStackBaseTest):
|
||||
"""Specific test for policyd for keystone charm."""
|
||||
class NeutronApiTest(BasePolicydSpecialization):
|
||||
"""Test the policyd override using the keystone client."""
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls, application_name=None):
|
||||
"""Run class setup for running KeystonePolicydTest tests."""
|
||||
super(KeystonePolicydTest, cls).setUpClass(application_name)
|
||||
_rule = "{'get_network': '!'}"
|
||||
|
||||
def test_disable_service(self):
|
||||
"""Test that service can be disabled."""
|
||||
logging.info("Doing policyd override to disable listing domains")
|
||||
self._set_policy_with(
|
||||
{'rule.yaml': "{'identity:list_services': '!'}"})
|
||||
|
||||
# verify that the policy.d override does disable the endpoint
|
||||
with self.config_change(
|
||||
{'preferred-api-version': self.default_api_version,
|
||||
'use-policyd-override': 'False'},
|
||||
{'preferred-api-version': '3',
|
||||
'use-policyd-override': 'True'},
|
||||
application_name="keystone"):
|
||||
zaza_model.block_until_all_units_idle()
|
||||
for ip in self.keystone_ips:
|
||||
try:
|
||||
logging.info('keystone IP {}'.format(ip))
|
||||
openrc = {
|
||||
'API_VERSION': 3,
|
||||
'OS_USERNAME': ch_keystone.DEMO_ADMIN_USER,
|
||||
'OS_PASSWORD': ch_keystone.DEMO_ADMIN_USER_PASSWORD,
|
||||
'OS_AUTH_URL': 'http://{}:5000/v3'.format(ip),
|
||||
'OS_USER_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
'OS_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
}
|
||||
if self.tls_rid:
|
||||
openrc['OS_CACERT'] = \
|
||||
openstack_utils.KEYSTONE_LOCAL_CACERT
|
||||
openrc['OS_AUTH_URL'] = (
|
||||
openrc['OS_AUTH_URL'].replace('http', 'https'))
|
||||
logging.info('keystone IP {}'.format(ip))
|
||||
keystone_session = openstack_utils.get_keystone_session(
|
||||
openrc, scope='DOMAIN')
|
||||
keystone_client = (
|
||||
openstack_utils.get_keystone_session_client(
|
||||
keystone_session))
|
||||
keystone_client.services.list()
|
||||
raise zaza_exceptions.PolicydError(
|
||||
'Retrieve service list as admin with project scoped '
|
||||
'token passed and should have failed. IP = {}'
|
||||
.format(ip))
|
||||
except keystoneauth1.exceptions.http.Forbidden:
|
||||
logging.info("keystone IP:{} policyd override disabled "
|
||||
"services listing by demo user"
|
||||
.format(ip))
|
||||
|
||||
# now verify (with the config off) that we can actually access
|
||||
# these points
|
||||
with self.config_change(
|
||||
{'preferred-api-version': self.default_api_version},
|
||||
{'preferred-api-version': '3'},
|
||||
application_name="keystone"):
|
||||
zaza_model.block_until_all_units_idle()
|
||||
for ip in self.keystone_ips:
|
||||
try:
|
||||
logging.info('keystone IP {}'.format(ip))
|
||||
openrc = {
|
||||
'API_VERSION': 3,
|
||||
'OS_USERNAME': ch_keystone.DEMO_ADMIN_USER,
|
||||
'OS_PASSWORD': ch_keystone.DEMO_ADMIN_USER_PASSWORD,
|
||||
'OS_AUTH_URL': 'http://{}:5000/v3'.format(ip),
|
||||
'OS_USER_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
'OS_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
}
|
||||
if self.tls_rid:
|
||||
openrc['OS_CACERT'] = \
|
||||
openstack_utils.KEYSTONE_LOCAL_CACERT
|
||||
openrc['OS_AUTH_URL'] = (
|
||||
openrc['OS_AUTH_URL'].replace('http', 'https'))
|
||||
logging.info('keystone IP {}'.format(ip))
|
||||
keystone_session = openstack_utils.get_keystone_session(
|
||||
openrc, scope='DOMAIN')
|
||||
keystone_client = (
|
||||
openstack_utils.get_keystone_session_client(
|
||||
keystone_session))
|
||||
keystone_client.services.list()
|
||||
logging.info("keystone IP:{} without policyd override "
|
||||
"services list working"
|
||||
.format(ip))
|
||||
except keystoneauth1.exceptions.http.Forbidden:
|
||||
raise zaza_exceptions.PolicydError(
|
||||
'Retrieve services list as demo user with project '
|
||||
'scoped token passed and should have passed. IP = {}'
|
||||
.format(ip))
|
||||
|
||||
logging.info('OK')
|
||||
|
||||
|
||||
# class NeutronAPITest(PolicydTest,
|
||||
# test_utils.OpenStackBaseTest):
|
||||
# """Specific test for policyd for neutron-api charm."""
|
||||
|
||||
# def test_disable_service(self):
|
||||
# """Test that service can be disabled."""
|
||||
# logging.info("Doing policyd override to disable listing domains")
|
||||
# self._set_policy_with(
|
||||
# {'rule.yaml': "{'identity:list_services': '!'}"})
|
||||
# # Get authenticated clients
|
||||
# keystone_client = openstack_utils.get_keystone_session_client(
|
||||
# keystone_session)
|
||||
# neutron_client = openstack_utils.get_neutron_session_client(
|
||||
# keystone_session)
|
||||
def get_client_and_attempt_operation(self, keystone_session):
|
||||
"""Attempt to ????
|
||||
|
||||
This operation should pass normally for the demo_user, and fail when
|
||||
the rule has been overriden (see the `rule` class variable.
|
||||
|
||||
:param keystone_session: the keystone session to use to obtain the
|
||||
client necessary for the test.
|
||||
:type keystone_session: keystoneauth1.session.Session
|
||||
:raises: PolicydOperationFailedException if operation fails.
|
||||
"""
|
||||
neutron_client = openstack_utils.get_neutron_session_client(
|
||||
keystone_session)
|
||||
try:
|
||||
neutron_client.list_networks()
|
||||
except neutronclient.exceptions.Forbidden:
|
||||
raise PolicydOperationFailedException()
|
||||
|
||||
Reference in New Issue
Block a user