Add vault tests

This commit is contained in:
Liam Young
2018-03-26 08:51:09 +00:00
parent 117ae4127d
commit a1f80dced7
4 changed files with 243 additions and 0 deletions

View File

View File

@@ -0,0 +1,27 @@
#!/usr/bin/env python3
import hvac
import logging
import os
import requests
import time
import unittest
import urllib3
import uuid
import yaml
import zaza.charm_tests.test_utils as test_utils
import zaza.charm_tests.vault.utils as vault_utils
def basic_setup():
clients = vault_utils.get_clients()
unseal_client = clients[0]
initialized = vault_utils.is_initialized(unseal_client)
# The credentials are written to a file to allow the tests to be re-run
# this is mainly useful for manually working on the tests.
if initialized:
vault_creds = vault_utils.get_credentails()
else:
vault_creds = vault_utils.init_vault(unseal_client[1])
vault_utils.store_credentails(vault_creds)
print(vault_creds)

View File

@@ -0,0 +1,90 @@
#!/usr/bin/env python3
import hvac
import logging
import os
import requests
import time
import unittest
import urllib3
import uuid
import yaml
import zaza.charm_tests.test_utils as test_utils
import zaza.charm_tests.vault.utils as vault_utils
class VaultTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
# vault_utils = VaultUtils()
# cls.clients = vault_utils.get_clients()
# unseal_client = cls.clients[0]
# initialized = vault_utils.is_initialized(unseal_client)
# # The credentials are written to a file to allow the tests to be re-run
# # this is mainly useful for manually working on the tests.
# auth_file = "/tmp/vault_tests.yaml"
# if initialized:
# vault_creds = vault_utils.get_credentails_from_file(auth_file)
# else:
# vault_creds = vault_utils.init_vault(unseal_client[1])
# vault_utils.write_credentails(auth_file, vault_creds)
cls.clients = vault_utils.get_clients()
vault_creds = vault_utils.get_credentails()
vault_utils.unseal_all(cls.clients, vault_creds['keys'][0])
vault_utils.auth_all(cls.clients, vault_creds['root_token'])
def test_all_clients_authenticated(self):
for (addr, client) in self.clients:
for i in range(1, 10):
try:
self.assertTrue(client.is_authenticated())
except hvac.exceptions.InternalServerError:
time.sleep(2)
else:
break
else:
self.assertTrue(client.is_authenticated())
def check_read(self, key, value):
for (addr, client) in self.clients:
self.assertEqual(
client.read('secret/uuids')['data']['uuid'],
value)
def test_consistent_read_write(self):
key = 'secret/uuids'
for (addr, client) in self.clients:
value = str(uuid.uuid1())
client.write(key, uuid=value, lease='1h')
# Now check all clients read the same value back
self.check_read(key, value)
@test_utils.skipIfNotHA('vault')
def test_vault_ha_statuses(self):
leader = []
leader_address = []
leader_cluster_address = []
for (addr, client) in self.clients:
self.assertTrue(client.ha_status['ha_enabled'])
leader_address.append(
client.ha_status['leader_address'])
leader_cluster_address.append(
client.ha_status['leader_cluster_address'])
if client.ha_status['is_self']:
leader.append(addr)
# Check there is exactly one leader
self.assertEqual(len(leader), 1)
# Check both cluster addresses match accross the cluster
self.assertEqual(len(set(leader_address)), 1)
self.assertEqual(len(set(leader_cluster_address)), 1)
def test_check_vault_status(self):
for (addr, client) in self.clients:
self.assertFalse(client.seal_status['sealed'])
self.assertTrue(client.seal_status['cluster_name'])
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,126 @@
#!/usr/bin/env python3
import asyncio
import hvac
import logging
import os
import requests
import time
import tempfile
import unittest
import urllib3
import uuid
import yaml
import zaza.charm_tests.test_utils as test_utils
import zaza.model
AUTH_FILE = "vault_tests.yaml"
def get_client(vault_url):
"""Return an hvac client for the given URL
:param vault_url: Vault url to point client at
:returns: hvac.Client
"""
return hvac.Client(url=vault_url)
def init_vault(client, shares=1, threshold=1):
"""Initialise vault
:param client: hvac.Client Client to use for initiliasation
:param shares: int Number of key shares to create
:param threshold: int Number of keys needed to unseal vault
:returns: hvac.Client
"""
return client.initialize(shares, threshold)
def get_clients(units=None):
"""Create a list of clients, one per vault server
:param units: [ip1, ip2, ...] List of IP addresses of vault endpoints
:returns: [hvac.Client, ...] List of clients
"""
if not units:
units = zaza.model.get_app_ips('vault')
clients = []
for unit in units:
vault_url = 'http://{}:8200'.format(unit)
clients.append((unit, get_client(vault_url)))
return clients
def is_initialized(client):
"""Check if vault is initialized
:param client: hvac.Client Client to use to check if vault is
initialized
:returns: bool
"""
initialized = False
for i in range(1, 10):
try:
initialized = client[1].is_initialized()
except (ConnectionRefusedError,
urllib3.exceptions.NewConnectionError,
urllib3.exceptions.MaxRetryError,
requests.exceptions.ConnectionError):
time.sleep(2)
else:
break
else:
raise Exception("Cannot connect")
return initialized
def get_credentails():
unit = zaza.model.get_first_unit('vault')
with tempfile.TemporaryDirectory() as tmpdirname:
tmp_file = '{}/{}'.format(tmpdirname, AUTH_FILE)
zaza.model.scp_from_unit(unit, '~/{}'.format(AUTH_FILE), tmp_file)
with open(tmp_file, 'r') as stream:
creds = yaml.load(stream)
return creds
def store_credentails(creds):
unit = zaza.model.get_first_unit('vault')
with tempfile.NamedTemporaryFile(mode='w') as fp:
fp.write(yaml.dump(creds))
fp.flush()
zaza.model.scp_to_unit(unit, fp.name, '~/{}'.format(AUTH_FILE))
def get_credentails_from_file(auth_file):
"""Read the vault credentials from the auth_file
:param auth_file: str Path to file with credentials
:returns: {} Dictionary of credentials
"""
with open(auth_file, 'r') as stream:
vault_creds = yaml.load(stream)
return vault_creds
def write_credentails(auth_file, vault_creds):
"""Write the vault credentials to the auth_file
:param auth_file: str Path to file to write credentials
"""
with open(auth_file, 'w') as outfile:
yaml.dump(vault_creds, outfile, default_flow_style=False)
def unseal_all(clients, key):
"""Unseal all the vaults with the given clients with the provided key
:param clients: [hvac.Client, ...] List of clients
:param key: str key to unlock clients
"""
for (addr, client) in clients:
if client.is_sealed():
client.unseal(key)
def auth_all(clients, token):
"""Authenticate all the given clients with the provided token
:param clients: [hvac.Client, ...] List of clients
:param token: str token to authorize clients
"""
for (addr, client) in clients:
client.token = token