diff --git a/zaza/openstack/charm_tests/policyd/tests.py b/zaza/openstack/charm_tests/policyd/tests.py index 7030a9a..396d3df 100644 --- a/zaza/openstack/charm_tests/policyd/tests.py +++ b/zaza/openstack/charm_tests/policyd/tests.py @@ -38,7 +38,8 @@ import tempfile import zipfile import keystoneauth1 -import neutronclient.exceptions +import glanceclient.common.exceptions +import cinderclient.exceptions import zaza.model as zaza_model @@ -210,7 +211,7 @@ class PolicydOperationFailedException(Exception): class BasePolicydSpecialization(PolicydTest, ch_keystone.BaseKeystoneTest, test_utils.OpenStackBaseTest): - """Base test for specialising Policyd override tests + """Base test for specialising Policyd override tests. This class is for specialization of the test to verify that a yaml file placed in the policy.d director is observed. This is done by first calling @@ -238,6 +239,7 @@ class BasePolicydSpecialization(PolicydTest, def get_client_and_attempt_operation(self, keystone_session): ... etc. """ + # this needs to be defined as the rule that gets placed into a yaml policy # override. It is a string of the form: 'some-rule: "!"' # i.e. disable some policy and then try and test it. @@ -268,23 +270,20 @@ class BasePolicydSpecialization(PolicydTest, """ raise NotImplementedError("This method must be overridden") - def _get_keystone_session(self, ip): + def _get_keystone_session(self, ip, openrc, scope='DOMAIN'): """Return the keystone session for the IP address passed. :param ip: the IP address to get the session against. :type ip: str + :param openrc: the params to authenticate with. + :type openrc: Dict[str, str] + :param scope: the scope of the token + :type scope: str :returns: a keystone session to the IP address :rtype: keystoneauth1.session.Session """ - logging.info('keystone IP {}'.format(ip)) - openrc = { - 'API_VERSION': 3, - 'OS_USERNAME': ch_keystone.DEMO_ADMIN_USER, - 'OS_PASSWORD': ch_keystone.DEMO_ADMIN_USER_PASSWORD, - 'OS_AUTH_URL': 'http://{}:5000/v3'.format(ip), - 'OS_USER_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN, - 'OS_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN, - } + logging.info('Authentication for {} on keystone IP {}' + .format(openrc['OS_USERNAME'], ip)) if self.tls_rid: openrc['OS_CACERT'] = \ openstack_utils.KEYSTONE_LOCAL_CACERT @@ -292,11 +291,64 @@ class BasePolicydSpecialization(PolicydTest, openrc['OS_AUTH_URL'].replace('http', 'https')) logging.info('keystone IP {}'.format(ip)) keystone_session = openstack_utils.get_keystone_session( - openrc, scope='DOMAIN') + openrc, scope=scope) return keystone_session + def get_keystone_session_demo_user(self, ip, scope='PROJECT'): + """Return the keystone session for demo user. + + :param ip: the IP address to get the session against. + :type ip: str + :param scope: the scope of the token + :type scope: str + :returns: a keystone session to the IP address + :rtype: keystoneauth1.session.Session + """ + return self._get_keystone_session(ip, { + 'API_VERSION': 3, + 'OS_USERNAME': ch_keystone.DEMO_USER, + 'OS_PASSWORD': ch_keystone.DEMO_PASSWORD, + 'OS_AUTH_URL': 'http://{}:5000/v3'.format(ip), + 'OS_USER_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN, + 'OS_PROJECT_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN, + 'OS_PROJECT_NAME': ch_keystone.DEMO_PROJECT, + 'OS_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN, + }, scope) + + def get_keystone_session_demo_admin_user(self, ip, scope='PROJECT'): + """Return the keystone session demo_admin user. + + :param ip: the IP address to get the session against. + :type ip: str + :param scope: the scope of the token + :type scope: str + :returns: a keystone session to the IP address + :rtype: keystoneauth1.session.Session + """ + return self._get_keystone_session(ip, { + 'API_VERSION': 3, + 'OS_USERNAME': ch_keystone.DEMO_ADMIN_USER, + 'OS_PASSWORD': ch_keystone.DEMO_ADMIN_USER_PASSWORD, + 'OS_AUTH_URL': 'http://{}:5000/v3'.format(ip), + 'OS_USER_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN, + 'OS_PROJECT_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN, + 'OS_PROJECT_NAME': ch_keystone.DEMO_PROJECT, + 'OS_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN, + }, scope) + + def get_keystone_session_admin_user(self, ip): + """Return the keystone session admin user. + + :param ip: the IP address to get the session against. + :type ip: str + :returns: a keystone session to the IP address + :rtype: keystoneauth1.session.Session + """ + return openstack_utils.get_keystone_session( + openstack_utils.get_overcloud_auth(address=ip)) + def test_003_test_overide_is_observed(self): - """Test that the override is observed by the underlying service""" + """Test that the override is observed by the underlying service.""" if self._test_name is None: logging.info("Doing policyd override for {}" .format(self._service_name)) @@ -304,19 +356,18 @@ class BasePolicydSpecialization(PolicydTest, logging.info(self._test_name) # note policyd override only works with Xenial-queens and so keystone # is already v3 - # get a keystone session from any keystone unit; we only need one - # to check to see if it's working. - user_keystone_session = self._get_keystone_session( - self.keystone_ips[0]) # verify that the operation works before performing the policyd # override. + zaza_model.block_until_wl_status_info_starts_with( + self.application_name, "PO:", negate_match=True) + zaza_model.block_until_all_units_idle() + logging.info("First verify that operation works prior to override") try: - self.get_client_and_attempt_operation(user_keystone_session) + self.get_client_and_attempt_operation(self.keystone_ips[0]) except Exception as e: raise zaza_exceptions.PolicydError( - 'Retrieve service action as demo user with domain scoped ' - 'token failed and should have passed. "{}"' + 'Service action failed and should have passed. "{}"' .format(str(e))) # now do the policyd override. @@ -325,15 +376,20 @@ class BasePolicydSpecialization(PolicydTest, zaza_model.block_until_all_units_idle() # now make sure the operation fails + logging.info("Now verify that operation doesn't work with override") try: - self.get_client_and_attempt_operation(user_keystone_session) + self.get_client_and_attempt_operation(self.keystone_ips[0]) raise zaza_exceptions.PolicydError( - 'Retrieve service action as demo user with domain scoped ' - 'token passed but should have failed due to the policy ' - 'override.') + "Service action passed and should have failed.") except PolicydOperationFailedException: pass + except zaza_exceptions.PolicydError: + logging.info("Policy override worked.") + raise except Exception as e: + logging.info("exception was: {}".format(e.__class__.__name__)) + import traceback + logging.info(traceback.format_exc()) raise zaza_exceptions.PolicydError( 'Retrieve service action as demo user with domain scoped ' 'token failed: "{}"' @@ -353,52 +409,137 @@ class BasePolicydSpecialization(PolicydTest, zaza_model.block_until_wl_status_info_starts_with( self.application_name, "PO:", negate_match=True) + # Finally make sure it works again! + logging.info("Finally verify that operation works after removing the " + "override.") + try: + self.get_client_and_attempt_operation(self.keystone_ips[0]) + except Exception as e: + raise zaza_exceptions.PolicydError( + 'Service action failed and should have passed after removing ' + 'policy override: "{}"' + .format(str(e))) + logging.info('OK') -class KeystonePolicydTest(BasePolicydSpecialization): +class KeystoneTests(BasePolicydSpecialization): """Test the policyd override using the keystone client.""" _rule = "{'identity:list_services': '!'}" - def get_client_and_attempt_operation(self, keystone_session): + @classmethod + def setUpClass(cls, application_name=None): + """Run class setup for running NeutronApiTest charm operation tests.""" + super(KeystoneTests, cls).setUpClass( + application_name="keystone") + + def get_client_and_attempt_operation(self, ip): """Attempt to list services. If it fails, raise an exception. This operation should pass normally for the demo_user, and fail when the rule has been overriden (see the `rule` class variable. - :param keystone_session: the keystone session to use to obtain the - client necessary for the test. - :type keystone_session: keystoneauth1.session.Session + :param ip: the IP address to get the session against. + :type ip: str :raises: PolicydOperationFailedException if operation fails. """ keystone_client = openstack_utils.get_keystone_session_client( - keystone_session) + self.get_keystone_session_demo_admin_user(ip)) try: keystone_client.services.list() except keystoneauth1.exceptions.http.Forbidden: raise PolicydOperationFailedException() -class NeutronApiTest(BasePolicydSpecialization): - """Test the policyd override using the keystone client.""" +class NeutronApiTests(BasePolicydSpecialization): + """Test the policyd override using the neutron client.""" _rule = "{'get_network': '!'}" - def get_client_and_attempt_operation(self, keystone_session): - """Attempt to ???? + @classmethod + def setUpClass(cls, application_name=None): + """Run class setup for running NeutronApiTest charm operation tests.""" + super(NeutronApiTests, cls).setUpClass(application_name="neutron-api") + cls.application_name = "neutron-api" + + def get_client_and_attempt_operation(self, ip): + """Attempt to list the networks as a policyd override. This operation should pass normally for the demo_user, and fail when the rule has been overriden (see the `rule` class variable. - :param keystone_session: the keystone session to use to obtain the - client necessary for the test. - :type keystone_session: keystoneauth1.session.Session + :param ip: the IP address to get the session against. + :type ip: str :raises: PolicydOperationFailedException if operation fails. """ neutron_client = openstack_utils.get_neutron_session_client( - keystone_session) + self.get_keystone_session_demo_user(ip)) try: - neutron_client.list_networks() - except neutronclient.exceptions.Forbidden: + # If we are allowed to list networks, this will return something. + # if the policyd override is present, then no error is generated, + # but no networks are returned. + networks = neutron_client.list_networks() + logging.debug("networks: {}".format(networks)) + if len(networks['networks']) == 0: + raise PolicydOperationFailedException() + except Exception: + raise PolicydOperationFailedException() + + +class GlanceTests(BasePolicydSpecialization): + """Test the policyd override using the glance client.""" + + _rule = "{'get_images': '!'}" + + @classmethod + def setUpClass(cls, application_name=None): + """Run class setup for running GlanceTests charm operation tests.""" + super(GlanceTests, cls).setUpClass(application_name="glance") + cls.application_name = "glance" + + def get_client_and_attempt_operation(self, ip): + """Attempt to list the images as a policyd override. + + This operation should pass normally for the demo_user, and fail when + the rule has been overriden (see the `rule` class variable. + + :param ip: the IP address to get the session against. + :type ip: str + :raises: PolicydOperationFailedException if operation fails. + """ + glance_client = openstack_utils.get_glance_session_client( + self.get_keystone_session_demo_user(ip)) + try: + glance_client.images.list() + except glanceclient.common.exceptions.HTTPForbidden: + raise PolicydOperationFailedException() + + +class CinderTests(BasePolicydSpecialization): + """Test the policyd override using the cinder client.""" + + _rule = "{'volume:get_all': '!'}" + + @classmethod + def setUpClass(cls, application_name=None): + """Run class setup for running CinderTests charm operation tests.""" + super(CinderTests, cls).setUpClass(application_name="cinder") + cls.application_name = "cinder" + + def get_client_and_attempt_operation(self, ip): + """Attempt to list the images as a policyd override. + + This operation should pass normally for the demo_user, and fail when + the rule has been overriden (see the `rule` class variable. + + :param ip: the IP address to get the session against. + :type ip: str + :raises: PolicydOperationFailedException if operation fails. + """ + cinder_client = openstack_utils.get_cinder_session_client( + self.get_keystone_session_admin_user(ip)) + try: + cinder_client.volumes.list() + except cinderclient.exceptions.Forbidden: raise PolicydOperationFailedException()