Merge pull request #67 from ajkavanagh/policy-overrides-neutron
Finalise policyd tests for 19.10 release.
This commit is contained in:
@@ -25,22 +25,6 @@ The Policyd Tests test the following:
|
||||
verify that the charm has implemented policy overrides and ensured that the
|
||||
service actually picks them up).
|
||||
|
||||
In order to use the generic tests, just include them in the specific test
|
||||
class. The KeystonePolicydTest as an example does:
|
||||
|
||||
class KeystonePolicydTest(PolicydTest,
|
||||
ch_keystone.BaseKeystoneTest,
|
||||
test_utils.OpenStackBaseTest):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls, application_name=None):
|
||||
super(KeystonePolicydTest, cls).setUpClass(application_name)
|
||||
|
||||
Note that the generic test class (PolicyDTest) comes first, and then the
|
||||
ch_keystone.BaseKeystoneTest, followed by the test_utils.OpenStackBaseTest.
|
||||
This is to get the order of super().setUpClass(...) calls to work with
|
||||
application_name.
|
||||
|
||||
If a charm doesn't require a specific test, then the GenericPolicydTest class
|
||||
can be used that just includes the two generic tests. The config in the
|
||||
tests.yaml would stil be required. See the PolicydTest class docstring for
|
||||
@@ -54,6 +38,8 @@ import tempfile
|
||||
import zipfile
|
||||
|
||||
import keystoneauth1
|
||||
import glanceclient.common.exceptions
|
||||
import cinderclient.exceptions
|
||||
|
||||
import zaza.model as zaza_model
|
||||
|
||||
@@ -125,7 +111,7 @@ class PolicydTest(object):
|
||||
rules_zip_path)
|
||||
self._set_config(True)
|
||||
zaza_model.block_until_wl_status_info_starts_with(
|
||||
self.application_name, "PO:", negate_match=True)
|
||||
self.application_name, "PO:", negate_match=False)
|
||||
|
||||
def test_001_policyd_good_yaml(self):
|
||||
"""Test that the policyd with a good zipped yaml file."""
|
||||
@@ -160,7 +146,7 @@ class PolicydTest(object):
|
||||
# we have to do it twice due to async races and that some info lines
|
||||
# erase the PO: bit prior to actuall getting back to idle. The double
|
||||
# check verifies that the charms have started, the idle waits until it
|
||||
# is finiehed, and then the final check really makes sure they got
|
||||
# is finished, and then the final check really makes sure they got
|
||||
# switched off.
|
||||
zaza_model.block_until_wl_status_info_starts_with(
|
||||
self.application_name, "PO:", negate_match=True)
|
||||
@@ -200,110 +186,360 @@ class PolicydTest(object):
|
||||
"/etc", self._service_name, "policy.d", 'file2.yaml')
|
||||
logging.info("Now checking that file {} is not present.".format(path))
|
||||
zaza_model.block_until_file_missing(self.application_name, path)
|
||||
self._set_config(True)
|
||||
self._set_config(False)
|
||||
zaza_model.block_until_all_units_idle()
|
||||
logging.info("OK")
|
||||
|
||||
|
||||
class KeystonePolicydTest(PolicydTest,
|
||||
ch_keystone.BaseKeystoneTest,
|
||||
test_utils.OpenStackBaseTest):
|
||||
"""Specific test for policyd for keystone charm."""
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls, application_name=None):
|
||||
"""Run class setup for running KeystonePolicydTest tests."""
|
||||
super(KeystonePolicydTest, cls).setUpClass(application_name)
|
||||
|
||||
def test_disable_service(self):
|
||||
"""Test that service can be disabled."""
|
||||
logging.info("Doing policyd override to disable listing domains")
|
||||
self._set_policy_with(
|
||||
{'rule.yaml': "{'identity:list_services': '!'}"})
|
||||
|
||||
# verify that the policy.d override does disable the endpoint
|
||||
with self.config_change(
|
||||
{'preferred-api-version': self.default_api_version,
|
||||
'use-policyd-override': 'False'},
|
||||
{'preferred-api-version': '3',
|
||||
'use-policyd-override': 'True'},
|
||||
application_name="keystone"):
|
||||
zaza_model.block_until_all_units_idle()
|
||||
for ip in self.keystone_ips:
|
||||
try:
|
||||
logging.info('keystone IP {}'.format(ip))
|
||||
openrc = {
|
||||
'API_VERSION': 3,
|
||||
'OS_USERNAME': ch_keystone.DEMO_ADMIN_USER,
|
||||
'OS_PASSWORD': ch_keystone.DEMO_ADMIN_USER_PASSWORD,
|
||||
'OS_AUTH_URL': 'http://{}:5000/v3'.format(ip),
|
||||
'OS_USER_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
'OS_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
}
|
||||
if self.tls_rid:
|
||||
openrc['OS_CACERT'] = \
|
||||
openstack_utils.KEYSTONE_LOCAL_CACERT
|
||||
openrc['OS_AUTH_URL'] = (
|
||||
openrc['OS_AUTH_URL'].replace('http', 'https'))
|
||||
logging.info('keystone IP {}'.format(ip))
|
||||
keystone_session = openstack_utils.get_keystone_session(
|
||||
openrc, scope='DOMAIN')
|
||||
keystone_client = (
|
||||
openstack_utils.get_keystone_session_client(
|
||||
keystone_session))
|
||||
keystone_client.services.list()
|
||||
raise zaza_exceptions.PolicydError(
|
||||
'Retrieve service list as admin with project scoped '
|
||||
'token passed and should have failed. IP = {}'
|
||||
.format(ip))
|
||||
except keystoneauth1.exceptions.http.Forbidden:
|
||||
logging.info("keystone IP:{} policyd override disabled "
|
||||
"services listing by demo user"
|
||||
.format(ip))
|
||||
|
||||
# now verify (with the config off) that we can actually access
|
||||
# these points
|
||||
with self.config_change(
|
||||
{'preferred-api-version': self.default_api_version},
|
||||
{'preferred-api-version': '3'},
|
||||
application_name="keystone"):
|
||||
zaza_model.block_until_all_units_idle()
|
||||
for ip in self.keystone_ips:
|
||||
try:
|
||||
logging.info('keystone IP {}'.format(ip))
|
||||
openrc = {
|
||||
'API_VERSION': 3,
|
||||
'OS_USERNAME': ch_keystone.DEMO_ADMIN_USER,
|
||||
'OS_PASSWORD': ch_keystone.DEMO_ADMIN_USER_PASSWORD,
|
||||
'OS_AUTH_URL': 'http://{}:5000/v3'.format(ip),
|
||||
'OS_USER_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
'OS_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
}
|
||||
if self.tls_rid:
|
||||
openrc['OS_CACERT'] = \
|
||||
openstack_utils.KEYSTONE_LOCAL_CACERT
|
||||
openrc['OS_AUTH_URL'] = (
|
||||
openrc['OS_AUTH_URL'].replace('http', 'https'))
|
||||
logging.info('keystone IP {}'.format(ip))
|
||||
keystone_session = openstack_utils.get_keystone_session(
|
||||
openrc, scope='DOMAIN')
|
||||
keystone_client = (
|
||||
openstack_utils.get_keystone_session_client(
|
||||
keystone_session))
|
||||
keystone_client.services.list()
|
||||
logging.info("keystone IP:{} without policyd override "
|
||||
"services list working"
|
||||
.format(ip))
|
||||
except keystoneauth1.exceptions.http.Forbidden:
|
||||
raise zaza_exceptions.PolicydError(
|
||||
'Retrieve services list as demo user with project '
|
||||
'scoped token passed and should have passed. IP = {}'
|
||||
.format(ip))
|
||||
|
||||
logging.info('OK')
|
||||
|
||||
|
||||
class GenericPolicydTest(PolicydTest, test_utils.OpenStackBaseTest):
|
||||
"""Generic policyd test for any charm without a specific test."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class PolicydOperationFailedException(Exception):
|
||||
"""This is raised by the get_client_and_attempt_operation() method.
|
||||
|
||||
This is used to signal that the operation in the
|
||||
get_client_and_attempt_operation() method in the BaseSpecialization class
|
||||
has failed.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class BasePolicydSpecialization(PolicydTest,
|
||||
ch_keystone.BaseKeystoneTest,
|
||||
test_utils.OpenStackBaseTest):
|
||||
"""Base test for specialising Policyd override tests.
|
||||
|
||||
This class is for specialization of the test to verify that a yaml file
|
||||
placed in the policy.d director is observed. This is done by first calling
|
||||
the get_client_and_attempt_operation() method and ensuring that it works.
|
||||
This method should attempt an operation on the service that can be blocked
|
||||
by the policy override in the `_rule` class variable. The method should
|
||||
pass cleanly without the override in place.
|
||||
|
||||
The test_003_test_overide_is_observed will then apply the override and then
|
||||
call get_client_and_attempt_operation() again, and this time it should
|
||||
detect the failure and raise the PolicydOperationFailedException()
|
||||
exception. This will be detected as the override working and thus the test
|
||||
will pass.
|
||||
|
||||
The test will fail if the first call fails for any reason, or if the 2nd
|
||||
call doesn't raise PolicydOperationFailedException or raises any other
|
||||
exception.
|
||||
|
||||
To use this class, follow the keystone example:
|
||||
|
||||
class KeystonePolicydTest(BasePolicydSpecialization):
|
||||
|
||||
_rule = "{'identity:list_services': '!'}"
|
||||
|
||||
def get_client_and_attempt_operation(self, keystone_session):
|
||||
... etc.
|
||||
"""
|
||||
|
||||
# this needs to be defined as the rule that gets placed into a yaml policy
|
||||
# override. It is a string of the form: 'some-rule: "!"'
|
||||
# i.e. disable some policy and then try and test it.
|
||||
_rule = None
|
||||
|
||||
# Optional: the name to log at the beginning of the test
|
||||
_test_name = None
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls, application_name=None):
|
||||
"""Run class setup for running KeystonePolicydTest tests."""
|
||||
super(BasePolicydSpecialization, cls).setUpClass(application_name)
|
||||
if cls._rule is None:
|
||||
cls.SkipTest("Test not valid if {}.rule is not configured"
|
||||
.format(cls.__name__))
|
||||
return
|
||||
|
||||
def get_client_and_attempt_operation(self, keystone_session):
|
||||
"""Override this method to perform the operation.
|
||||
|
||||
This operation should pass normally for the demo_user, and fail when
|
||||
the rule has been overriden (see the `rule` class variable.
|
||||
|
||||
:param keystone_session: the keystone session to use to obtain the
|
||||
client necessary for the test.
|
||||
:type keystone_session: keystoneauth1.session.Session
|
||||
:raises: PolicydOperationFailedException if operation fails.
|
||||
"""
|
||||
raise NotImplementedError("This method must be overridden")
|
||||
|
||||
def _get_keystone_session(self, ip, openrc, scope='DOMAIN'):
|
||||
"""Return the keystone session for the IP address passed.
|
||||
|
||||
:param ip: the IP address to get the session against.
|
||||
:type ip: str
|
||||
:param openrc: the params to authenticate with.
|
||||
:type openrc: Dict[str, str]
|
||||
:param scope: the scope of the token
|
||||
:type scope: str
|
||||
:returns: a keystone session to the IP address
|
||||
:rtype: keystoneauth1.session.Session
|
||||
"""
|
||||
logging.info('Authentication for {} on keystone IP {}'
|
||||
.format(openrc['OS_USERNAME'], ip))
|
||||
if self.tls_rid:
|
||||
openrc['OS_CACERT'] = \
|
||||
openstack_utils.KEYSTONE_LOCAL_CACERT
|
||||
openrc['OS_AUTH_URL'] = (
|
||||
openrc['OS_AUTH_URL'].replace('http', 'https'))
|
||||
logging.info('keystone IP {}'.format(ip))
|
||||
keystone_session = openstack_utils.get_keystone_session(
|
||||
openrc, scope=scope)
|
||||
return keystone_session
|
||||
|
||||
def get_keystone_session_demo_user(self, ip, scope='PROJECT'):
|
||||
"""Return the keystone session for demo user.
|
||||
|
||||
:param ip: the IP address to get the session against.
|
||||
:type ip: str
|
||||
:param scope: the scope of the token
|
||||
:type scope: str
|
||||
:returns: a keystone session to the IP address
|
||||
:rtype: keystoneauth1.session.Session
|
||||
"""
|
||||
return self._get_keystone_session(ip, {
|
||||
'API_VERSION': 3,
|
||||
'OS_USERNAME': ch_keystone.DEMO_USER,
|
||||
'OS_PASSWORD': ch_keystone.DEMO_PASSWORD,
|
||||
'OS_AUTH_URL': 'http://{}:5000/v3'.format(ip),
|
||||
'OS_USER_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
'OS_PROJECT_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
'OS_PROJECT_NAME': ch_keystone.DEMO_PROJECT,
|
||||
'OS_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
}, scope)
|
||||
|
||||
def get_keystone_session_demo_admin_user(self, ip, scope='PROJECT'):
|
||||
"""Return the keystone session demo_admin user.
|
||||
|
||||
:param ip: the IP address to get the session against.
|
||||
:type ip: str
|
||||
:param scope: the scope of the token
|
||||
:type scope: str
|
||||
:returns: a keystone session to the IP address
|
||||
:rtype: keystoneauth1.session.Session
|
||||
"""
|
||||
return self._get_keystone_session(ip, {
|
||||
'API_VERSION': 3,
|
||||
'OS_USERNAME': ch_keystone.DEMO_ADMIN_USER,
|
||||
'OS_PASSWORD': ch_keystone.DEMO_ADMIN_USER_PASSWORD,
|
||||
'OS_AUTH_URL': 'http://{}:5000/v3'.format(ip),
|
||||
'OS_USER_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
'OS_PROJECT_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
'OS_PROJECT_NAME': ch_keystone.DEMO_PROJECT,
|
||||
'OS_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
}, scope)
|
||||
|
||||
def get_keystone_session_admin_user(self, ip):
|
||||
"""Return the keystone session admin user.
|
||||
|
||||
:param ip: the IP address to get the session against.
|
||||
:type ip: str
|
||||
:returns: a keystone session to the IP address
|
||||
:rtype: keystoneauth1.session.Session
|
||||
"""
|
||||
return openstack_utils.get_keystone_session(
|
||||
openstack_utils.get_overcloud_auth(address=ip))
|
||||
|
||||
def test_003_test_overide_is_observed(self):
|
||||
"""Test that the override is observed by the underlying service."""
|
||||
if self._test_name is None:
|
||||
logging.info("Doing policyd override for {}"
|
||||
.format(self._service_name))
|
||||
else:
|
||||
logging.info(self._test_name)
|
||||
# note policyd override only works with Xenial-queens and so keystone
|
||||
# is already v3
|
||||
|
||||
# verify that the operation works before performing the policyd
|
||||
# override.
|
||||
zaza_model.block_until_wl_status_info_starts_with(
|
||||
self.application_name, "PO:", negate_match=True)
|
||||
zaza_model.block_until_all_units_idle()
|
||||
logging.info("First verify that operation works prior to override")
|
||||
try:
|
||||
self.get_client_and_attempt_operation(self.keystone_ips[0])
|
||||
except Exception as e:
|
||||
raise zaza_exceptions.PolicydError(
|
||||
'Service action failed and should have passed. "{}"'
|
||||
.format(str(e)))
|
||||
|
||||
# now do the policyd override.
|
||||
logging.info("Doing policyd override with: {}".format(self._rule))
|
||||
self._set_policy_with({'rule.yaml': self._rule})
|
||||
zaza_model.block_until_all_units_idle()
|
||||
|
||||
# now make sure the operation fails
|
||||
logging.info("Now verify that operation doesn't work with override")
|
||||
try:
|
||||
self.get_client_and_attempt_operation(self.keystone_ips[0])
|
||||
raise zaza_exceptions.PolicydError(
|
||||
"Service action passed and should have failed.")
|
||||
except PolicydOperationFailedException:
|
||||
pass
|
||||
except zaza_exceptions.PolicydError:
|
||||
logging.info("Policy override worked.")
|
||||
raise
|
||||
except Exception as e:
|
||||
logging.info("exception was: {}".format(e.__class__.__name__))
|
||||
import traceback
|
||||
logging.info(traceback.format_exc())
|
||||
raise zaza_exceptions.PolicydError(
|
||||
'Retrieve service action as demo user with domain scoped '
|
||||
'token failed: "{}"'
|
||||
.format(str(e)))
|
||||
|
||||
# clean out the policy and wait
|
||||
self._set_config(False)
|
||||
# check that the status no longer has "PO:" on it.
|
||||
# we have to do it twice due to async races and that some info lines
|
||||
# erase the PO: bit prior to actuall getting back to idle. The double
|
||||
# check verifies that the charms have started, the idle waits until it
|
||||
# is finished, and then the final check really makes sure they got
|
||||
# switched off.
|
||||
zaza_model.block_until_wl_status_info_starts_with(
|
||||
self.application_name, "PO:", negate_match=True)
|
||||
zaza_model.block_until_all_units_idle()
|
||||
zaza_model.block_until_wl_status_info_starts_with(
|
||||
self.application_name, "PO:", negate_match=True)
|
||||
|
||||
# Finally make sure it works again!
|
||||
logging.info("Finally verify that operation works after removing the "
|
||||
"override.")
|
||||
try:
|
||||
self.get_client_and_attempt_operation(self.keystone_ips[0])
|
||||
except Exception as e:
|
||||
raise zaza_exceptions.PolicydError(
|
||||
'Service action failed and should have passed after removing '
|
||||
'policy override: "{}"'
|
||||
.format(str(e)))
|
||||
|
||||
logging.info('OK')
|
||||
|
||||
|
||||
class KeystoneTests(BasePolicydSpecialization):
|
||||
"""Test the policyd override using the keystone client."""
|
||||
|
||||
_rule = "{'identity:list_services': '!'}"
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls, application_name=None):
|
||||
"""Run class setup for running NeutronApiTest charm operation tests."""
|
||||
super(KeystoneTests, cls).setUpClass(
|
||||
application_name="keystone")
|
||||
|
||||
def get_client_and_attempt_operation(self, ip):
|
||||
"""Attempt to list services. If it fails, raise an exception.
|
||||
|
||||
This operation should pass normally for the demo_user, and fail when
|
||||
the rule has been overriden (see the `rule` class variable.
|
||||
|
||||
:param ip: the IP address to get the session against.
|
||||
:type ip: str
|
||||
:raises: PolicydOperationFailedException if operation fails.
|
||||
"""
|
||||
keystone_client = openstack_utils.get_keystone_session_client(
|
||||
self.get_keystone_session_demo_admin_user(ip))
|
||||
try:
|
||||
keystone_client.services.list()
|
||||
except keystoneauth1.exceptions.http.Forbidden:
|
||||
raise PolicydOperationFailedException()
|
||||
|
||||
|
||||
class NeutronApiTests(BasePolicydSpecialization):
|
||||
"""Test the policyd override using the neutron client."""
|
||||
|
||||
_rule = "{'get_network': '!'}"
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls, application_name=None):
|
||||
"""Run class setup for running NeutronApiTest charm operation tests."""
|
||||
super(NeutronApiTests, cls).setUpClass(application_name="neutron-api")
|
||||
cls.application_name = "neutron-api"
|
||||
|
||||
def get_client_and_attempt_operation(self, ip):
|
||||
"""Attempt to list the networks as a policyd override.
|
||||
|
||||
This operation should pass normally for the demo_user, and fail when
|
||||
the rule has been overriden (see the `rule` class variable.
|
||||
|
||||
:param ip: the IP address to get the session against.
|
||||
:type ip: str
|
||||
:raises: PolicydOperationFailedException if operation fails.
|
||||
"""
|
||||
neutron_client = openstack_utils.get_neutron_session_client(
|
||||
self.get_keystone_session_demo_user(ip))
|
||||
try:
|
||||
# If we are allowed to list networks, this will return something.
|
||||
# if the policyd override is present, then no error is generated,
|
||||
# but no networks are returned.
|
||||
networks = neutron_client.list_networks()
|
||||
logging.debug("networks: {}".format(networks))
|
||||
if len(networks['networks']) == 0:
|
||||
raise PolicydOperationFailedException()
|
||||
except Exception:
|
||||
raise PolicydOperationFailedException()
|
||||
|
||||
|
||||
class GlanceTests(BasePolicydSpecialization):
|
||||
"""Test the policyd override using the glance client."""
|
||||
|
||||
_rule = "{'get_images': '!'}"
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls, application_name=None):
|
||||
"""Run class setup for running GlanceTests charm operation tests."""
|
||||
super(GlanceTests, cls).setUpClass(application_name="glance")
|
||||
cls.application_name = "glance"
|
||||
|
||||
def get_client_and_attempt_operation(self, ip):
|
||||
"""Attempt to list the images as a policyd override.
|
||||
|
||||
This operation should pass normally for the demo_user, and fail when
|
||||
the rule has been overriden (see the `rule` class variable.
|
||||
|
||||
:param ip: the IP address to get the session against.
|
||||
:type ip: str
|
||||
:raises: PolicydOperationFailedException if operation fails.
|
||||
"""
|
||||
glance_client = openstack_utils.get_glance_session_client(
|
||||
self.get_keystone_session_demo_user(ip))
|
||||
try:
|
||||
glance_client.images.list()
|
||||
except glanceclient.common.exceptions.HTTPForbidden:
|
||||
raise PolicydOperationFailedException()
|
||||
|
||||
|
||||
class CinderTests(BasePolicydSpecialization):
|
||||
"""Test the policyd override using the cinder client."""
|
||||
|
||||
_rule = "{'volume:get_all': '!'}"
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls, application_name=None):
|
||||
"""Run class setup for running CinderTests charm operation tests."""
|
||||
super(CinderTests, cls).setUpClass(application_name="cinder")
|
||||
cls.application_name = "cinder"
|
||||
|
||||
def get_client_and_attempt_operation(self, ip):
|
||||
"""Attempt to list the images as a policyd override.
|
||||
|
||||
This operation should pass normally for the demo_user, and fail when
|
||||
the rule has been overriden (see the `rule` class variable.
|
||||
|
||||
:param ip: the IP address to get the session against.
|
||||
:type ip: str
|
||||
:raises: PolicydOperationFailedException if operation fails.
|
||||
"""
|
||||
cinder_client = openstack_utils.get_cinder_session_client(
|
||||
self.get_keystone_session_admin_user(ip))
|
||||
try:
|
||||
cinder_client.volumes.list()
|
||||
except cinderclient.exceptions.Forbidden:
|
||||
raise PolicydOperationFailedException()
|
||||
|
||||
Reference in New Issue
Block a user