Add helps for charm functional tests

The idea of this change is to put together helpers for including
functional tests for charms in a central location. The charm
declares the tests and bundles to be run to complete a
functional tests but the tests themselves live here in zaza.

To use this code the charm should have the following:

1) zaza in the test-requirements.txt
2) tox.ini should include a target like:
```
[testenv:func35]
  basepython = python3
  commands =
    functest-bundle-deploy
```

3) Bundles which are to be used for the tests:
```
ls -1 tests/bundles/*
tests/bundles/xenial.yaml
```

4) A tests/tests.yaml file that describes the bundles to be run and
   the tests
```
charm_name: vault
tests:
  - zaza.charms_tests.vault.VaultTest
gate_bundles:
  - xenial
dev_bundles:
  - bionic
```

Tests can be run without running a deployment using functest-run-tests
with the list of test classes to be run:

```
functest-run-tests -t zaza.charms_tests.vault.VaultTest
```

Known Issues:

 - The deploy_bundle and add_model methods should be using libjuju
 - DEV_BUNDLES is currently ignored
 - VaultUtils and VaultTest should probably be in separate files.
 - When skipIfNotHA skips a test unittest does not pick up it has
   been skipped
 - A new model is created for each bundle even if an existing empty
   model exists
 - No model cleanup is performed.
This commit is contained in:
Liam Young
2018-03-22 10:13:12 +00:00
parent 5dbd8f10f0
commit 502b96c772
7 changed files with 281 additions and 2 deletions

View File

@@ -8,7 +8,9 @@ from setuptools.command.test import test as TestCommand
version = "0.0.1.dev1"
install_require = [
'juju'
'juju',
'juju-wait',
'hvac'
]
tests_require = [
@@ -72,6 +74,8 @@ setup(
],
entry_points={
'console_scripts': [
'functest-run-tests = zaza.functests.deploy:run_tests',
'functest-bundle-deploy = zaza.functests.deploy:deploy',
'current-apps = zaza.model:main',
'tempest-config = zaza.tempest_config:main',
]
@@ -88,4 +92,4 @@ setup(
'testing': tests_require,
},
tests_require=tests_require,
)
)

View File

View File

@@ -0,0 +1,15 @@
import logging
import unittest
import zaza.model
def skipIfNotHA(service_name):
def _skipIfNotHA_inner_1(f):
def _skipIfNotHA_inner_2(*args, **kwargs):
if len(zaza.model.unit_ips(service_name)) > 1:
return f(*args, **kwargs)
else:
logging.warn("Skipping HA test for non-ha service {}".format(service_name))
return _skipIfNotHA_inner_2
return _skipIfNotHA_inner_1

182
zaza/charms_tests/vault.py Executable file
View File

@@ -0,0 +1,182 @@
#!/usr/bin/env python3
import hvac
import logging
import os
import requests
import time
import unittest
import urllib3
import uuid
import yaml
import zaza.charms_tests.test_utils as test_utils
import zaza.model
class VaultUtils(object):
def get_client(self, vault_url):
"""Return an hvac client for the given URL
:param vault_url: Vault url to point client at
:returns: hvac.Client
"""
return hvac.Client(url=vault_url)
def init_vault(self, client, shares=1, threshold=1):
"""Initialise vault
:param client: hvac.Client Client to use for initiliasation
:param shares: int Number of key shares to create
:param threshold: int Number of keys needed to unseal vault
:returns: hvac.Client
"""
return client.initialize(shares, threshold)
def get_clients(self, units=None):
"""Create a list of clients, one per vault server
:param units: [ip1, ip2, ...] List of IP addresses of vault endpoints
:returns: [hvac.Client, ...] List of clients
"""
if not units:
units = zaza.model.unit_ips('vault')
clients = []
for unit in units:
vault_url = 'http://{}:8200'.format(unit)
clients.append((unit, self.get_client(vault_url)))
return clients
def is_initialized(self, client):
"""Check if vault is initialized
:param client: hvac.Client Client to use to check if vault is
initialized
:returns: bool
"""
initialized = False
for i in range(1, 10):
try:
initialized = client[1].is_initialized()
except (ConnectionRefusedError,
urllib3.exceptions.NewConnectionError,
urllib3.exceptions.MaxRetryError,
requests.exceptions.ConnectionError):
time.sleep(2)
else:
break
else:
raise Exception("Cannot connect")
return initialized
def get_credentails_from_file(self, auth_file):
"""Read the vault credentials from the auth_file
:param auth_file: str Path to file with credentials
:returns: {} Dictionary of credentials
"""
with open(auth_file, 'r') as stream:
vault_creds = yaml.load(stream)
return vault_creds
def write_credentails(self, auth_file, vault_creds):
"""Write the vault credentials to the auth_file
:param auth_file: str Path to file to write credentials
"""
with open(auth_file, 'w') as outfile:
yaml.dump(vault_creds, outfile, default_flow_style=False)
def unseal_all(self, clients, key):
"""Unseal all the vaults with the given clients with the provided key
:param clients: [hvac.Client, ...] List of clients
:param key: str key to unlock clients
"""
for (addr, client) in clients:
if client.is_sealed():
client.unseal(key)
def auth_all(self, clients, token):
"""Authenticate all the given clients with the provided token
:param clients: [hvac.Client, ...] List of clients
:param token: str token to authorize clients
"""
for (addr, client) in clients:
client.token = token
class VaultTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
vutils = VaultUtils()
cls.clients = vutils.get_clients()
unseal_client = cls.clients[0]
initialized = vutils.is_initialized(unseal_client)
# The credentials are written to a file to allow the tests to be re-run
# this is mainly useful for manually working on the tests.
auth_file = "/tmp/vault_tests.yaml"
if initialized:
vault_creds = vutils.get_credentails_from_file(auth_file)
else:
vault_creds = vutils.init_vault(unseal_client[1])
vutils.write_credentails(auth_file, vault_creds)
vutils.unseal_all(cls.clients, vault_creds['keys'][0])
vutils.auth_all(cls.clients, vault_creds['root_token'])
def test_all_clients_authenticated(self):
for (addr, client) in self.clients:
for i in range(1, 10):
try:
self.assertTrue(client.is_authenticated())
except hvac.exceptions.InternalServerError:
time.sleep(2)
else:
break
else:
self.assertTrue(client.is_authenticated())
def check_read(self, key, value):
for (addr, client) in self.clients:
self.assertEqual(
client.read('secret/uuids')['data']['uuid'],
value)
def test_consistent_read_write(self):
key = 'secret/uuids'
for (addr, client) in self.clients:
value = str(uuid.uuid1())
client.write(key, uuid=value, lease='1h')
# Now check all clients read the same value back
self.check_read(key, value)
@test_utils.skipIfNotHA('vault')
def test_vault_ha_statuses(self):
leader = []
leader_address = []
leader_cluster_address = []
for (addr, client) in self.clients:
self.assertTrue(client.ha_status['ha_enabled'])
leader_address.append(
client.ha_status['leader_address'])
leader_cluster_address.append(
client.ha_status['leader_cluster_address'])
if client.ha_status['is_self']:
leader.append(addr)
# Check there is exactly one leader
self.assertEqual(len(leader), 1)
# Check both cluster addresses match accross the cluster
self.assertEqual(len(set(leader_address)), 1)
self.assertEqual(len(set(leader_cluster_address)), 1)
def test_check_vault_status(self):
for (addr, client) in self.clients:
self.assertFalse(client.seal_status['sealed'])
self.assertTrue(client.seal_status['cluster_name'])
if __name__ == '__main__':
unittest.main()

View File

62
zaza/functests/deploy.py Executable file
View File

@@ -0,0 +1,62 @@
import argparse
import datetime
import importlib
import logging
import os
import subprocess
import sys
import unittest
import yaml
import juju_wait
BUNDLE_DIR = "./tests/bundles/"
DEFAULT_TEST_CONFIG = "./tests/tests.yaml"
def deploy_bundle(bundle, model):
logging.info("Deploying bundle {}".format(bundle))
subprocess.check_call(['juju', 'deploy', '-m', model, bundle])
def add_model(model_name):
logging.info("Adding model {}".format(model_name))
subprocess.check_call(['juju', 'add-model', model_name])
def get_test_class(class_str):
test_module_name = '.'.join(class_str.split('.')[:-1])
test_class_name = class_str.split('.')[-1]
test_module = importlib.import_module(test_module_name)
return getattr(test_module, test_class_name)
def run_test_list(tests):
for _testcase in tests:
testcase = get_test_class(_testcase)
suite = unittest.TestLoader().loadTestsFromTestCase(testcase)
test_result = unittest.TextTestRunner(verbosity=2).run(suite)
assert test_result.wasSuccessful(), "Test run failed"
def deploy():
test_config = get_test_config()
for t in test_config['gate_bundles']:
timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
model_name = '{}{}{}'.format(test_config['charm_name'], t, timestamp)
add_model(model_name)
deploy_bundle(
os.path.join(BUNDLE_DIR, '{}.yaml'.format(t)),
model_name)
logging.info("Waiting for environment to settle")
juju_wait.wait()
run_test_list(test_config['tests'])
def get_test_config():
with open(DEFAULT_TEST_CONFIG, 'r') as stream:
return yaml.load(stream)
def run_tests():
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument('-t','--tests', nargs='+',
help='Space sperated list of test classes',
required=False)
args = parser.parse_args()
tests = args.tests or get_test_config()['tests']
run_test_list(tests)

View File

@@ -19,6 +19,22 @@ async def deployed(filter=None):
await model.disconnect()
async def _unit_ips(service_name):
# Create a Model instance. We need to connect our Model to a Juju api
# server before we can use it.
model = Model()
# Connect to the currently active Juju model
await model.connect_current()
app = model.applications[service_name]
ips = []
for unit in app.units:
ips.append(unit.public_address)
await model.disconnect()
return ips
def unit_ips(service_name):
return loop.run(_unit_ips(service_name))
def main():
# Run the deploy coroutine in an asyncio event loop, using a helper
# that abstracts loop creation and teardown.