Add helps for charm functional tests

The idea of this change is to put together helpers for including
functional tests for charms in a central location. The charm
declares the tests and bundles to be run to complete a
functional tests but the tests themselves live here in zaza.

To use this code the charm should have the following:

1) zaza in the test-requirements.txt
2) tox.ini should include a target like:
```
[testenv:func35]
  basepython = python3
  commands =
    functest-bundle-deploy
```

3) Bundles which are to be used for the tests:
```
ls -1 tests/bundles/*
tests/bundles/xenial.yaml
```

4) A tests/tests.yaml file that describes the bundles to be run and
   the tests
```
charm_name: vault
tests:
  - zaza.charms_tests.vault.VaultTest
gate_bundles:
  - xenial
dev_bundles:
  - bionic
```

Tests can be run without running a deployment using functest-run-tests
with the list of test classes to be run:

```
functest-run-tests -t zaza.charms_tests.vault.VaultTest
```

Known Issues:

 - The deploy_bundle and add_model methods should be using libjuju
 - DEV_BUNDLES is currently ignored
 - VaultUtils and VaultTest should probably be in separate files.
 - When skipIfNotHA skips a test unittest does not pick up it has
   been skipped
 - A new model is created for each bundle even if an existing empty
   model exists
 - No model cleanup is performed.
This commit is contained in:
Liam Young
2018-03-22 10:13:12 +00:00
parent 7b7f616fe9
commit 458a76f05a
4 changed files with 213 additions and 99 deletions
+15
View File
@@ -0,0 +1,15 @@
import logging
import unittest
import zaza.model
def skipIfNotHA(service_name):
def _skipIfNotHA_inner_1(f):
def _skipIfNotHA_inner_2(*args, **kwargs):
if len(zaza.model.unit_ips(service_name)) > 1:
return f(*args, **kwargs)
else:
logging.warn("Skipping HA test for non-ha service {}".format(service_name))
return _skipIfNotHA_inner_2
return _skipIfNotHA_inner_1
+182
View File
@@ -0,0 +1,182 @@
#!/usr/bin/env python3
import hvac
import logging
import os
import requests
import time
import unittest
import urllib3
import uuid
import yaml
import zaza.charms_tests.test_utils as test_utils
import zaza.model
class VaultUtils(object):
def get_client(self, vault_url):
"""Return an hvac client for the given URL
:param vault_url: Vault url to point client at
:returns: hvac.Client
"""
return hvac.Client(url=vault_url)
def init_vault(self, client, shares=1, threshold=1):
"""Initialise vault
:param client: hvac.Client Client to use for initiliasation
:param shares: int Number of key shares to create
:param threshold: int Number of keys needed to unseal vault
:returns: hvac.Client
"""
return client.initialize(shares, threshold)
def get_clients(self, units=None):
"""Create a list of clients, one per vault server
:param units: [ip1, ip2, ...] List of IP addresses of vault endpoints
:returns: [hvac.Client, ...] List of clients
"""
if not units:
units = zaza.model.unit_ips('vault')
clients = []
for unit in units:
vault_url = 'http://{}:8200'.format(unit)
clients.append((unit, self.get_client(vault_url)))
return clients
def is_initialized(self, client):
"""Check if vault is initialized
:param client: hvac.Client Client to use to check if vault is
initialized
:returns: bool
"""
initialized = False
for i in range(1, 10):
try:
initialized = client[1].is_initialized()
except (ConnectionRefusedError,
urllib3.exceptions.NewConnectionError,
urllib3.exceptions.MaxRetryError,
requests.exceptions.ConnectionError):
time.sleep(2)
else:
break
else:
raise Exception("Cannot connect")
return initialized
def get_credentails_from_file(self, auth_file):
"""Read the vault credentials from the auth_file
:param auth_file: str Path to file with credentials
:returns: {} Dictionary of credentials
"""
with open(auth_file, 'r') as stream:
vault_creds = yaml.load(stream)
return vault_creds
def write_credentails(self, auth_file, vault_creds):
"""Write the vault credentials to the auth_file
:param auth_file: str Path to file to write credentials
"""
with open(auth_file, 'w') as outfile:
yaml.dump(vault_creds, outfile, default_flow_style=False)
def unseal_all(self, clients, key):
"""Unseal all the vaults with the given clients with the provided key
:param clients: [hvac.Client, ...] List of clients
:param key: str key to unlock clients
"""
for (addr, client) in clients:
if client.is_sealed():
client.unseal(key)
def auth_all(self, clients, token):
"""Authenticate all the given clients with the provided token
:param clients: [hvac.Client, ...] List of clients
:param token: str token to authorize clients
"""
for (addr, client) in clients:
client.token = token
class VaultTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
vutils = VaultUtils()
cls.clients = vutils.get_clients()
unseal_client = cls.clients[0]
initialized = vutils.is_initialized(unseal_client)
# The credentials are written to a file to allow the tests to be re-run
# this is mainly useful for manually working on the tests.
auth_file = "/tmp/vault_tests.yaml"
if initialized:
vault_creds = vutils.get_credentails_from_file(auth_file)
else:
vault_creds = vutils.init_vault(unseal_client[1])
vutils.write_credentails(auth_file, vault_creds)
vutils.unseal_all(cls.clients, vault_creds['keys'][0])
vutils.auth_all(cls.clients, vault_creds['root_token'])
def test_all_clients_authenticated(self):
for (addr, client) in self.clients:
for i in range(1, 10):
try:
self.assertTrue(client.is_authenticated())
except hvac.exceptions.InternalServerError:
time.sleep(2)
else:
break
else:
self.assertTrue(client.is_authenticated())
def check_read(self, key, value):
for (addr, client) in self.clients:
self.assertEqual(
client.read('secret/uuids')['data']['uuid'],
value)
def test_consistent_read_write(self):
key = 'secret/uuids'
for (addr, client) in self.clients:
value = str(uuid.uuid1())
client.write(key, uuid=value, lease='1h')
# Now check all clients read the same value back
self.check_read(key, value)
@test_utils.skipIfNotHA('vault')
def test_vault_ha_statuses(self):
leader = []
leader_address = []
leader_cluster_address = []
for (addr, client) in self.clients:
self.assertTrue(client.ha_status['ha_enabled'])
leader_address.append(
client.ha_status['leader_address'])
leader_cluster_address.append(
client.ha_status['leader_cluster_address'])
if client.ha_status['is_self']:
leader.append(addr)
# Check there is exactly one leader
self.assertEqual(len(leader), 1)
# Check both cluster addresses match accross the cluster
self.assertEqual(len(set(leader_address)), 1)
self.assertEqual(len(set(leader_cluster_address)), 1)
def test_check_vault_status(self):
for (addr, client) in self.clients:
self.assertFalse(client.seal_status['sealed'])
self.assertTrue(client.seal_status['cluster_name'])
if __name__ == '__main__':
unittest.main()
-99
View File
@@ -1,99 +0,0 @@
import argparse
import datetime
import importlib
import logging
import os
import subprocess
import sys
import unittest
import yaml
import juju_wait
BUNDLE_DIR = "./tests/bundles/"
DEFAULT_TEST_CONFIG = "./tests/tests.yaml"
def deploy_bundle(bundle, model):
"""Deploy the given bundle file in the specified model
:param bundle: Path to bundle file
:type bundle: str
:param model: Name of model to deploy bundle in
:type model: str
"""
logging.info("Deploying bundle {}".format(bundle))
subprocess.check_call(['juju', 'deploy', '-m', model, bundle])
def add_model(model_name):
"""Add a model with the given name
:param model: Name of model to add
:type bundle: str
"""
logging.info("Adding model {}".format(model_name))
subprocess.check_call(['juju', 'add-model', model_name])
def get_test_class(class_str):
"""Get the test class represented by the given string
For example, get_test_class('zaza.charms_tests.svc.TestSVCClass1')
returns zaza.charms_tests.svc.TestSVCClass1
:param class_str: Class to be returned
:type class_str: str
:returns: Test class
:rtype: class
"""
test_module_name = '.'.join(class_str.split('.')[:-1])
test_class_name = class_str.split('.')[-1]
test_module = importlib.import_module(test_module_name)
return getattr(test_module, test_class_name)
def run_test_list(tests):
"""Run the tests as defined in the list of test classes in series.
:param tests: List of test class strings
:type tests: ['zaza.charms_tests.svc.TestSVCClass1', ...]
:raises: AssertionError if test run fails
"""
for _testcase in tests:
testcase = get_test_class(_testcase)
suite = unittest.TestLoader().loadTestsFromTestCase(testcase)
test_result = unittest.TextTestRunner(verbosity=2).run(suite)
assert test_result.wasSuccessful(), "Test run failed"
def deploy():
"""Deploy the bundles and run the tests as defined by the charms tests.yaml
"""
test_config = get_test_config()
for t in test_config['gate_bundles']:
timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
model_name = '{}{}{}'.format(test_config['charm_name'], t, timestamp)
add_model(model_name)
deploy_bundle(
os.path.join(BUNDLE_DIR, '{}.yaml'.format(t)),
model_name)
logging.info("Waiting for environment to settle")
juju_wait.wait()
run_test_list(test_config['tests'])
def get_test_config():
"""Read the yaml test config file and return the resulting config
:returns: Config dictionary
:rtype: dict
"""
with open(DEFAULT_TEST_CONFIG, 'r') as stream:
return yaml.load(stream)
def run_tests():
"""Run the tests defined by the command line args or if none were provided
read the tests from the charms tests.yaml config file"""
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument('-t','--tests', nargs='+',
help='Space sperated list of test classes',
required=False)
args = parser.parse_args()
tests = args.tests or get_test_config()['tests']
run_test_list(tests)
+16
View File
@@ -19,6 +19,22 @@ async def deployed(filter=None):
await model.disconnect()
async def _unit_ips(service_name):
# Create a Model instance. We need to connect our Model to a Juju api
# server before we can use it.
model = Model()
# Connect to the currently active Juju model
await model.connect_current()
app = model.applications[service_name]
ips = []
for unit in app.units:
ips.append(unit.public_address)
await model.disconnect()
return ips
def unit_ips(service_name):
return loop.run(_unit_ips(service_name))
def main():
# Run the deploy coroutine in an asyncio event loop, using a helper
# that abstracts loop creation and teardown.