Add remaining policyd tests for 19.10 release
This adds the modified neutron-api, glance, cinder policyd tests. In particular, the test verifies that an operation can be overriden and removed and that the api calls works, doesn't work, and then works again.
This commit is contained in:
@@ -38,7 +38,8 @@ import tempfile
|
||||
import zipfile
|
||||
|
||||
import keystoneauth1
|
||||
import neutronclient.exceptions
|
||||
import glanceclient.common.exceptions
|
||||
import cinderclient.exceptions
|
||||
|
||||
import zaza.model as zaza_model
|
||||
|
||||
@@ -210,7 +211,7 @@ class PolicydOperationFailedException(Exception):
|
||||
class BasePolicydSpecialization(PolicydTest,
|
||||
ch_keystone.BaseKeystoneTest,
|
||||
test_utils.OpenStackBaseTest):
|
||||
"""Base test for specialising Policyd override tests
|
||||
"""Base test for specialising Policyd override tests.
|
||||
|
||||
This class is for specialization of the test to verify that a yaml file
|
||||
placed in the policy.d director is observed. This is done by first calling
|
||||
@@ -238,6 +239,7 @@ class BasePolicydSpecialization(PolicydTest,
|
||||
def get_client_and_attempt_operation(self, keystone_session):
|
||||
... etc.
|
||||
"""
|
||||
|
||||
# this needs to be defined as the rule that gets placed into a yaml policy
|
||||
# override. It is a string of the form: 'some-rule: "!"'
|
||||
# i.e. disable some policy and then try and test it.
|
||||
@@ -268,23 +270,20 @@ class BasePolicydSpecialization(PolicydTest,
|
||||
"""
|
||||
raise NotImplementedError("This method must be overridden")
|
||||
|
||||
def _get_keystone_session(self, ip):
|
||||
def _get_keystone_session(self, ip, openrc, scope='DOMAIN'):
|
||||
"""Return the keystone session for the IP address passed.
|
||||
|
||||
:param ip: the IP address to get the session against.
|
||||
:type ip: str
|
||||
:param openrc: the params to authenticate with.
|
||||
:type openrc: Dict[str, str]
|
||||
:param scope: the scope of the token
|
||||
:type scope: str
|
||||
:returns: a keystone session to the IP address
|
||||
:rtype: keystoneauth1.session.Session
|
||||
"""
|
||||
logging.info('keystone IP {}'.format(ip))
|
||||
openrc = {
|
||||
'API_VERSION': 3,
|
||||
'OS_USERNAME': ch_keystone.DEMO_ADMIN_USER,
|
||||
'OS_PASSWORD': ch_keystone.DEMO_ADMIN_USER_PASSWORD,
|
||||
'OS_AUTH_URL': 'http://{}:5000/v3'.format(ip),
|
||||
'OS_USER_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
'OS_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
}
|
||||
logging.info('Authentication for {} on keystone IP {}'
|
||||
.format(openrc['OS_USERNAME'], ip))
|
||||
if self.tls_rid:
|
||||
openrc['OS_CACERT'] = \
|
||||
openstack_utils.KEYSTONE_LOCAL_CACERT
|
||||
@@ -292,11 +291,64 @@ class BasePolicydSpecialization(PolicydTest,
|
||||
openrc['OS_AUTH_URL'].replace('http', 'https'))
|
||||
logging.info('keystone IP {}'.format(ip))
|
||||
keystone_session = openstack_utils.get_keystone_session(
|
||||
openrc, scope='DOMAIN')
|
||||
openrc, scope=scope)
|
||||
return keystone_session
|
||||
|
||||
def get_keystone_session_demo_user(self, ip, scope='PROJECT'):
|
||||
"""Return the keystone session for demo user.
|
||||
|
||||
:param ip: the IP address to get the session against.
|
||||
:type ip: str
|
||||
:param scope: the scope of the token
|
||||
:type scope: str
|
||||
:returns: a keystone session to the IP address
|
||||
:rtype: keystoneauth1.session.Session
|
||||
"""
|
||||
return self._get_keystone_session(ip, {
|
||||
'API_VERSION': 3,
|
||||
'OS_USERNAME': ch_keystone.DEMO_USER,
|
||||
'OS_PASSWORD': ch_keystone.DEMO_PASSWORD,
|
||||
'OS_AUTH_URL': 'http://{}:5000/v3'.format(ip),
|
||||
'OS_USER_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
'OS_PROJECT_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
'OS_PROJECT_NAME': ch_keystone.DEMO_PROJECT,
|
||||
'OS_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
}, scope)
|
||||
|
||||
def get_keystone_session_demo_admin_user(self, ip, scope='PROJECT'):
|
||||
"""Return the keystone session demo_admin user.
|
||||
|
||||
:param ip: the IP address to get the session against.
|
||||
:type ip: str
|
||||
:param scope: the scope of the token
|
||||
:type scope: str
|
||||
:returns: a keystone session to the IP address
|
||||
:rtype: keystoneauth1.session.Session
|
||||
"""
|
||||
return self._get_keystone_session(ip, {
|
||||
'API_VERSION': 3,
|
||||
'OS_USERNAME': ch_keystone.DEMO_ADMIN_USER,
|
||||
'OS_PASSWORD': ch_keystone.DEMO_ADMIN_USER_PASSWORD,
|
||||
'OS_AUTH_URL': 'http://{}:5000/v3'.format(ip),
|
||||
'OS_USER_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
'OS_PROJECT_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
'OS_PROJECT_NAME': ch_keystone.DEMO_PROJECT,
|
||||
'OS_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
|
||||
}, scope)
|
||||
|
||||
def get_keystone_session_admin_user(self, ip):
|
||||
"""Return the keystone session admin user.
|
||||
|
||||
:param ip: the IP address to get the session against.
|
||||
:type ip: str
|
||||
:returns: a keystone session to the IP address
|
||||
:rtype: keystoneauth1.session.Session
|
||||
"""
|
||||
return openstack_utils.get_keystone_session(
|
||||
openstack_utils.get_overcloud_auth(address=ip))
|
||||
|
||||
def test_003_test_overide_is_observed(self):
|
||||
"""Test that the override is observed by the underlying service"""
|
||||
"""Test that the override is observed by the underlying service."""
|
||||
if self._test_name is None:
|
||||
logging.info("Doing policyd override for {}"
|
||||
.format(self._service_name))
|
||||
@@ -304,19 +356,18 @@ class BasePolicydSpecialization(PolicydTest,
|
||||
logging.info(self._test_name)
|
||||
# note policyd override only works with Xenial-queens and so keystone
|
||||
# is already v3
|
||||
# get a keystone session from any keystone unit; we only need one
|
||||
# to check to see if it's working.
|
||||
user_keystone_session = self._get_keystone_session(
|
||||
self.keystone_ips[0])
|
||||
|
||||
# verify that the operation works before performing the policyd
|
||||
# override.
|
||||
zaza_model.block_until_wl_status_info_starts_with(
|
||||
self.application_name, "PO:", negate_match=True)
|
||||
zaza_model.block_until_all_units_idle()
|
||||
logging.info("First verify that operation works prior to override")
|
||||
try:
|
||||
self.get_client_and_attempt_operation(user_keystone_session)
|
||||
self.get_client_and_attempt_operation(self.keystone_ips[0])
|
||||
except Exception as e:
|
||||
raise zaza_exceptions.PolicydError(
|
||||
'Retrieve service action as demo user with domain scoped '
|
||||
'token failed and should have passed. "{}"'
|
||||
'Service action failed and should have passed. "{}"'
|
||||
.format(str(e)))
|
||||
|
||||
# now do the policyd override.
|
||||
@@ -325,15 +376,20 @@ class BasePolicydSpecialization(PolicydTest,
|
||||
zaza_model.block_until_all_units_idle()
|
||||
|
||||
# now make sure the operation fails
|
||||
logging.info("Now verify that operation doesn't work with override")
|
||||
try:
|
||||
self.get_client_and_attempt_operation(user_keystone_session)
|
||||
self.get_client_and_attempt_operation(self.keystone_ips[0])
|
||||
raise zaza_exceptions.PolicydError(
|
||||
'Retrieve service action as demo user with domain scoped '
|
||||
'token passed but should have failed due to the policy '
|
||||
'override.')
|
||||
"Service action passed and should have failed.")
|
||||
except PolicydOperationFailedException:
|
||||
pass
|
||||
except zaza_exceptions.PolicydError:
|
||||
logging.info("Policy override worked.")
|
||||
raise
|
||||
except Exception as e:
|
||||
logging.info("exception was: {}".format(e.__class__.__name__))
|
||||
import traceback
|
||||
logging.info(traceback.format_exc())
|
||||
raise zaza_exceptions.PolicydError(
|
||||
'Retrieve service action as demo user with domain scoped '
|
||||
'token failed: "{}"'
|
||||
@@ -353,52 +409,137 @@ class BasePolicydSpecialization(PolicydTest,
|
||||
zaza_model.block_until_wl_status_info_starts_with(
|
||||
self.application_name, "PO:", negate_match=True)
|
||||
|
||||
# Finally make sure it works again!
|
||||
logging.info("Finally verify that operation works after removing the "
|
||||
"override.")
|
||||
try:
|
||||
self.get_client_and_attempt_operation(self.keystone_ips[0])
|
||||
except Exception as e:
|
||||
raise zaza_exceptions.PolicydError(
|
||||
'Service action failed and should have passed after removing '
|
||||
'policy override: "{}"'
|
||||
.format(str(e)))
|
||||
|
||||
logging.info('OK')
|
||||
|
||||
|
||||
class KeystonePolicydTest(BasePolicydSpecialization):
|
||||
class KeystoneTests(BasePolicydSpecialization):
|
||||
"""Test the policyd override using the keystone client."""
|
||||
|
||||
_rule = "{'identity:list_services': '!'}"
|
||||
|
||||
def get_client_and_attempt_operation(self, keystone_session):
|
||||
@classmethod
|
||||
def setUpClass(cls, application_name=None):
|
||||
"""Run class setup for running NeutronApiTest charm operation tests."""
|
||||
super(KeystoneTests, cls).setUpClass(
|
||||
application_name="keystone")
|
||||
|
||||
def get_client_and_attempt_operation(self, ip):
|
||||
"""Attempt to list services. If it fails, raise an exception.
|
||||
|
||||
This operation should pass normally for the demo_user, and fail when
|
||||
the rule has been overriden (see the `rule` class variable.
|
||||
|
||||
:param keystone_session: the keystone session to use to obtain the
|
||||
client necessary for the test.
|
||||
:type keystone_session: keystoneauth1.session.Session
|
||||
:param ip: the IP address to get the session against.
|
||||
:type ip: str
|
||||
:raises: PolicydOperationFailedException if operation fails.
|
||||
"""
|
||||
keystone_client = openstack_utils.get_keystone_session_client(
|
||||
keystone_session)
|
||||
self.get_keystone_session_demo_admin_user(ip))
|
||||
try:
|
||||
keystone_client.services.list()
|
||||
except keystoneauth1.exceptions.http.Forbidden:
|
||||
raise PolicydOperationFailedException()
|
||||
|
||||
|
||||
class NeutronApiTest(BasePolicydSpecialization):
|
||||
"""Test the policyd override using the keystone client."""
|
||||
class NeutronApiTests(BasePolicydSpecialization):
|
||||
"""Test the policyd override using the neutron client."""
|
||||
|
||||
_rule = "{'get_network': '!'}"
|
||||
|
||||
def get_client_and_attempt_operation(self, keystone_session):
|
||||
"""Attempt to ????
|
||||
@classmethod
|
||||
def setUpClass(cls, application_name=None):
|
||||
"""Run class setup for running NeutronApiTest charm operation tests."""
|
||||
super(NeutronApiTests, cls).setUpClass(application_name="neutron-api")
|
||||
cls.application_name = "neutron-api"
|
||||
|
||||
def get_client_and_attempt_operation(self, ip):
|
||||
"""Attempt to list the networks as a policyd override.
|
||||
|
||||
This operation should pass normally for the demo_user, and fail when
|
||||
the rule has been overriden (see the `rule` class variable.
|
||||
|
||||
:param keystone_session: the keystone session to use to obtain the
|
||||
client necessary for the test.
|
||||
:type keystone_session: keystoneauth1.session.Session
|
||||
:param ip: the IP address to get the session against.
|
||||
:type ip: str
|
||||
:raises: PolicydOperationFailedException if operation fails.
|
||||
"""
|
||||
neutron_client = openstack_utils.get_neutron_session_client(
|
||||
keystone_session)
|
||||
self.get_keystone_session_demo_user(ip))
|
||||
try:
|
||||
neutron_client.list_networks()
|
||||
except neutronclient.exceptions.Forbidden:
|
||||
# If we are allowed to list networks, this will return something.
|
||||
# if the policyd override is present, then no error is generated,
|
||||
# but no networks are returned.
|
||||
networks = neutron_client.list_networks()
|
||||
logging.debug("networks: {}".format(networks))
|
||||
if len(networks['networks']) == 0:
|
||||
raise PolicydOperationFailedException()
|
||||
except Exception:
|
||||
raise PolicydOperationFailedException()
|
||||
|
||||
|
||||
class GlanceTests(BasePolicydSpecialization):
|
||||
"""Test the policyd override using the glance client."""
|
||||
|
||||
_rule = "{'get_images': '!'}"
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls, application_name=None):
|
||||
"""Run class setup for running GlanceTests charm operation tests."""
|
||||
super(GlanceTests, cls).setUpClass(application_name="glance")
|
||||
cls.application_name = "glance"
|
||||
|
||||
def get_client_and_attempt_operation(self, ip):
|
||||
"""Attempt to list the images as a policyd override.
|
||||
|
||||
This operation should pass normally for the demo_user, and fail when
|
||||
the rule has been overriden (see the `rule` class variable.
|
||||
|
||||
:param ip: the IP address to get the session against.
|
||||
:type ip: str
|
||||
:raises: PolicydOperationFailedException if operation fails.
|
||||
"""
|
||||
glance_client = openstack_utils.get_glance_session_client(
|
||||
self.get_keystone_session_demo_user(ip))
|
||||
try:
|
||||
glance_client.images.list()
|
||||
except glanceclient.common.exceptions.HTTPForbidden:
|
||||
raise PolicydOperationFailedException()
|
||||
|
||||
|
||||
class CinderTests(BasePolicydSpecialization):
|
||||
"""Test the policyd override using the cinder client."""
|
||||
|
||||
_rule = "{'volume:get_all': '!'}"
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls, application_name=None):
|
||||
"""Run class setup for running CinderTests charm operation tests."""
|
||||
super(CinderTests, cls).setUpClass(application_name="cinder")
|
||||
cls.application_name = "cinder"
|
||||
|
||||
def get_client_and_attempt_operation(self, ip):
|
||||
"""Attempt to list the images as a policyd override.
|
||||
|
||||
This operation should pass normally for the demo_user, and fail when
|
||||
the rule has been overriden (see the `rule` class variable.
|
||||
|
||||
:param ip: the IP address to get the session against.
|
||||
:type ip: str
|
||||
:raises: PolicydOperationFailedException if operation fails.
|
||||
"""
|
||||
cinder_client = openstack_utils.get_cinder_session_client(
|
||||
self.get_keystone_session_admin_user(ip))
|
||||
try:
|
||||
cinder_client.volumes.list()
|
||||
except cinderclient.exceptions.Forbidden:
|
||||
raise PolicydOperationFailedException()
|
||||
|
||||
Reference in New Issue
Block a user