Merge pull request #126 from ajkavanagh/policy-overrides-openstack-dashboard

Modified policyd tests to support openstack-dashboard
This commit is contained in:
Liam Young
2019-11-21 13:12:26 +00:00
committed by GitHub
3 changed files with 251 additions and 147 deletions

View File

@@ -25,7 +25,7 @@ deps = -r{toxinidir}/requirements.txt
commands = /bin/true
[flake8]
ignore = E402,E226,W504
ignore = E402,E226,W503
per-file-ignores =
unit_tests/**: D

View File

@@ -14,16 +14,144 @@
"""Encapsulate horizon (openstack-dashboard) charm testing."""
import http.client
import logging
import requests
import tenacity
import urllib.request
import yaml
import zaza.model as zaza_model
import zaza.openstack.utilities.openstack as openstack_utils
import zaza.openstack.charm_tests.test_utils as test_utils
import zaza.openstack.utilities.juju as openstack_juju
import zaza.openstack.charm_tests.policyd.tests as policyd
class AuthExceptions(Exception):
"""Exception base class for the 401 test."""
pass
class FailedAuth(AuthExceptions):
"""Failed exception for the 401 test."""
pass
def _get_dashboard_ip():
"""Get the IP of the dashboard.
:returns: The IP of the dashboard
:rtype: str
"""
unit_name = zaza_model.get_lead_unit_name('openstack-dashboard')
keystone_unit = zaza_model.get_lead_unit_name('keystone')
dashboard_relation = openstack_juju.get_relation_from_unit(
keystone_unit, unit_name, 'identity-service')
dashboard_ip = dashboard_relation['private-address']
logging.debug("dashboard_ip is: {}".format(dashboard_ip))
return dashboard_ip
# NOTE: intermittent authentication fails. Wrap in a retry loop.
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=1,
min=5, max=10),
reraise=True)
def _login(dashboard_ip, domain, username, password):
"""Login to the website to get a session.
:param dashboard_ip: The IP address of the dashboard to log in to.
:type dashboard_ip: str
:param domain: the domain to login into
:type domain: str
:param username: the username to login as
:type username: str
:param password: the password to use to login
:type password: str
:returns: tuple of (client, response) where response is the page after
logging in.
:rtype: (requests.sessions.Session, requests.models.Response)
:raises: FailedAuth if the authorisation doesn't work
"""
auth_url = 'http://{}/horizon/auth/login/'.format(dashboard_ip)
# start session, get csrftoken
client = requests.session()
client.get(auth_url)
if 'csrftoken' in client.cookies:
csrftoken = client.cookies['csrftoken']
else:
raise Exception("Missing csrftoken")
# build and send post request
overcloud_auth = openstack_utils.get_overcloud_auth()
if overcloud_auth['OS_AUTH_URL'].endswith("v2.0"):
api_version = 2
else:
api_version = 3
keystone_client = openstack_utils.get_keystone_client(
overcloud_auth)
catalog = keystone_client.service_catalog.get_endpoints()
logging.info(catalog)
if api_version == 2:
region = catalog['identity'][0]['publicURL']
else:
region = [i['url']
for i in catalog['identity']
if i['interface'] == 'public'][0]
auth = {
'domain': domain,
'username': username,
'password': password,
'csrfmiddlewaretoken': csrftoken,
'next': '/horizon/',
'region': region,
}
# In the minimal test deployment /horizon/project/ is unauthorized,
# this does not occur in a full deployment and is probably due to
# services/information missing that horizon wants to display data
# for.
# Redirect to /horizon/identity/ instead.
if (openstack_utils.get_os_release()
>= openstack_utils.get_os_release('xenial_queens')):
auth['next'] = '/horizon/identity/'
if (openstack_utils.get_os_release()
>= openstack_utils.get_os_release('bionic_stein')):
auth['region'] = 'default'
if api_version == 2:
del auth['domain']
logging.info('POST data: "{}"'.format(auth))
response = client.post(auth_url, data=auth, headers={'Referer': auth_url})
# NOTE(ajkavanagh) there used to be a trusty/icehouse test in the
# amulet test, but as the zaza tests only test from trusty/mitaka
# onwards, the test has been dropped
if (openstack_utils.get_os_release()
>= openstack_utils.get_os_release('bionic_stein')):
expect = "Sign Out"
# update the in dashboard seems to require region to be default in
# this test configuration
region = 'default'
else:
expect = 'Projects - OpenStack Dashboard'
if expect not in response.text:
msg = 'FAILURE code={} text="{}"'.format(response,
response.text)
logging.info("Yeah, wen't wrong: {}".format(msg))
raise FailedAuth(msg)
logging.info("Logged into okay")
return client, response
class OpenStackDashboardTests(test_utils.OpenStackBaseTest):
@@ -153,126 +281,21 @@ class OpenStackDashboardTests(test_utils.OpenStackBaseTest):
self.assertIn('OpenStack Dashboard', html,
"Dashboard frontpage check failed")
class AuthExceptions(Exception):
"""Exception base class for the 401 test."""
pass
class FailedAuth(AuthExceptions):
"""Failed exception for the 401 test."""
pass
class PassedAuth(AuthExceptions):
"""Passed exception for the 401 test."""
pass
def test_401_authenticate(self):
"""Validate that authentication succeeds for client log in.
Ported from amulet tests.
"""
"""Validate that authentication succeeds for client log in."""
logging.info('Checking authentication through dashboard...')
unit_name = zaza_model.get_lead_unit_name('openstack-dashboard')
keystone_unit = zaza_model.get_lead_unit_name('keystone')
dashboard_relation = openstack_juju.get_relation_from_unit(
keystone_unit, unit_name, 'identity-service')
dashboard_ip = dashboard_relation['private-address']
logging.debug("... dashboard_ip is:{}".format(dashboard_ip))
url = 'http://{}/horizon/auth/login/'.format(dashboard_ip)
dashboard_ip = _get_dashboard_ip()
overcloud_auth = openstack_utils.get_overcloud_auth()
if overcloud_auth['OS_AUTH_URL'].endswith("v2.0"):
api_version = 2
else:
api_version = 3
keystone_client = openstack_utils.get_keystone_client(
overcloud_auth)
catalog = keystone_client.service_catalog.get_endpoints()
logging.info(catalog)
if api_version == 2:
region = catalog['identity'][0]['publicURL']
else:
region = [i['url']
for i in catalog['identity']
if i['interface'] == 'public'][0]
# NOTE(ajkavanagh) there used to be a trusty/icehouse test in the
# amulet test, but as the zaza tests only test from trusty/mitaka
# onwards, the test has been dropped
if (openstack_utils.get_os_release() >=
openstack_utils.get_os_release('bionic_stein')):
expect = "Sign Out"
# update the in dashboard seems to require region to be default in
# this test configuration
region = 'default'
else:
expect = 'Projects - OpenStack Dashboard'
# NOTE(thedac) Similar to the connection test above we get occasional
# intermittent authentication fails. Wrap in a retry loop.
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=1,
min=5, max=10),
retry=tenacity.retry_unless_exception_type(
self.AuthExceptions),
reraise=True)
def _do_auth_check(expect):
# start session, get csrftoken
client = requests.session()
client.get(url)
if 'csrftoken' in client.cookies:
csrftoken = client.cookies['csrftoken']
else:
raise Exception("Missing csrftoken")
# build and send post request
auth = {
'domain': 'admin_domain',
'username': 'admin',
'password': overcloud_auth['OS_PASSWORD'],
'csrfmiddlewaretoken': csrftoken,
'next': '/horizon/',
'region': region,
}
# In the minimal test deployment /horizon/project/ is unauthorized,
# this does not occur in a full deployment and is probably due to
# services/information missing that horizon wants to display data
# for.
# Redirect to /horizon/identity/ instead.
if (openstack_utils.get_os_release() >=
openstack_utils.get_os_release('xenial_queens')):
auth['next'] = '/horizon/identity/'
if (openstack_utils.get_os_release() >=
openstack_utils.get_os_release('bionic_stein')):
auth['region'] = 'default'
if api_version == 2:
del auth['domain']
logging.info('POST data: "{}"'.format(auth))
response = client.post(url, data=auth, headers={'Referer': url})
if expect not in response.text:
msg = 'FAILURE code={} text="{}"'.format(response,
response.text)
# NOTE(thedac) amulet.raise_status exits on exception.
# Raise a custom exception.
logging.info("Yeah, wen't wrong: {}".format(msg))
raise self.FailedAuth(msg)
raise self.PassedAuth()
try:
_do_auth_check(expect)
except self.FailedAuth as e:
assert False, str(e)
except self.PassedAuth:
pass
password = overcloud_auth['OS_PASSWORD'],
logging.info("admin password is {}".format(password))
# try to get the url which will either pass or fail with a 403
overcloud_auth = openstack_utils.get_overcloud_auth()
domain = 'admin_domain',
username = 'admin',
password = overcloud_auth['OS_PASSWORD'],
_login(dashboard_ip, domain, username, password)
logging.info('OK')
def test_404_connection(self):
"""Verify the apache status module gets disabled when hardening apache.
@@ -291,18 +314,29 @@ class OpenStackDashboardTests(test_utils.OpenStackBaseTest):
logging.debug('Maybe enabling hardening for apache...')
_app_config = zaza_model.get_application_config(self.application_name)
logging.info(_app_config['harden'])
# NOTE(ajkavanagh): it seems that apache2 doesn't start quickly enough
# for the test, and so it gets reset errors; repeat until either that
# stops or there is a failure
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=1,
min=5, max=10),
retry=tenacity.retry_if_exception_type(
http.client.RemoteDisconnected),
reraise=True)
def _do_request():
return urllib.request.urlopen('http://{}/server-status'
.format(dashboard_ip))
with self.config_change(
{'harden': _app_config['harden'].get('value', '')},
{'harden': 'apache'}):
try:
urllib.request.urlopen('http://{}/server-status'
.format(dashboard_ip))
_do_request()
except urllib.request.HTTPError as e:
if e.code == 404:
return
# test failed if it didn't return 404
msg = "Apache mod_status check failed."
assert False, msg
# test failed if it didn't return 404
msg = "Apache mod_status check failed."
self.assertEqual(e.code, 404, msg)
logging.info('OK')
def test_501_security_checklist_action(self):
"""Verify expected result on a default install.
@@ -357,3 +391,59 @@ class OpenStackDashboardTests(test_utils.OpenStackBaseTest):
"""
with self.pause_resume(['apache2']):
logging.info("Testing pause resume")
class OpenStackDashboardPolicydTests(policyd.BasePolicydSpecialization):
"""Test the policyd override using the dashboard."""
good = {
"identity/file1.yaml": "{'rule1': '!'}"
}
bad = {
"identity/file2.yaml": "{'rule': '!}"
}
path_infix = "keystone_policy.d"
_rule = {'identity/rule.yaml': yaml.dump({
'identity:list_domains': '!',
'identity:get_domain': '!',
'identity:update_domain': '!',
'identity:list_domains_for_user': '!',
})}
# url associated with rule above that will return HTTP 403
url = "http://{}/horizon/identity/domains"
@classmethod
def setUpClass(cls, application_name=None):
"""Run class setup for running horizon charm operation tests."""
super(OpenStackDashboardPolicydTests, cls).setUpClass(
application_name="openstack-dashboard")
cls.application_name = "openstack-dashboard"
def get_client_and_attempt_operation(self, ip):
"""Attempt to list users on the openstack-dashboard service.
This is slightly complicated in that the client is actually a web-site.
Thus, the test has to login first and then attempt the operation. This
makes the test a little more complicated.
:param ip: the IP address to get the session against.
:type ip: str
:raises: PolicydOperationFailedException if operation fails.
"""
dashboard_ip = _get_dashboard_ip()
logging.info("Dashboard is at {}".format(dashboard_ip))
overcloud_auth = openstack_utils.get_overcloud_auth()
password = overcloud_auth['OS_PASSWORD'],
logging.info("admin password is {}".format(password))
# try to get the url which will either pass or fail with a 403
overcloud_auth = openstack_utils.get_overcloud_auth()
domain = 'admin_domain',
username = 'admin',
password = overcloud_auth['OS_PASSWORD'],
client, response = _login(dashboard_ip, domain, username, password)
# now attempt to get the domains page
_url = self.url.format(dashboard_ip)
result = client.get(_url)
if result.status_code == 403:
raise policyd.PolicydOperationFailedException("Not authenticated")

View File

@@ -63,6 +63,14 @@ class PolicydTest(object):
service: keystone
"""
good = {
"file1.yaml": "{'rule1': '!'}"
}
bad = {
"file2.yaml": "{'rule': '!}"
}
path_infix = ""
@classmethod
def setUpClass(cls, application_name=None):
"""Run class setup for running Policyd charm operation tests."""
@@ -104,8 +112,8 @@ class PolicydTest(object):
zfp.writestr(name, contents)
return path
def _set_policy_with(self, rules):
rules_zip_path = self._make_zip_file_from('rules.zip', rules)
def _set_policy_with(self, rules, filename='rules.zip'):
rules_zip_path = self._make_zip_file_from(filename, rules)
zaza_model.attach_resource(self.application_name,
'policyd-override',
rules_zip_path)
@@ -115,9 +123,7 @@ class PolicydTest(object):
def test_001_policyd_good_yaml(self):
"""Test that the policyd with a good zipped yaml file."""
good = {
'file1.yaml': "{'rule1': '!'}"
}
good = self.good
good_zip_path = self._make_zip_file_from('good.zip', good)
logging.info("Attaching good zip file as a resource.")
zaza_model.attach_resource(self.application_name,
@@ -127,8 +133,13 @@ class PolicydTest(object):
logging.debug("Now setting config to true")
self._set_config(True)
# check that the file gets to the right location
path = os.path.join(
"/etc", self._service_name, "policy.d", 'file1.yaml')
if self.path_infix:
path = os.path.join(
"/etc", self._service_name, "policy.d", self.path_infix,
'file1.yaml')
else:
path = os.path.join(
"/etc", self._service_name, "policy.d", 'file1.yaml')
logging.info("Now checking for file contents: {}".format(path))
zaza_model.block_until_file_has_contents(self.application_name,
path,
@@ -162,9 +173,7 @@ class PolicydTest(object):
def test_002_policyd_bad_yaml(self):
"""Test bad yaml file in the zip file is handled."""
bad = {
"file2.yaml": "{'rule': '!}"
}
bad = self.bad
bad_zip_path = self._make_zip_file_from('bad.zip', bad)
logging.info("Attaching bad zip file as a resource")
zaza_model.attach_resource(self.application_name,
@@ -182,8 +191,13 @@ class PolicydTest(object):
logging.debug("App status is valid for broken yaml file")
zaza_model.block_until_all_units_idle()
# now verify that no file got landed on the machine
path = os.path.join(
"/etc", self._service_name, "policy.d", 'file2.yaml')
if self.path_infix:
path = os.path.join(
"/etc", self._service_name, "policy.d", self.path_infix,
'file2.yaml')
else:
path = os.path.join(
"/etc", self._service_name, "policy.d", 'file2.yaml')
logging.info("Now checking that file {} is not present.".format(path))
zaza_model.block_until_file_missing(self.application_name, path)
self._set_config(False)
@@ -198,8 +212,8 @@ class GenericPolicydTest(PolicydTest, test_utils.OpenStackBaseTest):
def setUpClass(cls, application_name=None):
"""Run class setup for running KeystonePolicydTest tests."""
super(GenericPolicydTest, cls).setUpClass(application_name)
if (openstack_utils.get_os_release() <
openstack_utils.get_os_release('xenial_queens')):
if (openstack_utils.get_os_release()
< openstack_utils.get_os_release('xenial_queens')):
raise unittest.SkipTest(
"zaza.openstack.charm_tests.policyd.tests.GenericPolicydTest "
"not valid before xenial_queens")
@@ -242,7 +256,7 @@ class BasePolicydSpecialization(PolicydTest,
class KeystonePolicydTest(BasePolicydSpecialization):
_rule = "{'identity:list_services': '!'}"
_rule = {'rule.yaml': "{'identity:list_services': '!'}"}
def get_client_and_attempt_operation(self, keystone_session):
... etc.
@@ -260,8 +274,8 @@ class BasePolicydSpecialization(PolicydTest,
def setUpClass(cls, application_name=None):
"""Run class setup for running KeystonePolicydTest tests."""
super(BasePolicydSpecialization, cls).setUpClass(application_name)
if (openstack_utils.get_os_release() <
openstack_utils.get_os_release('xenial_queens')):
if (openstack_utils.get_os_release()
< openstack_utils.get_os_release('xenial_queens')):
raise unittest.SkipTest(
"zaza.openstack.charm_tests.policyd.tests.* "
"not valid before xenial_queens")
@@ -386,7 +400,7 @@ class BasePolicydSpecialization(PolicydTest,
# now do the policyd override.
logging.info("Doing policyd override with: {}".format(self._rule))
self._set_policy_with({'rule.yaml': self._rule})
self._set_policy_with(self._rule)
zaza_model.block_until_all_units_idle()
# now make sure the operation fails
@@ -439,7 +453,7 @@ class BasePolicydSpecialization(PolicydTest,
class KeystoneTests(BasePolicydSpecialization):
"""Test the policyd override using the keystone client."""
_rule = "{'identity:list_services': '!'}"
_rule = {'rule.yaml': "{'identity:list_services': '!'}"}
@classmethod
def setUpClass(cls, application_name=None):
@@ -468,7 +482,7 @@ class KeystoneTests(BasePolicydSpecialization):
class NeutronApiTests(BasePolicydSpecialization):
"""Test the policyd override using the neutron client."""
_rule = "{'get_network': '!'}"
_rule = {'rule.yaml': "{'get_network': '!'}"}
@classmethod
def setUpClass(cls, application_name=None):
@@ -503,7 +517,7 @@ class NeutronApiTests(BasePolicydSpecialization):
class GlanceTests(BasePolicydSpecialization):
"""Test the policyd override using the glance client."""
_rule = "{'get_images': '!'}"
_rule = {'rule.yaml': "{'get_images': '!'}"}
@classmethod
def setUpClass(cls, application_name=None):
@@ -537,7 +551,7 @@ class GlanceTests(BasePolicydSpecialization):
class CinderTests(BasePolicydSpecialization):
"""Test the policyd override using the cinder client."""
_rule = "{'volume:get_all': '!'}"
_rule = {'rule.yaml': "{'volume:get_all': '!'}"}
@classmethod
def setUpClass(cls, application_name=None):
@@ -566,7 +580,7 @@ class CinderTests(BasePolicydSpecialization):
class HeatTests(BasePolicydSpecialization):
"""Test the policyd override using the heat client."""
_rule = "{'stacks:index': '!'}"
_rule = {'rule.yaml': "{'stacks:index': '!'}"}
@classmethod
def setUpClass(cls, application_name=None):