Merge pull request #194 from openstack-charmers/add-s3client-test

swift: Add S3 API test
This commit is contained in:
Liam Young
2020-03-17 10:04:14 +00:00
committed by GitHub
3 changed files with 68 additions and 0 deletions

View File

@@ -1,5 +1,6 @@
aiounittest
async_generator
boto3
juju
juju_wait
PyYAML<=4.2,>=3.0

View File

@@ -26,6 +26,7 @@ from setuptools.command.test import test as TestCommand
version = "0.0.1.dev1"
install_require = [
'async_generator',
'boto3',
'cryptography',
'hvac<0.7.0',
'jinja2',

View File

@@ -17,6 +17,7 @@
"""Encapsulate swift testing."""
import logging
import pprint
import tenacity
import zaza.model
@@ -26,6 +27,8 @@ import zaza.openstack.configure.guest
import zaza.openstack.utilities.openstack as openstack_utils
import zaza.openstack.utilities.swift as swift_utils
import boto3
class SwiftImageCreateTest(test_utils.OpenStackBaseTest):
"""Test swift proxy via glance."""
@@ -226,3 +229,66 @@ class SwiftGlobalReplicationTests(test_utils.OpenStackBaseTest):
self.assertEqual(
len(obj_replicas.all_zones),
3)
class S3APITest(test_utils.OpenStackBaseTest):
"""Test object storage S3 API."""
@classmethod
def setUpClass(cls):
"""Run class setup for running tests."""
super(S3APITest, cls).setUpClass()
session = openstack_utils.get_overcloud_keystone_session()
ks_client = openstack_utils.get_keystone_session_client(session)
# Get token data so we can glean our user_id and project_id
token_data = ks_client.tokens.get_token_data(session.get_token())
project_id = token_data['token']['project']['id']
user_id = token_data['token']['user']['id']
# Store URL to service providing S3 compatible API
for entry in token_data['token']['catalog']:
if entry['type'] == 's3':
for endpoint in entry['endpoints']:
if endpoint['interface'] == 'public':
cls.s3_region = endpoint['region']
cls.s3_endpoint = endpoint['url']
# Create AWS compatible application credentials in Keystone
cls.ec2_creds = ks_client.ec2.create(user_id, project_id)
def test_s3_list_buckets(self):
"""Use S3 API to list buckets."""
# We use a mix of the high- and low-level API with common arguments
kwargs = {
'region_name': self.s3_region,
'aws_access_key_id': self.ec2_creds.access,
'aws_secret_access_key': self.ec2_creds.secret,
'endpoint_url': self.s3_endpoint,
}
s3_client = boto3.client('s3', **kwargs)
s3 = boto3.resource('s3', **kwargs)
# Create bucket
bucket_name = 'zaza-s3'
bucket = s3.Bucket(bucket_name)
bucket.create()
# Validate its presence
bucket_list = s3_client.list_buckets()
logging.info(pprint.pformat(bucket_list))
for bkt in bucket_list['Buckets']:
if bkt['Name'] == bucket_name:
break
else:
AssertionError('Bucket "{}" not found'.format(bucket_name))
# Delete bucket
bucket.delete()
# Validate its absence
bucket_list = s3_client.list_buckets()
logging.info(pprint.pformat(bucket_list))
for bkt in bucket_list['Buckets']:
assert bkt['Name'] != bucket_name