Port designate-bind expand and shrink test

Port designate-bind expand and shrink test (and helpers) from
mojo to zaza. The mojo designate-bind test is here *1

*1 https://github.com/openstack-charmers/openstack-mojo-specs/blob/master/helper/tests/expand_and_shrink_bind.py
This commit is contained in:
Liam Young
2020-05-29 13:49:03 +00:00
parent b54dedef62
commit 1beef2ddb2
2 changed files with 267 additions and 0 deletions

View File

@@ -14,14 +14,19 @@
"""Encapsulate designate testing."""
import logging
import unittest
import tenacity
import subprocess
import designateclient.v1.domains as domains
import designateclient.v1.records as records
import designateclient.v1.servers as servers
import zaza.model
import zaza.openstack.utilities.juju as zaza_juju
import zaza.openstack.charm_tests.test_utils as test_utils
import zaza.openstack.utilities.openstack as openstack_utils
import zaza.openstack.charm_tests.designate.utils as designate_utils
class BaseDesignateTest(test_utils.OpenStackBaseTest):
@@ -255,3 +260,60 @@ class DesignateTests(DesignateAPITests, DesignateCharmTests):
"""Collection of all Designate test classes."""
pass
class DesignateBindExpand(BaseDesignateTest):
"""Test expanding and shrinking bind."""
TEST_DOMAIN = 'zazabindtesting.com.'
TEST_NS1_RECORD = 'ns1.{}'.format(TEST_DOMAIN)
TEST_NS2_RECORD = 'ns2.{}'.format(TEST_DOMAIN)
TEST_WWW_RECORD = "www.{}".format(TEST_DOMAIN)
TEST_RECORD = {TEST_WWW_RECORD: '10.0.0.24'}
def test_expand_and_contract(self):
"""Test expanding and shrinking bind."""
if not self.post_xenial_queens:
raise unittest.SkipTest("Test not supported before Queens")
domain = designate_utils.create_or_return_zone(
self.designate,
name=self.TEST_DOMAIN,
email="test@zaza.com")
designate_utils.create_or_return_recordset(
self.designate,
domain['id'],
'www',
'A',
[self.TEST_RECORD[self.TEST_WWW_RECORD]])
# Test record is in bind and designate
designate_utils.check_dns_entry(
self.designate,
self.TEST_RECORD[self.TEST_WWW_RECORD],
self.TEST_DOMAIN,
record_name=self.TEST_WWW_RECORD)
logging.debug('Adding a designate-bind unit')
zaza.model.add_unit('designate-bind')
zaza.model.block_until_all_units_idle()
logging.debug('Performing DNS lookup on all units')
designate_utils.check_dns_entry(
self.designate,
self.TEST_RECORD[self.TEST_WWW_RECORD],
self.TEST_DOMAIN,
record_name=self.TEST_WWW_RECORD)
units = zaza.model.get_status().applications['designate-bind']['units']
doomed_unit = sorted(units.keys())[0]
logging.debug('Removing {}'.format(doomed_unit))
zaza.model.destroy_unit('designate-bind', doomed_unit)
logging.debug('Performing DNS lookup on all units')
designate_utils.check_dns_entry(
self.designate,
self.TEST_RECORD[self.TEST_WWW_RECORD],
self.TEST_DOMAIN,
record_name=self.TEST_WWW_RECORD)

View File

@@ -0,0 +1,205 @@
"""Utilities for interacting with designate."""
import dns.resolver
import logging
import tenacity
import designateclient.exceptions
import zaza.model
def create_or_return_zone(client, name, email):
"""Create zone or return matching existing zone.
:param designate_client: Client to query designate
:type designate_client: designateclient.v2.Client
:param name: Name of zone
:type name: str
:param email: Email address to associate with zone.
:type email: str
:returns: Zone
:rtype: designateclient.v2.zones.Zone
"""
try:
zone = client.zones.create(
name=name,
email=email)
except designateclient.exceptions.Conflict:
logging.info('{} zone already exists.'.format(name))
zones = [z for z in client.zones.list() if z['name'] == name]
assert len(zones) == 1, "Wrong number of zones found {}".format(zones)
zone = zones[0]
return zone
def create_or_return_recordset(client, zone_id, sub_domain, record_type, data):
"""Create recordset or return matching existing recordset.
:param designate_client: Client to query designate
:type designate_client: designateclient.v2.Client
:param zone_id: uuid of zone
:type zone_id: str
:param sub_domain: Subdomain to associate records with
:type sub_domain: str
:param data: Dictionary of entries eg {'www.test.com': '10.0.0.24'}
:type data: dict
:returns: RecordSet
:rtype: designateclient.v2.recordsets.RecordSet
"""
try:
rs = client.recordsets.create(
zone_id,
sub_domain,
record_type,
data)
except designateclient.exceptions.Conflict:
logging.info('{} record already exists.'.format(data))
for r in client.recordsets.list(zone_id):
if r['name'].split('.')[0] == sub_domain:
rs = r
return rs
def get_designate_zone_objects(designate_client, domain_name=None,
domain_id=None):
"""Get all domains matching a given domain_name or domain_id.
:param designate_client: Client to query designate
:type designate_client: designateclient.v2.Client
:param domain_name: Name of domain to lookup
:type domain_name: str
:param domain_id: UUID of domain to lookup
:type domain_id: str
:returns: List of Domain objects matching domain_name or domain_id
:rtype: [designateclient.v2.domains.Domain,]
"""
all_zones = designate_client.zones.list()
a = [z for z in all_zones
if z['name'] == domain_name or z['id'] == domain_id]
return a
def get_designate_domain_object(designate_client, domain_name):
"""Get the one and only domain matching the given domain_name.
:param designate_client: Client to query designate
:type designate_client: designateclient.v2.Client
:param domain_name: Name of domain to lookup
:type domain_name:str
:returns: Domain with name domain_name
:rtype: designateclient.v2.domains.Domain
:raises: AssertionError
"""
dns_zone_id = get_designate_zone_objects(designate_client,
domain_name=domain_name)
msg = "Found {} domains for {}".format(
len(dns_zone_id),
domain_name)
assert len(dns_zone_id) == 1, msg
return dns_zone_id[0]
def get_designate_dns_records(designate_client, domain_name, ip):
"""Look for records in designate that match the given ip.
:param designate_client: Client to query designate
:type designate_client: designateclient.v2.Client
:param domain_name: Name of domain to lookup
:type domain_name:str
:returns: List of Record objects matching matching IP address
:rtype: [designateclient.v2.records.Record,]
"""
dns_zone = get_designate_domain_object(designate_client, domain_name)
return [r for r in designate_client.recordsets.list(dns_zone['id'])
if r['records'] == ip]
def check_dns_record_exists(dns_server_ip, query_name, expected_ip,
retry_count=3):
"""Lookup a DNS record against the given dns server address.
:param dns_server_ip: IP address to run query against
:type dns_server_ip: str
:param query_name: Record to lookup
:type query_name: str
:param expected_ip: IP address expected to be associated with record.
:type expected_ip: str
:param retry_count: Number of times to retry query. Useful if waiting
for record to propagate.
:type retry_count: int
:raises: AssertionError
"""
my_resolver = dns.resolver.Resolver()
my_resolver.nameservers = [dns_server_ip]
for attempt in tenacity.Retrying(
stop=tenacity.stop_after_attempt(retry_count),
wait=tenacity.wait_exponential(multiplier=1, min=2, max=10),
reraise=True):
with attempt:
logging.info("Checking record {} against {}".format(
query_name,
dns_server_ip))
answers = my_resolver.query(query_name)
for rdata in answers:
logging.info("Checking address returned by {} is correct".format(
dns_server_ip))
assert str(rdata) == expected_ip
def check_dns_entry(des_client, ip, domain, record_name):
"""Check that record for ip is in designate and in bind.
:param ip: IP address to lookup
:type ip: str
:param domain_name: Domain to look for record in
:type domain_name:str
:param record_name: record name
:type record_name: str
"""
check_dns_entry_in_designate(des_client, [ip], domain,
record_name=record_name)
check_dns_entry_in_bind(ip, record_name)
def check_dns_entry_in_designate(des_client, ip, domain, record_name=None):
"""Look for records in designate that match the given ip domain.
:param designate_client: Client to query designate
:type designate_client: designateclient.v2.Client
:param ip: IP address to lookup in designate
:type ip: str
:param domain_name: Name of domain to lookup
:type domain_name:str
:param record_name: Retrieved record should have this name
:type record_name: str
:raises: AssertionError
"""
records = get_designate_dns_records(des_client, domain, ip)
assert records, "Record not found for {} in designate".format(ip)
logging.info('Found record in {} for {} in designate'.format(domain, ip))
if record_name:
recs = [r for r in records if r['name'] == record_name]
assert recs, "No DNS entry name matches expected name {}".format(
record_name)
logging.info('Found record in {} for {} in designate'.format(
domain,
record_name))
def check_dns_entry_in_bind(ip, record_name, model_name=None):
"""Check that record for ip address is in bind.
:param ip: IP address to lookup
:type ip: str
:param record_name: record name
:type record_name: str
"""
for addr in zaza.model.get_app_ips('designate-bind',
model_name=model_name):
logging.info("Checking {} is {} against ({})".format(
record_name,
ip,
addr))
check_dns_record_exists(addr, record_name, ip, retry_count=6)