Modified policyd tests to support openstack-dashboard

* Addition of the (incomplete) openstack-dashboard test.
* Modification of other charm policyd override tests to support
  multi-file policy overrides resource ZIP.
This commit is contained in:
Alex Kavanagh
2019-11-15 15:41:16 +00:00
parent 8f09ed84d5
commit c0db2def3b
3 changed files with 203 additions and 130 deletions

View File

@@ -25,7 +25,7 @@ deps = -r{toxinidir}/requirements.txt
commands = /bin/true
[flake8]
ignore = E402,E226,W504
ignore = E402,E226,W503
per-file-ignores =
unit_tests/**: D

View File

@@ -18,12 +18,141 @@ import logging
import requests
import tenacity
import urllib.request
import yaml
import zaza.model as zaza_model
import zaza.openstack.utilities.openstack as openstack_utils
import zaza.openstack.charm_tests.test_utils as test_utils
import zaza.openstack.utilities.juju as openstack_juju
import zaza.openstack.charm_tests.policyd.tests as policyd
class AuthExceptions(Exception):
"""Exception base class for the 401 test."""
pass
class FailedAuth(AuthExceptions):
"""Failed exception for the 401 test."""
pass
def _get_dashboard_ip():
"""Get the IP of the dashboard.
:returns: The IP of the dashboard
:rtype: str
"""
unit_name = zaza_model.get_lead_unit_name('openstack-dashboard')
keystone_unit = zaza_model.get_lead_unit_name('keystone')
dashboard_relation = openstack_juju.get_relation_from_unit(
keystone_unit, unit_name, 'identity-service')
dashboard_ip = dashboard_relation['private-address']
logging.debug("dashboard_ip is: {}".format(dashboard_ip))
return dashboard_ip
# NOTE: intermittent authentication fails. Wrap in a retry loop.
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=1,
min=5, max=10),
reraise=True)
def _login(dashboard_ip, domain, username, password):
"""Login to the website to get a session.
:param dashboard_ip: The IP address of the dashboard to log in to.
:type dashboard_ip: str
:param domain: the domain to login into
:type domain: str
:param username: the username to login as
:type username: str
:param password: the password to use to login
:type password: str
:returns: tuple of (client, response) where response is the page after
logging in.
:rtype: (requests.sessions.Session, requests.models.Response)
:raises: FailedAuth if the authorisation doesn't work
"""
auth_url = 'http://{}/horizon/auth/login/'.format(dashboard_ip)
# start session, get csrftoken
client = requests.session()
client.get(auth_url)
if 'csrftoken' in client.cookies:
csrftoken = client.cookies['csrftoken']
else:
raise Exception("Missing csrftoken")
# build and send post request
overcloud_auth = openstack_utils.get_overcloud_auth()
if overcloud_auth['OS_AUTH_URL'].endswith("v2.0"):
api_version = 2
else:
api_version = 3
keystone_client = openstack_utils.get_keystone_client(
overcloud_auth)
catalog = keystone_client.service_catalog.get_endpoints()
logging.info(catalog)
if api_version == 2:
region = catalog['identity'][0]['publicURL']
else:
region = [i['url']
for i in catalog['identity']
if i['interface'] == 'public'][0]
auth = {
'domain': domain,
'username': username,
'password': password,
'csrfmiddlewaretoken': csrftoken,
'next': '/horizon/',
'region': region,
}
# In the minimal test deployment /horizon/project/ is unauthorized,
# this does not occur in a full deployment and is probably due to
# services/information missing that horizon wants to display data
# for.
# Redirect to /horizon/identity/ instead.
if (openstack_utils.get_os_release()
>= openstack_utils.get_os_release('xenial_queens')):
auth['next'] = '/horizon/identity/'
if (openstack_utils.get_os_release()
>= openstack_utils.get_os_release('bionic_stein')):
auth['region'] = 'default'
if api_version == 2:
del auth['domain']
logging.info('POST data: "{}"'.format(auth))
response = client.post(auth_url, data=auth, headers={'Referer': auth_url})
# NOTE(ajkavanagh) there used to be a trusty/icehouse test in the
# amulet test, but as the zaza tests only test from trusty/mitaka
# onwards, the test has been dropped
if (openstack_utils.get_os_release()
>= openstack_utils.get_os_release('bionic_stein')):
expect = "Sign Out"
# update the in dashboard seems to require region to be default in
# this test configuration
region = 'default'
else:
expect = 'Projects - OpenStack Dashboard'
if expect not in response.text:
msg = 'FAILURE code={} text="{}"'.format(response,
response.text)
# NOTE(thedac) amulet.raise_status exits on exception.
# Raise a custom exception.
logging.info("Yeah, wen't wrong: {}".format(msg))
raise FailedAuth(msg)
logging.info("Logged into okay")
return client, response
class OpenStackDashboardTests(test_utils.OpenStackBaseTest):
@@ -153,126 +282,21 @@ class OpenStackDashboardTests(test_utils.OpenStackBaseTest):
self.assertIn('OpenStack Dashboard', html,
"Dashboard frontpage check failed")
class AuthExceptions(Exception):
"""Exception base class for the 401 test."""
pass
class FailedAuth(AuthExceptions):
"""Failed exception for the 401 test."""
pass
class PassedAuth(AuthExceptions):
"""Passed exception for the 401 test."""
pass
def test_401_authenticate(self):
"""Validate that authentication succeeds for client log in.
Ported from amulet tests.
"""
"""Validate that authentication succeeds for client log in."""
logging.info('Checking authentication through dashboard...')
unit_name = zaza_model.get_lead_unit_name('openstack-dashboard')
keystone_unit = zaza_model.get_lead_unit_name('keystone')
dashboard_relation = openstack_juju.get_relation_from_unit(
keystone_unit, unit_name, 'identity-service')
dashboard_ip = dashboard_relation['private-address']
logging.debug("... dashboard_ip is:{}".format(dashboard_ip))
url = 'http://{}/horizon/auth/login/'.format(dashboard_ip)
dashboard_ip = _get_dashboard_ip()
overcloud_auth = openstack_utils.get_overcloud_auth()
if overcloud_auth['OS_AUTH_URL'].endswith("v2.0"):
api_version = 2
else:
api_version = 3
keystone_client = openstack_utils.get_keystone_client(
overcloud_auth)
catalog = keystone_client.service_catalog.get_endpoints()
logging.info(catalog)
if api_version == 2:
region = catalog['identity'][0]['publicURL']
else:
region = [i['url']
for i in catalog['identity']
if i['interface'] == 'public'][0]
# NOTE(ajkavanagh) there used to be a trusty/icehouse test in the
# amulet test, but as the zaza tests only test from trusty/mitaka
# onwards, the test has been dropped
if (openstack_utils.get_os_release() >=
openstack_utils.get_os_release('bionic_stein')):
expect = "Sign Out"
# update the in dashboard seems to require region to be default in
# this test configuration
region = 'default'
else:
expect = 'Projects - OpenStack Dashboard'
# NOTE(thedac) Similar to the connection test above we get occasional
# intermittent authentication fails. Wrap in a retry loop.
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=1,
min=5, max=10),
retry=tenacity.retry_unless_exception_type(
self.AuthExceptions),
reraise=True)
def _do_auth_check(expect):
# start session, get csrftoken
client = requests.session()
client.get(url)
if 'csrftoken' in client.cookies:
csrftoken = client.cookies['csrftoken']
else:
raise Exception("Missing csrftoken")
# build and send post request
auth = {
'domain': 'admin_domain',
'username': 'admin',
'password': overcloud_auth['OS_PASSWORD'],
'csrfmiddlewaretoken': csrftoken,
'next': '/horizon/',
'region': region,
}
# In the minimal test deployment /horizon/project/ is unauthorized,
# this does not occur in a full deployment and is probably due to
# services/information missing that horizon wants to display data
# for.
# Redirect to /horizon/identity/ instead.
if (openstack_utils.get_os_release() >=
openstack_utils.get_os_release('xenial_queens')):
auth['next'] = '/horizon/identity/'
if (openstack_utils.get_os_release() >=
openstack_utils.get_os_release('bionic_stein')):
auth['region'] = 'default'
if api_version == 2:
del auth['domain']
logging.info('POST data: "{}"'.format(auth))
response = client.post(url, data=auth, headers={'Referer': url})
if expect not in response.text:
msg = 'FAILURE code={} text="{}"'.format(response,
response.text)
# NOTE(thedac) amulet.raise_status exits on exception.
# Raise a custom exception.
logging.info("Yeah, wen't wrong: {}".format(msg))
raise self.FailedAuth(msg)
raise self.PassedAuth()
try:
_do_auth_check(expect)
except self.FailedAuth as e:
assert False, str(e)
except self.PassedAuth:
pass
password = overcloud_auth['OS_PASSWORD'],
logging.info("admin password is {}".format(password))
# try to get the url which will either pass or fail with a 403
overcloud_auth = openstack_utils.get_overcloud_auth()
domain = 'admin_domain',
username = 'admin',
password = overcloud_auth['OS_PASSWORD'],
_login(dashboard_ip, domain, username, password)
logging.info('OK')
def test_404_connection(self):
"""Verify the apache status module gets disabled when hardening apache.
@@ -357,3 +381,52 @@ class OpenStackDashboardTests(test_utils.OpenStackBaseTest):
"""
with self.pause_resume(['apache2']):
logging.info("Testing pause resume")
class OpenStackDashboardPolicydTests(policyd.BasePolicydSpecialization):
"""Test the policyd override using the dashboard."""
_rule = {'identity/rule.yaml': yaml.dump({
'identity:list_domains': '!',
'identity:get_domain': '!',
'identity:update_domain': '!',
'identity:list_domains_for_user': '!',
})}
# url associated with rule above that will return HTTP 403
url = "http://{}/horizon/identity/domains"
@classmethod
def setUpClass(cls, application_name=None):
"""Run class setup for running horizon charm operation tests."""
super(OpenStackDashboardPolicydTests, cls).setUpClass(
application_name="openstack-dashboard")
cls.application_name = "openstack-dashboard"
def get_client_and_attempt_operation(self, ip):
"""Attempt to list users on the openstack-dashboard service.
This is slightly complicated in that the client is actually a web-site.
Thus, the test has to login first and then attempt the operation. This
makes the test a little more complicated.
:param ip: the IP address to get the session against.
:type ip: str
:raises: PolicydOperationFailedException if operation fails.
"""
dashboard_ip = _get_dashboard_ip()
logging.info("Dashboard is at {}".format(dashboard_ip))
overcloud_auth = openstack_utils.get_overcloud_auth()
password = overcloud_auth['OS_PASSWORD'],
logging.info("admin password is {}".format(password))
# try to get the url which will either pass or fail with a 403
overcloud_auth = openstack_utils.get_overcloud_auth()
domain = 'admin_domain',
username = 'admin',
password = overcloud_auth['OS_PASSWORD'],
client, response = _login(dashboard_ip, domain, username, password)
# now attempt to get the domains page
_url = self.url.format(dashboard_ip)
result = client.get(_url)
if result.status_code == 403:
raise policyd.PolicydOperationFailedException("Not authenticated")

View File

@@ -104,8 +104,8 @@ class PolicydTest(object):
zfp.writestr(name, contents)
return path
def _set_policy_with(self, rules):
rules_zip_path = self._make_zip_file_from('rules.zip', rules)
def _set_policy_with(self, rules, filename='rules.zip'):
rules_zip_path = self._make_zip_file_from(filename, rules)
zaza_model.attach_resource(self.application_name,
'policyd-override',
rules_zip_path)
@@ -198,8 +198,8 @@ class GenericPolicydTest(PolicydTest, test_utils.OpenStackBaseTest):
def setUpClass(cls, application_name=None):
"""Run class setup for running KeystonePolicydTest tests."""
super(GenericPolicydTest, cls).setUpClass(application_name)
if (openstack_utils.get_os_release() <
openstack_utils.get_os_release('xenial_queens')):
if (openstack_utils.get_os_release()
< openstack_utils.get_os_release('xenial_queens')):
raise unittest.SkipTest(
"zaza.openstack.charm_tests.policyd.tests.GenericPolicydTest "
"not valid before xenial_queens")
@@ -242,7 +242,7 @@ class BasePolicydSpecialization(PolicydTest,
class KeystonePolicydTest(BasePolicydSpecialization):
_rule = "{'identity:list_services': '!'}"
_rule = {'rule.yaml': "{'identity:list_services': '!'}"}
def get_client_and_attempt_operation(self, keystone_session):
... etc.
@@ -260,8 +260,8 @@ class BasePolicydSpecialization(PolicydTest,
def setUpClass(cls, application_name=None):
"""Run class setup for running KeystonePolicydTest tests."""
super(BasePolicydSpecialization, cls).setUpClass(application_name)
if (openstack_utils.get_os_release() <
openstack_utils.get_os_release('xenial_queens')):
if (openstack_utils.get_os_release()
< openstack_utils.get_os_release('xenial_queens')):
raise unittest.SkipTest(
"zaza.openstack.charm_tests.policyd.tests.* "
"not valid before xenial_queens")
@@ -386,7 +386,7 @@ class BasePolicydSpecialization(PolicydTest,
# now do the policyd override.
logging.info("Doing policyd override with: {}".format(self._rule))
self._set_policy_with({'rule.yaml': self._rule})
self._set_policy_with(self._rule)
zaza_model.block_until_all_units_idle()
# now make sure the operation fails
@@ -439,7 +439,7 @@ class BasePolicydSpecialization(PolicydTest,
class KeystoneTests(BasePolicydSpecialization):
"""Test the policyd override using the keystone client."""
_rule = "{'identity:list_services': '!'}"
_rule = {'rule.yaml': "{'identity:list_services': '!'}"}
@classmethod
def setUpClass(cls, application_name=None):
@@ -468,7 +468,7 @@ class KeystoneTests(BasePolicydSpecialization):
class NeutronApiTests(BasePolicydSpecialization):
"""Test the policyd override using the neutron client."""
_rule = "{'get_network': '!'}"
_rule = {'rule.yaml': "{'get_network': '!'}"}
@classmethod
def setUpClass(cls, application_name=None):
@@ -503,7 +503,7 @@ class NeutronApiTests(BasePolicydSpecialization):
class GlanceTests(BasePolicydSpecialization):
"""Test the policyd override using the glance client."""
_rule = "{'get_images': '!'}"
_rule = {'rule.yaml': "{'get_images': '!'}"}
@classmethod
def setUpClass(cls, application_name=None):
@@ -537,7 +537,7 @@ class GlanceTests(BasePolicydSpecialization):
class CinderTests(BasePolicydSpecialization):
"""Test the policyd override using the cinder client."""
_rule = "{'volume:get_all': '!'}"
_rule = {'rule.yaml': "{'volume:get_all': '!'}"}
@classmethod
def setUpClass(cls, application_name=None):
@@ -566,7 +566,7 @@ class CinderTests(BasePolicydSpecialization):
class HeatTests(BasePolicydSpecialization):
"""Test the policyd override using the heat client."""
_rule = "{'stacks:index': '!'}"
_rule = {'rule.yaml': "{'stacks:index': '!'}"}
@classmethod
def setUpClass(cls, application_name=None):