Add Testcases for Ceph-RGW multisite migration (#840)

The updated testcases verify the deployment states that are safe to
migrate from single-site to multisite, and confirm that both pre- and
post-migration data are correctly synced between sites.
This commit is contained in:
Utkarsh Bhatt
2022-08-08 17:33:35 +05:30
committed by GitHub
parent 4697ce49b1
commit efdb1ae4e3
2 changed files with 393 additions and 130 deletions

View File

@@ -5,7 +5,7 @@ lxml<4.6.3
pyparsing<3.0.0 # pin for aodhclient which is held for py35
aiounittest
async_generator
boto3
boto3<1.25
juju!=2.8.3 # blacklist 2.8.3 as it appears to have a connection bug
juju_wait
PyYAML<=4.2,>=3.0

View File

@@ -23,11 +23,11 @@ from os import (
)
import requests
import tempfile
import boto3
import urllib3
import tenacity
from swiftclient.exceptions import ClientException
import zaza.openstack.charm_tests.test_utils as test_utils
import zaza.model as zaza_model
import zaza.openstack.utilities.ceph as zaza_ceph
@@ -37,6 +37,11 @@ import zaza.utilities.juju as juju_utils
import zaza.openstack.utilities.openstack as zaza_openstack
import zaza.openstack.utilities.generic as generic_utils
# Disable warnings for ssl_verify=false
urllib3.disable_warnings(
urllib3.exceptions.InsecureRequestWarning
)
class CephLowLevelTest(test_utils.OpenStackBaseTest):
"""Ceph Low Level Test Class."""
@@ -665,8 +670,20 @@ class CephTest(test_utils.OpenStackBaseTest):
"entity client.sandbox does not exist\n")
class CephRGWTest(test_utils.OpenStackBaseTest):
"""Ceph RADOS Gateway Daemons Test Class."""
class CephRGWTest(test_utils.BaseCharmTest):
"""Ceph RADOS Gateway Daemons Test Class.
This Testset is not idempotent, because we don't support scale down from
multisite to singlesite (yet). Tests can be performed independently.
However, If test_004 has completed migration, retriggering the test-set
would cause a time-out in test_003.
"""
# String Resources
primary_rgw_app = 'ceph-radosgw'
primary_rgw_unit = 'ceph-radosgw/0'
secondary_rgw_app = 'secondary-ceph-radosgw'
secondary_rgw_unit = 'secondary-ceph-radosgw/0'
@classmethod
def setUpClass(cls):
@@ -677,11 +694,11 @@ class CephRGWTest(test_utils.OpenStackBaseTest):
def expected_apps(self):
    """Determine application names for ceph-radosgw apps.

    The primary RGW application is always present; the secondary
    application is appended only when it exists in the model
    (i.e. multi-site deployments).

    :returns: Deployed ceph-radosgw application names.
    :rtype: list[str]
    """
    _apps = [
        self.primary_rgw_app
    ]
    try:
        # get_application raises KeyError when the app is not deployed.
        zaza_model.get_application(self.secondary_rgw_app)
        _apps.append(self.secondary_rgw_app)
    except KeyError:
        pass
    return _apps
@@ -690,28 +707,193 @@ class CephRGWTest(test_utils.OpenStackBaseTest):
def multisite(self):
    """Determine whether deployment is multi-site.

    :returns: True when the secondary RGW application is deployed,
              False otherwise.
    :rtype: bool
    """
    try:
        # Presence of the secondary app in the model implies multisite.
        zaza_model.get_application(self.secondary_rgw_app)
        return True
    except KeyError:
        return False
def get_rgwadmin_cmd_skeleton(self, unit_name):
    """Build the base radosgw-admin command string for a unit.

    radosgw-admin must be invoked with --id=rgw.<hostname>, so resolve
    the unit's hostname via its parent application.

    :param unit_name: Unit on which the complete command would be run.
    :type unit_name: str
    :returns: hostname filled basic command skeleton
    :rtype: str
    """
    application = unit_name.split('/')[0]
    hostnames = generic_utils.get_unit_hostnames(
        zaza_model.get_units(application)
    )
    return 'radosgw-admin --id=rgw.{} '.format(hostnames[unit_name])
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=10, max=300),
                reraise=True, stop=tenacity.stop_after_attempt(12),
                retry=tenacity.retry_if_exception_type(AssertionError))
def wait_for_sync(self, application, is_primary=False):
    """Wait for required RGW endpoint to finish sync for data and metadata.

    Polls 'radosgw-admin sync status' on every unit of the application
    and retries (via tenacity, on AssertionError) until both data and
    metadata report the expected sync state.

    :param application: RGW application which has to be waited for
    :type application: str
    :param is_primary: whether application is primary site endpoint
    :type is_primary: boolean
    """
    juju_units = zaza_model.get_units(application)
    unit_hostnames = generic_utils.get_unit_hostnames(juju_units)

    data_check = 'data is caught up with source'
    # The primary (master zone) never syncs metadata from anywhere,
    # so its expected status line differs from a secondary's.
    meta_primary = 'metadata sync no sync (zone is master)'
    meta_secondary = 'metadata is caught up with master'
    meta_check = meta_primary if is_primary else meta_secondary

    for unit_name, hostname in unit_hostnames.items():
        key_name = "rgw.{}".format(hostname)
        cmd = 'radosgw-admin --id={} sync status'.format(key_name)
        stdout = zaza_model.run_on_unit(unit_name, cmd).get('Stdout', '')
        assert data_check in stdout
        assert meta_check in stdout
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=1, max=60),
                reraise=True, stop=tenacity.stop_after_attempt(12))
def fetch_rgw_object(self, target_client, container_name, object_name):
    """Fetch RGW object content.

    Retries with exponential backoff because the object may still be
    replicating to this site when first requested.

    :param target_client: boto3 S3 resource configured for an endpoint.
    :type target_client: boto3.resources.base.ServiceResource
    :param container_name: RGW bucket name for desired object.
    :type container_name: str
    :param object_name: Object name for desired object.
    :type object_name: str
    :returns: Object body decoded as UTF-8 text.
    :rtype: str
    """
    return target_client.Object(container_name, object_name).get()[
        'Body'
    ].read().decode('UTF-8')
def promote_rgw_to_primary(self, app_name: str):
    """Promote provided app to Primary and update period at new secondary.

    :param app_name: Secondary site rgw Application to be promoted.
    :type app_name: str
    """
    # Use equality, not identity: 'is' on strings only works by
    # interning accident and fails for equal-but-distinct strings.
    if app_name == self.primary_rgw_app:
        new_secondary = self.secondary_rgw_unit
    else:
        new_secondary = self.primary_rgw_unit

    # Promote to Primary
    zaza_model.run_action_on_leader(
        app_name,
        'promote',
        action_params={},
    )

    # Period Update Commit new secondary.
    cmd = self.get_rgwadmin_cmd_skeleton(new_secondary)
    zaza_model.run_on_unit(
        new_secondary, cmd + 'period update --commit'
    )
def get_client_keys(self, rgw_app_name=None):
    """Fetch (creating if necessary) access/secret keys for boto3 client.

    The 'botoclient' RGW user is created on first use; subsequent calls
    reuse the existing user's keys.

    :param rgw_app_name: RGW application for which keys are required.
    :type rgw_app_name: str
    :returns: (access_key, secret_key) for the 'botoclient' user.
    :rtype: tuple[str, str]
    """
    unit_name = self.primary_rgw_unit
    if rgw_app_name is not None:
        unit_name = rgw_app_name + '/0'
    user_name = 'botoclient'
    cmd = self.get_rgwadmin_cmd_skeleton(unit_name)
    users = json.loads(zaza_model.run_on_unit(
        unit_name, cmd + 'user list'
    ).get('Stdout', ''))
    # Both 'user info' and 'user create' print the user document
    # (including keys), so a single extraction path serves both cases.
    if user_name in users:
        subcommand = 'user info --uid={}'.format(user_name)
    else:
        subcommand = 'user create --uid={} --display-name={}'.format(
            user_name, user_name
        )
    output = json.loads(zaza_model.run_on_unit(
        unit_name, cmd + subcommand
    ).get('Stdout', ''))
    keys = output['keys'][0]
    return keys['access_key'], keys['secret_key']
@tenacity.retry(
    retry=tenacity.retry_if_result(lambda ret: ret is None),
    wait=tenacity.wait_fixed(10),
    stop=tenacity.stop_after_attempt(5)
)
def get_rgw_endpoint(self, unit_name: str):
    """Fetch Application endpoint for RGW unit.

    Retries (via tenacity) while the unit's public address is not yet
    available, returning None on each unsuccessful attempt.

    :param unit_name: Unit name for which RGW endpoint is required.
    :type unit_name: str
    """
    address = zaza_model.get_unit_public_address(
        zaza_model.get_unit_from_name(unit_name),
        self.model_name
    )
    logging.debug("Unit: {}, Endpoint: {}".format(unit_name, address))
    if address is None:
        return None
    # Port selection: a deployed vault implies TLS on 443, otherwise
    # plain HTTP on 80.
    try:
        zaza_model.get_application("vault")
    except KeyError:
        return "http://{}:80".format(address)
    return "https://{}:443".format(address)
def configure_rgw_apps_for_multisite(self):
    """Configure Multisite values on primary and secondary apps."""
    realm = 'zaza_realm'
    zonegroup = 'zaza_zg'
    # Both sites share the realm and zonegroup; each site gets its own
    # distinct zone name. Primary is configured first.
    for app, zone in ((self.primary_rgw_app, 'zaza_primary'),
                      (self.secondary_rgw_app, 'zaza_secondary')):
        zaza_model.set_application_config(
            app,
            {
                'realm': realm,
                'zonegroup': zonegroup,
                'zone': zone,
            }
        )
def clean_rgw_multisite_config(self, app_name):
    """Clear Multisite Juju config values to default.

    :param app_name: App for which config values are to be cleared
    :type app_name: str
    """
    zaza_model.set_application_config(
        app_name,
        {
            'realm': "",
            'zonegroup': "",
            'zone': "default"
        }
    )
    # Commit changes to period.
    unit_name = app_name + "/0"
    skeleton = self.get_rgwadmin_cmd_skeleton(unit_name)
    zaza_model.run_on_unit(
        unit_name, skeleton + 'period update --commit --rgw-zone=default '
        '--rgw-zonegroup=default'
    )
def test_001_processes(self):
"""Verify Ceph processes.
Verify that the expected service processes are running
@@ -734,7 +916,7 @@ class CephRGWTest(test_utils.OpenStackBaseTest):
actual_pids)
self.assertTrue(ret)
def test_services(self):
def test_002_services(self):
"""Verify the ceph services.
Verify the expected services are running on the service units.
@@ -749,138 +931,219 @@ class CephRGWTest(test_utils.OpenStackBaseTest):
target_status='running'
)
def test_003_object_storage_and_secondary_block(self):
    """Verify Object Storage API and Secondary Migration block.

    Performs S3 IO against the primary site (and, when multisite, the
    secondary site), then verifies that relating a non-pristine
    secondary RGW blocks the charm, and cleans the secondary back up.
    """
    container_name = 'zaza-container'
    obj_data = 'Test data from Zaza'
    obj_name = 'prefile'

    logging.info('Checking Object Storage API for Primary Cluster')
    # 1. Fetch Primary Endpoint Details
    primary_endpoint = self.get_rgw_endpoint(self.primary_rgw_unit)
    self.assertNotEqual(primary_endpoint, None)

    # 2. Create RGW Client and perform IO
    access_key, secret_key = self.get_client_keys()
    primary_client = boto3.resource("s3",
                                    verify=False,
                                    endpoint_url=primary_endpoint,
                                    aws_access_key_id=access_key,
                                    aws_secret_access_key=secret_key)
    primary_client.Bucket(container_name).create()
    primary_object_one = primary_client.Object(
        container_name,
        obj_name
    )
    primary_object_one.put(Body=obj_data)

    # 3. Fetch Object and Perform Data Integrity check.
    content = primary_object_one.get()['Body'].read().decode('UTF-8')
    self.assertEqual(content, obj_data)

    # Skip multisite tests if not compatible with bundle.
    if not self.multisite:
        logging.info('Skipping Secondary Object gateway verification')
        return

    logging.info('Checking Object Storage API for Secondary Cluster')
    # 1. Fetch Secondary Endpoint Details
    secondary_endpoint = self.get_rgw_endpoint(self.secondary_rgw_unit)
    self.assertNotEqual(secondary_endpoint, None)

    # 2. Create RGW Client and perform IO
    access_key, secret_key = self.get_client_keys(self.secondary_rgw_app)
    secondary_client = boto3.resource("s3",
                                      verify=False,
                                      endpoint_url=secondary_endpoint,
                                      aws_access_key_id=access_key,
                                      aws_secret_access_key=secret_key)
    secondary_client.Bucket(container_name).create()
    secondary_object = secondary_client.Object(
        container_name,
        obj_name
    )
    secondary_object.put(Body=obj_data)

    # 3. Fetch Object and Perform Data Integrity check.
    content = secondary_object.get()['Body'].read().decode('UTF-8')
    self.assertEqual(content, obj_data)

    logging.info('Checking Secondary Migration Block')
    # 1. Migrate to multisite
    if zaza_model.get_relation_id(
            self.primary_rgw_app, self.secondary_rgw_app,
            remote_interface_name='slave'
    ) is not None:
        logging.info('Skipping Test, Multisite relation already present.')
        return

    logging.info('Configuring Multisite')
    self.configure_rgw_apps_for_multisite()
    zaza_model.add_relation(
        self.primary_rgw_app,
        self.primary_rgw_app + ":master",
        self.secondary_rgw_app + ":slave"
    )

    # 2. Verify secondary fails migration due to existing Bucket.
    assert_state = {
        self.secondary_rgw_app: {
            "workload-status": "blocked",
            "workload-status-message-prefix":
                "Non-Pristine RGW site can't be used as secondary"
        }
    }
    zaza_model.wait_for_application_states(states=assert_state,
                                           timeout=900)

    # 3. Perform Secondary Cleanup
    logging.info('Perform cleanup at secondary')
    self.clean_rgw_multisite_config(self.secondary_rgw_app)
    zaza_model.remove_relation(
        self.primary_rgw_app,
        self.primary_rgw_app + ":master",
        self.secondary_rgw_app + ":slave"
    )

    # Make the secondary site pristine again so that a later migration
    # (test_004) is not blocked by leftover buckets.
    secondary_client.Object(container_name, obj_name).delete()
    secondary_client.Bucket(container_name).delete()

    zaza_model.block_until_unit_wl_status(self.secondary_rgw_unit,
                                          'active')
def test_004_migration_and_multisite_failover(self):
    """Perform multisite migration and verify failover.

    Writes a pre-migration object at the primary, migrates the
    deployment to multisite (if not already related), verifies both
    pre- and post-migration objects replicate to the secondary, then
    exercises failover (promote secondary) and failback (re-promote
    primary) with data-integrity checks.
    """
    container_name = 'zaza-container'
    obj_data = 'Test data from Zaza'
    # Skip multisite tests if not compatible with bundle.
    if not self.multisite:
        raise unittest.SkipTest('Skipping Migration Test')

    logging.info('Perform Pre-Migration IO')
    # 1. Fetch Endpoint Details
    primary_endpoint = self.get_rgw_endpoint(self.primary_rgw_unit)
    self.assertNotEqual(primary_endpoint, None)

    # 2. Create primary client and add pre-migration object.
    access_key, secret_key = self.get_client_keys()
    primary_client = boto3.resource("s3",
                                    verify=False,
                                    endpoint_url=primary_endpoint,
                                    aws_access_key_id=access_key,
                                    aws_secret_access_key=secret_key)
    primary_client.Bucket(container_name).create()
    primary_client.Object(
        container_name,
        'prefile'
    ).put(Body=obj_data)

    # If Master/Slave relation does not exist, add it.
    if zaza_model.get_relation_id(
            self.primary_rgw_app, self.secondary_rgw_app,
            remote_interface_name='slave'
    ) is None:
        logging.info('Configuring Multisite')
        self.configure_rgw_apps_for_multisite()
        zaza_model.add_relation(
            self.primary_rgw_app,
            self.primary_rgw_app + ":master",
            self.secondary_rgw_app + ":slave"
        )
        zaza_model.block_until_unit_wl_status(
            self.secondary_rgw_unit, "waiting"
        )

    zaza_model.block_until_unit_wl_status(
        self.secondary_rgw_unit, "active"
    )
    logging.info('Waiting for Data and Metadata to Synchronize')
    self.wait_for_sync(self.secondary_rgw_app, is_primary=False)
    self.wait_for_sync(self.primary_rgw_app, is_primary=True)

    logging.info('Performing post migration IO tests.')
    # Add another object at primary
    primary_client.Object(
        container_name,
        'postfile'
    ).put(Body=obj_data)

    # 1. Fetch Endpoint Details
    secondary_endpoint = self.get_rgw_endpoint(self.secondary_rgw_unit)
    self.assertNotEqual(secondary_endpoint, None)

    # 2. Create secondary client and fetch synchronised objects.
    secondary_client = boto3.resource("s3",
                                      verify=False,
                                      endpoint_url=secondary_endpoint,
                                      aws_access_key_id=access_key,
                                      aws_secret_access_key=secret_key)

    # 3. Verify Data Integrity
    # fetch_rgw_object has internal retry so waiting for sync beforehand
    # is not required for post migration object sync.
    pre_migration_data = self.fetch_rgw_object(
        secondary_client, container_name, 'prefile'
    )
    post_migration_data = self.fetch_rgw_object(
        secondary_client, container_name, 'postfile'
    )

    # 4. Verify synchronisation works and objects are replicated.
    self.assertEqual(pre_migration_data, obj_data)
    self.assertEqual(post_migration_data, obj_data)

    logging.info('Checking multisite failover/failback')
    # Failover Scenario: promote the secondary RGW app to primary.
    self.promote_rgw_to_primary(self.secondary_rgw_app)

    # Wait for sites to be synchronised with their new roles.
    self.wait_for_sync(self.primary_rgw_app, is_primary=False)
    self.wait_for_sync(self.secondary_rgw_app, is_primary=True)

    # IO Test
    container = 'demo-container-for-failover'
    test_data = 'Test data from Zaza on Slave'
    secondary_client.Bucket(container).create()
    secondary_object = secondary_client.Object(container, 'testfile')
    secondary_object.put(
        Body=test_data
    )
    secondary_content = secondary_object.get()[
        'Body'
    ].read().decode('UTF-8')

    # Recovery scenario: reset ceph-rgw as primary.
    self.promote_rgw_to_primary(self.primary_rgw_app)
    self.wait_for_sync(self.primary_rgw_app, is_primary=True)
    self.wait_for_sync(self.secondary_rgw_app, is_primary=False)

    # Fetch synchronised copy of testfile from primary site.
    primary_content = self.fetch_rgw_object(
        primary_client, container, 'testfile'
    )

    # Verify Data Integrity.
    self.assertEqual(secondary_content, primary_content)
class CephProxyTest(unittest.TestCase):