Merge pull request #1259 from sabaini/cos-integration-controller
COS integration testing: improve COS model lookup
This commit is contained in:
@@ -14,9 +14,9 @@
|
||||
|
||||
"""Integration tests for ceph-mon."""
|
||||
|
||||
import unittest
|
||||
|
||||
import zaza.model
|
||||
from juju import juju
|
||||
from zaza import sync_wrapper
|
||||
from zaza.openstack.charm_tests import test_utils as test_utils
|
||||
|
||||
from zaza.openstack.charm_tests.ceph.mon.tests import (
|
||||
@@ -28,21 +28,52 @@ from zaza.openstack.charm_tests.ceph.mon.tests import (
|
||||
)
|
||||
|
||||
|
||||
async def async_find_cos_model():
|
||||
"""Find a COS model.
|
||||
|
||||
Look for models first on the current controller, then on a k8s controller.
|
||||
Return the first model with a name starting with "cos".
|
||||
"""
|
||||
models = await zaza.controller.async_list_models()
|
||||
cos_models = [m for m in models if m.startswith("cos")]
|
||||
if cos_models:
|
||||
return cos_models[0]
|
||||
# Look for a k8s controller
|
||||
j = juju.Juju()
|
||||
k8s = [
|
||||
k for k, v in j.get_controllers().items() if v["type"] == "kubernetes"
|
||||
]
|
||||
if not k8s:
|
||||
return
|
||||
ctrl = juju.Controller()
|
||||
await ctrl.connect(k8s[0])
|
||||
k8s_models = await ctrl.list_models()
|
||||
cos_models = [m for m in k8s_models if m.startswith("cos")]
|
||||
await ctrl.disconnect()
|
||||
if not cos_models:
|
||||
return
|
||||
return f"{k8s[0]}:{cos_models[0]}"
|
||||
|
||||
|
||||
find_cos_model = sync_wrapper(async_find_cos_model)
|
||||
|
||||
|
||||
class COSModelNotFound(Exception):
|
||||
"""Exception raised when no COS model is found."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class COSIntegrationTest(test_utils.BaseCharmTest):
|
||||
"""Test COS integration with ceph-mon."""
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
"""Run class setup for running cos integration testing."""
|
||||
# skip if there are no COS models
|
||||
cos_models = [
|
||||
m for m in zaza.controller.list_models() if m.startswith("cos")
|
||||
]
|
||||
if not cos_models:
|
||||
raise unittest.SkipTest("No COS models found")
|
||||
|
||||
cls.cos_model = cos_models[0]
|
||||
|
||||
# look for a cos model on the current controller
|
||||
cls.cos_model = find_cos_model()
|
||||
if not cls.cos_model:
|
||||
raise COSModelNotFound("No COS model found, cannot run tests")
|
||||
cls.grafana_details = zaza.model.run_action_on_leader(
|
||||
"grafana", "get-admin-password", model_name=cls.cos_model
|
||||
).results
|
||||
|
||||
Reference in New Issue
Block a user