Adds scaledown check and verification for stoppage of synchronisation. (#863)
This commit is contained in:
@@ -24,6 +24,7 @@ from os import (
|
||||
import requests
|
||||
import tempfile
|
||||
import boto3
|
||||
import botocore.exceptions
|
||||
import urllib3
|
||||
|
||||
import tenacity
|
||||
@@ -682,16 +683,32 @@ class CephRGWTest(test_utils.BaseCharmTest):
|
||||
hostname = unit_hostnames[unit_name]
|
||||
return 'radosgw-admin --id=rgw.{} '.format(hostname)
|
||||
|
||||
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=10, max=300),
|
||||
reraise=True, stop=tenacity.stop_after_attempt(12),
|
||||
retry=tenacity.retry_if_exception_type(AssertionError))
|
||||
def wait_for_sync(self, application, is_primary=False):
|
||||
def purge_bucket(self, application, bucket_name):
|
||||
"""Remove a bucket and all it's objects.
|
||||
|
||||
:param application: RGW application name
|
||||
:type application: str
|
||||
:param bucket_name: Name for RGW bucket to be deleted
|
||||
:type bucket_name: str
|
||||
"""
|
||||
juju_units = zaza_model.get_units(application)
|
||||
unit_hostnames = generic_utils.get_unit_hostnames(juju_units)
|
||||
for unit_name, hostname in unit_hostnames.items():
|
||||
key_name = "rgw.{}".format(hostname)
|
||||
cmd = 'radosgw-admin --id={} bucket rm --bucket={}' \
|
||||
' --purge-objects'.format(key_name, bucket_name)
|
||||
zaza_model.run_on_unit(unit_name, cmd)
|
||||
|
||||
def wait_for_status(self, application,
|
||||
is_primary=False, sync_expected=True):
|
||||
"""Wait for required RGW endpoint to finish sync for data and metadata.
|
||||
|
||||
:param application: RGW application which has to be waited for
|
||||
:type application: str
|
||||
:param is_primary: whether application is primary site endpoint
|
||||
:param is_primary: whether RGW application is primary or secondary
|
||||
:type is_primary: boolean
|
||||
:param sync_expected: whether sync details should be expected in status
|
||||
:type sync_expected: boolean
|
||||
"""
|
||||
juju_units = zaza_model.get_units(application)
|
||||
unit_hostnames = generic_utils.get_unit_hostnames(juju_units)
|
||||
@@ -699,15 +716,28 @@ class CephRGWTest(test_utils.BaseCharmTest):
|
||||
meta_primary = 'metadata sync no sync (zone is master)'
|
||||
meta_secondary = 'metadata is caught up with master'
|
||||
meta_check = meta_primary if is_primary else meta_secondary
|
||||
for unit_name, hostname in unit_hostnames.items():
|
||||
key_name = "rgw.{}".format(hostname)
|
||||
cmd = 'radosgw-admin --id={} sync status'.format(key_name)
|
||||
stdout = zaza_model.run_on_unit(unit_name, cmd).get('Stdout', '')
|
||||
assert data_check in stdout
|
||||
assert meta_check in stdout
|
||||
|
||||
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=1, max=60),
|
||||
reraise=True, stop=tenacity.stop_after_attempt(12))
|
||||
for attempt in tenacity.Retrying(
|
||||
wait=tenacity.wait_exponential(multiplier=10, max=300),
|
||||
reraise=True, stop=tenacity.stop_after_attempt(12),
|
||||
retry=tenacity.retry_if_exception_type(AssertionError)
|
||||
):
|
||||
with attempt:
|
||||
for unit_name, hostname in unit_hostnames.items():
|
||||
key_name = "rgw.{}".format(hostname)
|
||||
cmd = 'radosgw-admin --id={} sync status'.format(key_name)
|
||||
stdout = zaza_model.run_on_unit(
|
||||
unit_name, cmd
|
||||
).get('Stdout', '')
|
||||
if sync_expected:
|
||||
# Both data and meta sync.
|
||||
self.assertIn(data_check, stdout)
|
||||
self.assertIn(meta_check, stdout)
|
||||
else:
|
||||
# ExpectPrimary's Meta Status and no Data sync status
|
||||
self.assertIn(meta_primary, stdout)
|
||||
self.assertNotIn(data_check, stdout)
|
||||
|
||||
def fetch_rgw_object(self, target_client, container_name, object_name):
|
||||
"""Fetch RGW object content.
|
||||
|
||||
@@ -718,9 +748,14 @@ class CephRGWTest(test_utils.BaseCharmTest):
|
||||
:param object_name: Object name for desired object.
|
||||
:type object_name: str
|
||||
"""
|
||||
return target_client.Object(container_name, object_name).get()[
|
||||
'Body'
|
||||
].read().decode('UTF-8')
|
||||
for attempt in tenacity.Retrying(
|
||||
wait=tenacity.wait_exponential(multiplier=1, max=60),
|
||||
reraise=True, stop=tenacity.stop_after_attempt(12)
|
||||
):
|
||||
with attempt:
|
||||
return target_client.Object(
|
||||
container_name, object_name
|
||||
).get()['Body'].read().decode('UTF-8')
|
||||
|
||||
def promote_rgw_to_primary(self, app_name: str):
|
||||
"""Promote provided app to Primary and update period at new secondary.
|
||||
@@ -980,8 +1015,8 @@ class CephRGWTest(test_utils.BaseCharmTest):
|
||||
self.secondary_rgw_app + ":slave"
|
||||
)
|
||||
|
||||
secondary_client.Object(container_name, obj_name).delete()
|
||||
secondary_client.Bucket(container_name).delete()
|
||||
# Make secondary pristine.
|
||||
self.purge_bucket(self.secondary_rgw_app, container_name)
|
||||
|
||||
zaza_model.block_until_unit_wl_status(self.secondary_rgw_unit,
|
||||
'active')
|
||||
@@ -1032,8 +1067,8 @@ class CephRGWTest(test_utils.BaseCharmTest):
|
||||
self.secondary_rgw_unit, "active"
|
||||
)
|
||||
logging.info('Waiting for Data and Metadata to Synchronize')
|
||||
self.wait_for_sync(self.secondary_rgw_app, is_primary=False)
|
||||
self.wait_for_sync(self.primary_rgw_app, is_primary=True)
|
||||
self.wait_for_status(self.secondary_rgw_app, is_primary=False)
|
||||
self.wait_for_status(self.primary_rgw_app, is_primary=True)
|
||||
|
||||
logging.info('Performing post migration IO tests.')
|
||||
# Add another object at primary
|
||||
@@ -1072,11 +1107,11 @@ class CephRGWTest(test_utils.BaseCharmTest):
|
||||
self.promote_rgw_to_primary(self.secondary_rgw_app)
|
||||
|
||||
# Wait for Sites to be syncronised.
|
||||
self.wait_for_sync(self.primary_rgw_app, is_primary=False)
|
||||
self.wait_for_sync(self.secondary_rgw_app, is_primary=True)
|
||||
self.wait_for_status(self.primary_rgw_app, is_primary=False)
|
||||
self.wait_for_status(self.secondary_rgw_app, is_primary=True)
|
||||
|
||||
# IO Test
|
||||
container = 'demo-container-for-failover'
|
||||
container = 'failover-container'
|
||||
test_data = 'Test data from Zaza on Slave'
|
||||
secondary_client.Bucket(container).create()
|
||||
secondary_object = secondary_client.Object(container, 'testfile')
|
||||
@@ -1089,8 +1124,8 @@ class CephRGWTest(test_utils.BaseCharmTest):
|
||||
|
||||
# Recovery scenario, reset ceph-rgw as primary.
|
||||
self.promote_rgw_to_primary(self.primary_rgw_app)
|
||||
self.wait_for_sync(self.primary_rgw_app, is_primary=True)
|
||||
self.wait_for_sync(self.secondary_rgw_app, is_primary=False)
|
||||
self.wait_for_status(self.primary_rgw_app, is_primary=True)
|
||||
self.wait_for_status(self.secondary_rgw_app, is_primary=False)
|
||||
|
||||
# Fetch Syncronised copy of testfile from primary site.
|
||||
primary_content = self.fetch_rgw_object(
|
||||
@@ -1100,6 +1135,50 @@ class CephRGWTest(test_utils.BaseCharmTest):
|
||||
# Verify Data Integrity.
|
||||
self.assertEqual(secondary_content, primary_content)
|
||||
|
||||
# Scaledown and verify replication has stopped.
|
||||
logging.info('Checking multisite scaledown')
|
||||
zaza_model.remove_relation(
|
||||
self.primary_rgw_app,
|
||||
self.primary_rgw_app + ":master",
|
||||
self.secondary_rgw_app + ":slave"
|
||||
)
|
||||
|
||||
# wait for sync stop
|
||||
self.wait_for_status(self.primary_rgw_app, sync_expected=False)
|
||||
self.wait_for_status(self.secondary_rgw_app, sync_expected=False)
|
||||
|
||||
# Refresh client and verify objects are not replicating.
|
||||
primary_client = boto3.resource("s3",
|
||||
verify=False,
|
||||
endpoint_url=primary_endpoint,
|
||||
aws_access_key_id=access_key,
|
||||
aws_secret_access_key=secret_key)
|
||||
secondary_client = boto3.resource("s3",
|
||||
verify=False,
|
||||
endpoint_url=secondary_endpoint,
|
||||
aws_access_key_id=access_key,
|
||||
aws_secret_access_key=secret_key)
|
||||
|
||||
# IO Test
|
||||
container = 'scaledown-container'
|
||||
test_data = 'Scaledown Test data'
|
||||
secondary_client.Bucket(container).create()
|
||||
secondary_object = secondary_client.Object(container, 'scaledown')
|
||||
secondary_object.put(
|
||||
Body=test_data
|
||||
)
|
||||
|
||||
# Since bucket is not replicated.
|
||||
with self.assertRaises(botocore.exceptions.ClientError):
|
||||
primary_content = self.fetch_rgw_object(
|
||||
primary_client, container, 'scaledown'
|
||||
)
|
||||
|
||||
# Cleanup of scaledown resources and synced resources.
|
||||
self.purge_bucket(self.secondary_rgw_app, container)
|
||||
self.purge_bucket(self.secondary_rgw_app, 'zaza-container')
|
||||
self.purge_bucket(self.secondary_rgw_app, 'failover-container')
|
||||
|
||||
|
||||
class CephProxyTest(unittest.TestCase):
|
||||
"""Test ceph via proxy."""
|
||||
|
||||
Reference in New Issue
Block a user