Add vault cachine of secrets on relation test

This specific test is for the certificates relation to ensure that the
data presented to units related to vault have a consistent set of data.
This commit is contained in:
Alex Kavanagh
2023-07-20 12:17:57 +01:00
parent df6dd60e11
commit 752e643c33

View File

@@ -19,6 +19,7 @@
import contextlib
import json
import logging
import subprocess
import unittest
import uuid
@@ -350,5 +351,78 @@ class VaultTest(BaseVaultTest):
self.assertTrue(lead_client.hvac_client.seal_status['sealed'])
class VaultCacheTest(BaseVaultTest):
"""Encapsulate vault tests."""
@test_utils.skipIfNotHA('vault')
def test_vault_caching_unified_view(self):
"""Verify that the vault applicate presents consisent certificates.
On all of the relations to the clients.
"""
try:
application = zaza.model.get_application('keystone')
except KeyError:
self.skipTest("Application 'keystone' not available so skipping.")
# data keys that are 'strs'
key_str_values = ['ca', 'chain', 'client.cert', 'client.key']
for unit in application.units:
command = ['juju', 'show-unit', '--format=json', unit.entity_id]
output = subprocess.check_output(command).decode()
unit_info = json.loads(output)
# verify that unit_info{unit.entity_id}{'relation-info'}[n],
# where List item [n] is a {} where,
# [n]{'endpoint'} == 'certificates' AND
# [n]{'related_units'}{*}{'data'}{...} all match.
#
# first collect the list items that are 'certificates' endpoint.
relation_info_list = [
item
for item in unit_info[unit.entity_id]['relation-info']
if item['endpoint'] == 'certificates']
# collect the data for each of the units.
unit_data_list = [
{key: value['data']
for key, value in item['related-units'].items()}
for item in relation_info_list]
# for each str key, verify that it's the same string on all lists.
for str_key in key_str_values:
values = set((v[str_key]
for unit_data in unit_data_list
for v in unit_data.values()))
self.assertEqual(len(values), 1,
"Not all {} items in data match: {}"
.format(str_key, "\n".join(values)))
# now validate that 'processed_requests' are the same.
# they are json encoded, so need pulling out of the json; first get
# the keys that look like "keystone_0.processed_requests".
data_keys = set((k
for u in unit_data_list
for v in u.values()
for k in v.keys()))
processed_request_keys = [
k for k in data_keys if k.endswith(".processed_requests")]
# now for each processed_request keys, fetch the associated databag
# and json.loads it to get the values; they should match across the
# relations. Using json.loads just in case the ordering of the
# json.dumps is not determined.
for processed_request_key in processed_request_keys:
data_bags = [
(unit, json.loads(v[processed_request_key]))
for u in unit_data_list
for unit, v in u.items()]
# data_bags: [(unit, processed_requests dict)]
self.assertGreater(
len(data_bags), 1,
"Key {} is only in one bag".format(processed_request_key))
first_bag = data_bags[0]
for data_bag in data_bags[1:]:
self.assertEqual(
first_bag[1], data_bag[1],
"key {}: bag for: {} doesn't match bag for: {}"
.format(
processed_request_key, first_bag[0], data_bag[0]))
if __name__ == '__main__':
unittest.main()