Merge pull request #176 from javacruft/radosgw-tests

ceph: Add tests for radosgw
This commit is contained in:
Alex Kavanagh
2019-02-05 14:17:38 +00:00
committed by GitHub
3 changed files with 81 additions and 3 deletions

View File

@@ -57,8 +57,9 @@ def func_test_runner(keep_model=False, smoke=False, dev=False, bundle=None):
deploy.deploy(
os.path.join(utils.BUNDLE_DIR, '{}.yaml'.format(t)),
model_name)
# Configure
configure.configure(model_name, test_config['configure'])
if 'configure' in test_config:
# Configure
configure.configure(model_name, test_config['configure'])
# Test
test.test(model_name, test_config['tests'])
# Destroy

View File

@@ -292,7 +292,8 @@ class CephTest(test_utils.OpenStackBaseTest):
file_mtime = None
folder_name = '/etc/ceph/dmcrypt-keys/'
with self.config_change(set_default, set_alternate):
with self.config_change(set_default, set_alternate,
application_name=juju_service):
with tempfile.TemporaryDirectory() as tempdir:
# Creating a temp dir to copy keys
temp_folder = '/tmp/dmcrypt-keys'
@@ -501,3 +502,67 @@ class CephTest(test_utils.OpenStackBaseTest):
'active'
)
logging.debug('OK')
class CephRGWTest(test_utils.OpenStackBaseTest):
"""Ceph RADOS Gateway Daemons Test Class."""
@classmethod
def setUpClass(cls):
"""Run class setup for running ceph low level tests."""
super(CephRGWTest, cls).setUpClass()
def test_processes(self):
"""Verify Ceph processes.
Verify that the expected service processes are running
on each ceph unit.
"""
logging.info('Checking radosgw processes...')
# Process name and quantity of processes to expect on each unit
ceph_radosgw_processes = {
'radosgw': 1,
}
# Units with process names and PID quantities expected
expected_processes = {
'ceph-radosgw/0': ceph_radosgw_processes,
}
actual_pids = zaza_utils.get_unit_process_ids(expected_processes)
ret = zaza_utils.validate_unit_process_ids(expected_processes,
actual_pids)
self.assertTrue(ret)
def test_services(self):
"""Verify the ceph services.
Verify the expected services are running on the service units.
"""
logging.info('Checking radosgw services...')
services = ['radosgw', 'haproxy']
for unit in zaza_model.get_units('ceph-radosgw'):
zaza_model.block_until_service_status(
unit_name=unit.entity_id,
services=services,
target_status='running'
)
def test_object_storage(self):
"""Verify object storage API.
Verify that the object storage API works as expected.
"""
logging.info('Checking Swift REST API')
keystone_session = zaza_openstack.get_overcloud_keystone_session()
swift_client = zaza_openstack.get_swift_session_client(
keystone_session)
_container = 'demo-container'
swift_client.put_container(_container)
swift_client.put_object(_container,
'testfile',
contents='Test data from Zaza',
content_type='text/plain')
_, content = swift_client.get_object(_container, 'testfile')
self.assertEqual(content.decode('UTF-8'),
'Test data from Zaza')

View File

@@ -38,6 +38,7 @@ from novaclient import client as novaclient_client
from neutronclient.v2_0 import client as neutronclient
from neutronclient.common import exceptions as neutronexceptions
from octaviaclient.api.v2 import octavia as octaviaclient
from swiftclient import client as swiftclient
import io
import juju_wait
@@ -201,6 +202,17 @@ def get_neutron_session_client(session):
return neutronclient.Client(session=session)
def get_swift_session_client(session):
"""Return swiftclient authenticated by keystone session.
:param session: Keystone session object
:type session: keystoneauth1.session.Session object
:returns: Authenticated swiftclient
:rtype: swiftclient.Client object
"""
return swiftclient.Connection(session=session)
def get_octavia_session_client(session, service_type='load-balancer',
interface='internal'):
"""Return neutronclient authenticated by keystone session.