Code tidy and docstrings

This commit is contained in:
Liam Young
2021-01-24 17:24:20 +00:00
parent d637646a9e
commit 401829f0a7
2 changed files with 53 additions and 102 deletions
@@ -235,6 +235,3 @@ def validate_ca(cacertificate, application="keystone", port=5000):
fp.write(cacertificate.decode())
fp.flush()
requests.get('https://{}:{}'.format(ip, str(port)), verify=fp.name)
def get_cert():
print(zaza.openstack.utilities.openstack.get_remote_ca_cert_file('masakari'))
+53 -99
View File
@@ -184,7 +184,7 @@ WORKLOAD_STATUS_EXCEPTIONS = {
# For vault TLS certificates
CACERT_FILENAME_FORMAT = "{}_juju_ca_cert.crt"
CERT_PROVIDORS = ['vault']
CERT_PROVIDERS = ['vault']
LOCAL_CERT_DIR = "tests"
REMOTE_CERT_DIR = "/usr/local/share/ca-certificates"
KEYSTONE_CACERT = "keystone_juju_ca_cert.crt"
@@ -192,33 +192,29 @@ KEYSTONE_REMOTE_CACERT = (
"/usr/local/share/ca-certificates/{}".format(KEYSTONE_CACERT))
KEYSTONE_LOCAL_CACERT = ("{}/{}".format(LOCAL_CERT_DIR, KEYSTONE_CACERT))
VAULT_CACERT = "vault_juju_ca_cert.crt"
VAULT_REMOTE_CACERT = (
"/usr/local/share/ca-certificates/{}".format(VAULT_CACERT))
VAULT_LOCAL_CACERT = ("{}/{}".format(LOCAL_CERT_DIR, VAULT_CACERT))
REMOTE_CERTIFICATES = [VAULT_REMOTE_CACERT, KEYSTONE_REMOTE_CACERT]
LOCAL_CERTIFICATES = [VAULT_LOCAL_CACERT, KEYSTONE_LOCAL_CACERT]
async def async_block_until_ca_exists(application_name, ca_cert,
model_name=None, timeout=2700):
"""Block until a CA cert is on all units of application_name.
async def async_block_until_ca_exists(application_name, ca_cert, model_name=None, timeout=2700):
:param application_name: Name of application to check
:type application_name: str
:param ca_cert: The certificate to look for.
:type ca_cert: str
:param model_name: Name of model to query.
:type model_name: str
:param timeout: How long in seconds to wait
:type timeout: int
"""
async def _check_ca_present(model, ca_files):
units = model.applications[application_name].units
print(ca_files)
for ca_file in ca_files:
for unit in units:
print(unit)
print(ca_file)
try:
output = await unit.run('cat {}'.format(ca_file))
contents = output.data.get('results').get('Stdout', '')
if not ca_cert in contents:
print("It's not here!")
print(ca_cert)
print(contents)
if ca_cert not in contents:
break
if ca_cert in contents:
print("It's here!")
# libjuju throws a generic error for connection failure. So we
# cannot differentiate between a connectivity issue and a
# target file not existing error. For now just assume the
@@ -229,8 +225,9 @@ async def async_block_until_ca_exists(application_name, ca_cert, model_name=None
return True
else:
return False
ca_files = await _async_get_remote_ca_cert_file_candidates(application_name, model_name=model_name)
print(ca_files)
ca_files = await _async_get_remote_ca_cert_file_candidates(
application_name,
model_name=model_name)
async with zaza.model.run_in_model(model_name) as model:
await zaza.model.async_block_until(
lambda: _check_ca_present(model, ca_files), timeout=timeout)
@@ -238,65 +235,19 @@ async def async_block_until_ca_exists(application_name, ca_cert, model_name=None
block_until_ca_exists = zaza.model.sync_wrapper(async_block_until_ca_exists)
#async def async_get_cert_file_name(app, cert_files=None, block=True,
# model_name=None, timeout=2700):
# """Get the name of the CA cert file thats on all units of an application.
#
# :param app: Name of application
# :type capp: str
# :param cert_files: List of cert files to search for.
# :type cert_files: List[str]
# :param block: Whether to block until a consistent cert file is found.
# :type block: bool
# :param model_name: Name of model to run check in
# :type model_name: str
# :param timeout: Time to wait for consistent file
# :type timeout: int
# :returns: Credentials dictionary
# :rtype: dict
# """
# async def _check_for_file(model, cert_files):
# units = model.applications[app].units
# results = {u.entity_id: [] for u in units}
# for unit in units:
# try:
# for cf in cert_files:
# output = await unit.run('test -e "{}"; echo $?'.format(cf))
# contents = output.data.get('results')['Stdout']
# if "0" in contents:
# results[unit.entity_id].append(cf)
# except JujuError:
# pass
# for cert_file in cert_files:
# # Check that the certificate file exists on all the units.
# if all(cert_file in files for files in results.values()):
# return cert_file
# else:
# return None
#
# if not cert_files:
# cert_files = REMOTE_CERTIFICATES
# cert_file = None
# async with zaza.model.run_in_model(model_name) as model:
# if block:
# await zaza.model.async_block_until(
# lambda: _check_for_file(model, cert_files), timeout=timeout)
# cert_file = await _check_for_file(model, cert_files)
# return cert_file
#
#get_cert_file_name = zaza.model.sync_wrapper(async_get_cert_file_name)
def get_cacert():
"""Return path to CA Certificate bundle for verification during test.
:returns: Path to CA Certificate bundle or None.
:rtype: Optional[str]
:rtype: Union[str, None]
"""
for _cert in LOCAL_CERTIFICATES:
for _provider in CERT_PROVIDERS:
_cert = LOCAL_CERT_DIR + '/' + CACERT_FILENAME_FORMAT.format(
_provider)
if os.path.exists(_cert):
return _cert
if os.path.exists(KEYSTONE_LOCAL_CACERT):
return KEYSTONE_LOCAL_CACERT
# OpenStack Client helpers
@@ -2058,65 +2009,68 @@ def get_overcloud_auth(address=None, model_name=None):
return auth_settings
async def _async_get_remote_ca_cert_file_candidates(application, model_name=None):
async def _async_get_remote_ca_cert_file_candidates(application,
model_name=None):
cert_files = []
# unit = model.get_first_unit_name(application, model_name=model_name)
units = await model.async_get_units(application, model_name=model_name)
unit = units[0].name
for _providor in CERT_PROVIDORS:
for _provider in CERT_PROVIDERS:
tls_rid = await model.async_get_relation_id(
application,
_providor,
_provider,
model_name=model_name,
remote_interface_name='certificates')
if tls_rid:
cert_files.append(REMOTE_CERT_DIR + '/' + CACERT_FILENAME_FORMAT.format(_providor))
cert_files.append(REMOTE_CERT_DIR + '/' + KEYSTONE_CACERT)
cert_files.append(
REMOTE_CERT_DIR + '/' + CACERT_FILENAME_FORMAT.format(
_provider))
cert_files.append(KEYSTONE_LOCAL_CACERT)
return cert_files
_get_remote_ca_cert_file_candidates = zaza.model.sync_wrapper(_async_get_remote_ca_cert_file_candidates)
_get_remote_ca_cert_file_candidates = zaza.model.sync_wrapper(
_async_get_remote_ca_cert_file_candidates)
def get_remote_ca_cert_file(application, model_name=None):
# CACERT_FILENAME = "{}_juju_ca_cert.crt"
# cert_files = []
# unit = model.get_first_unit_name(application, model_name=model_name)
# for _providor in CERT_PROVIDORS:
# tls_rid = model.get_relation_id(
# application,
# _providor,
# model_name=model_name,
# remote_interface_name='certificates')
# if tls_rid:
# cert_files.append(CACERT_FILENAME.format(_providor))
# cert_files.append(KEYSTONE_CACERT)
"""Collect CA certificate from application.
:param application: Name of application to collect file from.
:type application: str
:param model_name: Name of model to query.
:type model_name: str
:returns: Path to cafile
:rtype: str
"""
unit = model.get_first_unit_name(application, model_name=model_name)
local_cert_file = None
cert_files = _get_remote_ca_cert_file_candidates(application, model_name=model_name)
cert_files = _get_remote_ca_cert_file_candidates(
application,
model_name=model_name)
for cert_file in cert_files:
_local_cert_file = "{}/{}".format(
LOCAL_CERT_DIR,
os.path.basename(cert_file))
with tempfile.NamedTemporaryFile(mode="w", delete=False) as _tmp_ca_file:
with tempfile.NamedTemporaryFile(mode="w", delete=False) as _tmp_ca:
try:
model.scp_from_unit(
unit,
cert_file,
_tmp_ca_file.name)
_tmp_ca.name)
except JujuError:
continue
# ensure that the path to put the local cacert in actually exists. The
# assumption that 'tests/' exists for, say, mojo is false.
# ensure that the path to put the local cacert in actually exists.
# The assumption that 'tests/' exists for, say, mojo is false.
# Needed due to:
# commit: 537473ad3addeaa3d1e4e2d0fd556aeaa4018eb2
_dir = os.path.dirname(_local_cert_file)
if not os.path.exists(_dir):
os.makedirs(_dir)
shutil.move(_tmp_ca_file.name, _local_cert_file)
shutil.move(_tmp_ca.name, _local_cert_file)
os.chmod(_local_cert_file, 0o644)
local_cert_file = _local_cert_file
break
return local_cert_file
def get_urllib_opener():
"""Create a urllib opener taking into account proxy settings.