Refactor tests to provide readibility and sanity
This commit is contained in:
@@ -53,13 +53,14 @@ class BaseDesignateTest(test_utils.OpenStackBaseTest):
|
||||
overcloud_auth = openstack_utils.get_overcloud_auth()
|
||||
keystone = openstack_utils.get_keystone_client(overcloud_auth)
|
||||
|
||||
keystone_session = openstack_utils.get_overcloud_keystone_session()
|
||||
if cls.post_xenial_queens:
|
||||
keystone_session = openstack_utils.get_overcloud_keystone_session()
|
||||
cls.designate = openstack_utils.get_designate_session_client(
|
||||
session=keystone_session
|
||||
)
|
||||
cls.zones_list = cls.designate.zones.list
|
||||
cls.zones_delete = cls.designate.zones.delete
|
||||
cls.domain_list = cls.designate.zones.list
|
||||
cls.domain_delete = cls.designate.zones.delete
|
||||
cls.domain_create = cls.designate.zones.create
|
||||
else:
|
||||
# Authenticate admin with designate endpoint
|
||||
designate_ep = keystone.service_catalog.url_for(
|
||||
@@ -71,12 +72,15 @@ class BaseDesignateTest(test_utils.OpenStackBaseTest):
|
||||
cls.designate = openstack_utils.get_designate_session_client(
|
||||
version=1,
|
||||
auth_url=keystone_ep,
|
||||
username="admin",
|
||||
password="openstack",
|
||||
token=keystone_session.get_token(),
|
||||
tenant_name="admin",
|
||||
endpoint=designate_ep)
|
||||
cls.zones_list = cls.designate.domains.list
|
||||
cls.zones_delete = cls.designate.domains.delete
|
||||
cls.domain_list = cls.designate.domains.list
|
||||
cls.domain_delete = cls.designate.domains.delete
|
||||
cls.domain_create = cls.designate.domains.create
|
||||
cls.server_list = cls.designate.servers.list
|
||||
cls.server_create = cls.designate.servers.create
|
||||
cls.server_delete = cls.designate.servers.delete
|
||||
|
||||
|
||||
class DesignateTests(BaseDesignateTest):
|
||||
@@ -122,93 +126,72 @@ class DesignateTests(BaseDesignateTest):
|
||||
pgrep_full=False):
|
||||
logging.info("Testing pause resume")
|
||||
|
||||
def _get_server_id(self, server_name):
|
||||
server_id = None
|
||||
for server in self.designate.servers.list():
|
||||
if server.name == server_name:
|
||||
server_id = server.id
|
||||
break
|
||||
return server_id
|
||||
def _get_server_id(self, server_name=None, server_id=None):
|
||||
for srv in self.server_list():
|
||||
if isinstance(srv, dict):
|
||||
if srv['id'] == server_id or srv['name'] == server_name:
|
||||
return srv['id']
|
||||
elif srv.name == server_name or srv.id == server_id:
|
||||
return srv.id
|
||||
return None
|
||||
|
||||
def _get_test_server_id(self):
|
||||
return self._get_server_id(self.TEST_NS2_RECORD)
|
||||
|
||||
@tenacity.retry(
|
||||
wait=tenacity.wait_exponential(multiplier=1, min=5, max=10),
|
||||
reraise=True
|
||||
)
|
||||
def _wait_on_server_gone(self):
|
||||
logging.debug('Waiting for server to disappear')
|
||||
return not self._get_test_server_id()
|
||||
def _wait_on_server_gone(self, server_id):
|
||||
@tenacity.retry(
|
||||
wait=tenacity.wait_exponential(multiplier=1, min=5, max=10),
|
||||
reraise=True
|
||||
)
|
||||
def wait():
|
||||
logging.debug('Waiting for server %s to disappear', server_id)
|
||||
if self._get_server_id(server_id=server_id):
|
||||
raise Exception("Server Exists")
|
||||
self.server_delete(server_id)
|
||||
return wait()
|
||||
|
||||
def test_400_server_creation(self):
|
||||
"""Simple api calls to create domain."""
|
||||
"""Simple api calls to create a server."""
|
||||
# Designate does not allow the last server to be deleted so ensure
|
||||
# that ns1 is always present
|
||||
if self.post_xenial_queens:
|
||||
logging.info('Skipping server creation tests for Queens and above')
|
||||
return
|
||||
|
||||
if not self._get_server_id(self.TEST_NS1_RECORD):
|
||||
if not self._get_server_id(server_name=self.TEST_NS1_RECORD):
|
||||
server = servers.Server(name=self.TEST_NS1_RECORD)
|
||||
new_server = self.designate.servers.create(server)
|
||||
new_server = self.server_create(server)
|
||||
self.assertIsNotNone(new_server)
|
||||
|
||||
logging.debug('Checking if server exists before trying to create it')
|
||||
old_server_id = self._get_test_server_id()
|
||||
old_server_id = self._get_server_id(server_name=self.TEST_NS2_RECORD)
|
||||
if old_server_id:
|
||||
logging.debug('Deleting old server')
|
||||
self.designate.servers.delete(old_server_id)
|
||||
self._wait_on_server_gone()
|
||||
self._wait_on_server_gone(old_server_id)
|
||||
|
||||
logging.debug('Creating new server')
|
||||
server = servers.Server(name=self.TEST_NS2_RECORD)
|
||||
new_server = self.designate.servers.create(server)
|
||||
self.assertIsNotNone(new_server)
|
||||
new_server = self.server_create(server)
|
||||
self.assertIsNotNone(new_server, "Failed to Create Server")
|
||||
self._wait_on_server_gone(self._get_server_id(self.TEST_NS2_RECORD))
|
||||
|
||||
def _get_domain_id(self, domain_name):
|
||||
domain_id = None
|
||||
for dom in self.zones_list():
|
||||
def _get_domain_id(self, domain_name=None, domain_id=None):
|
||||
for dom in self.domain_list():
|
||||
if isinstance(dom, dict):
|
||||
if dom['name'] == domain_name:
|
||||
domain_id = dom['name']
|
||||
break
|
||||
else:
|
||||
if dom.name == domain_name:
|
||||
domain_id = dom.id
|
||||
break
|
||||
return domain_id
|
||||
if dom['id'] == domain_id or dom['name'] == domain_name:
|
||||
return dom['id']
|
||||
elif dom.id == domain_id or dom.name == domain_name:
|
||||
return dom.id
|
||||
return None
|
||||
|
||||
def _get_test_domain_id(self):
|
||||
return self._get_domain_id(self.TEST_DOMAIN)
|
||||
|
||||
@tenacity.retry(
|
||||
wait=tenacity.wait_exponential(multiplier=1, min=5, max=10),
|
||||
reraise=True
|
||||
)
|
||||
def _wait_on_domain_gone(self):
|
||||
logging.debug('Waiting for domain to disappear')
|
||||
if self._get_test_domain_id():
|
||||
raise Exception("Domain Exists")
|
||||
|
||||
def _get_test_zone_id(self):
|
||||
zone_id = None
|
||||
for zone in self.designate.zones.list():
|
||||
if zone['name'] == self.TEST_DOMAIN:
|
||||
zone_id = zone['id']
|
||||
break
|
||||
return zone_id
|
||||
|
||||
@tenacity.retry(
|
||||
wait=tenacity.wait_exponential(multiplier=1, min=5, max=10),
|
||||
reraise=True
|
||||
)
|
||||
def _wait_on_zone_gone(self):
|
||||
self._wait_on_domain_gone()
|
||||
if self.post_xenial_queens:
|
||||
logging.debug('Waiting for zone to disappear')
|
||||
if self._get_test_zone_id():
|
||||
raise Exception("Zone Exists")
|
||||
def _wait_on_domain_gone(self, domain_id):
|
||||
@tenacity.retry(
|
||||
wait=tenacity.wait_exponential(multiplier=1, min=5, max=10),
|
||||
reraise=True
|
||||
)
|
||||
def wait():
|
||||
logging.debug('Waiting for domain %s to disappear', domain_id)
|
||||
if self._get_domain_id(domain_id=domain_id):
|
||||
raise Exception("Domain Exists")
|
||||
self.domain_delete(domain_id)
|
||||
wait()
|
||||
|
||||
@tenacity.retry(
|
||||
wait=tenacity.wait_exponential(multiplier=1, min=5, max=10),
|
||||
@@ -233,11 +216,10 @@ class DesignateTests(BaseDesignateTest):
|
||||
def test_400_domain_creation(self):
|
||||
"""Simple api calls to create domain."""
|
||||
logging.debug('Checking if domain exists before trying to create it')
|
||||
old_dom_id = self._get_test_domain_id()
|
||||
old_dom_id = self._get_domain_id(domain_name=self.TEST_DOMAIN)
|
||||
if old_dom_id:
|
||||
logging.debug('Deleting old domain')
|
||||
self.zones_delete(old_dom_id)
|
||||
self._wait_on_zone_gone()
|
||||
self._wait_on_domain_gone(old_dom_id)
|
||||
|
||||
logging.debug('Creating new domain')
|
||||
domain = domains.Domain(
|
||||
@@ -245,10 +227,10 @@ class DesignateTests(BaseDesignateTest):
|
||||
email="fred@amuletexample.com")
|
||||
|
||||
if self.post_xenial_queens:
|
||||
new_domain = self.designate.zones.create(
|
||||
new_domain = self.domain_create(
|
||||
name=domain.name, email=domain.email)
|
||||
else:
|
||||
new_domain = self.designate.domains.create(domain)
|
||||
new_domain = self.domain_create(domain)
|
||||
self.assertIsNotNone(new_domain)
|
||||
|
||||
logging.debug('Creating new test record')
|
||||
@@ -258,16 +240,15 @@ class DesignateTests(BaseDesignateTest):
|
||||
data=self.TEST_RECORD[self.TEST_WWW_RECORD])
|
||||
|
||||
if self.post_xenial_queens:
|
||||
_domain_id = new_domain['id']
|
||||
domain_id = new_domain['id']
|
||||
self.designate.recordsets.create(
|
||||
_domain_id, _record.name, _record.type, [_record.data])
|
||||
domain_id, _record.name, _record.type, [_record.data])
|
||||
else:
|
||||
_domain_id = new_domain.id
|
||||
self.designate.records.create(_domain_id, _record)
|
||||
domain_id = new_domain.id
|
||||
self.designate.records.create(domain_id, _record)
|
||||
|
||||
self._wait_to_resolve_test_record()
|
||||
|
||||
logging.debug('Tidy up delete test record')
|
||||
self.zones_delete(_domain_id)
|
||||
self._wait_on_zone_gone()
|
||||
self._wait_on_domain_gone(domain_id)
|
||||
logging.debug('OK')
|
||||
|
||||
Reference in New Issue
Block a user