
We had the normal infra pep8 ignores marked, but follow some of them by hand in this codebase anyway. While E12* are not valid pep8, the prevailing style of shade is in agreement with them. While we weren't enforcing them, a few instances snuck in. Fix them and just turn the flags on. Change-Id: Iea2bd9a99eb8b20dd40418ac8479b809d3872de8
167 lines
7.0 KiB
Python
167 lines
7.0 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""
|
|
test_object
|
|
----------------------------------
|
|
|
|
Functional tests for `shade` object methods.
|
|
"""
|
|
|
|
import random
|
|
import string
|
|
import tempfile
|
|
|
|
from testtools import content
|
|
|
|
from shade import exc
|
|
from shade.tests.functional import base
|
|
|
|
|
|
class TestObject(base.BaseFunctionalTestCase):
|
|
|
|
def setUp(self):
|
|
super(TestObject, self).setUp()
|
|
if not self.user_cloud.has_service('object-store'):
|
|
self.skipTest('Object service not supported by cloud')
|
|
|
|
def test_create_object(self):
|
|
'''Test uploading small and large files.'''
|
|
container_name = self.getUniqueString('container')
|
|
self.addDetail('container', content.text_content(container_name))
|
|
self.addCleanup(self.user_cloud.delete_container, container_name)
|
|
self.user_cloud.create_container(container_name)
|
|
self.assertEqual(container_name,
|
|
self.user_cloud.list_containers()[0]['name'])
|
|
sizes = (
|
|
(64 * 1024, 1), # 64K, one segment
|
|
(64 * 1024, 5) # 64MB, 5 segments
|
|
)
|
|
for size, nseg in sizes:
|
|
segment_size = int(round(size / nseg))
|
|
with tempfile.NamedTemporaryFile() as fake_file:
|
|
fake_content = ''.join(random.SystemRandom().choice(
|
|
string.ascii_uppercase + string.digits)
|
|
for _ in range(size)).encode('latin-1')
|
|
|
|
fake_file.write(fake_content)
|
|
fake_file.flush()
|
|
name = 'test-%d' % size
|
|
self.addCleanup(
|
|
self.user_cloud.delete_object, container_name, name)
|
|
self.user_cloud.create_object(
|
|
container_name, name,
|
|
fake_file.name,
|
|
segment_size=segment_size,
|
|
metadata={'foo': 'bar'})
|
|
self.assertFalse(self.user_cloud.is_object_stale(
|
|
container_name, name,
|
|
fake_file.name
|
|
))
|
|
self.assertEqual(
|
|
'bar', self.user_cloud.get_object_metadata(
|
|
container_name, name)['x-object-meta-foo']
|
|
)
|
|
self.user_cloud.update_object(container=container_name, name=name,
|
|
metadata={'testk': 'testv'})
|
|
self.assertEqual(
|
|
'testv', self.user_cloud.get_object_metadata(
|
|
container_name, name)['x-object-meta-testk']
|
|
)
|
|
try:
|
|
self.assertIsNotNone(
|
|
self.user_cloud.get_object(container_name, name))
|
|
except exc.OpenStackCloudException as e:
|
|
self.addDetail(
|
|
'failed_response',
|
|
content.text_content(str(e.response.headers)))
|
|
self.addDetail(
|
|
'failed_response',
|
|
content.text_content(e.response.text))
|
|
self.assertEqual(
|
|
name,
|
|
self.user_cloud.list_objects(container_name)[0]['name'])
|
|
self.assertTrue(
|
|
self.user_cloud.delete_object(container_name, name))
|
|
self.assertEqual([], self.user_cloud.list_objects(container_name))
|
|
self.assertEqual(container_name,
|
|
self.user_cloud.list_containers()[0]['name'])
|
|
self.user_cloud.delete_container(container_name)
|
|
|
|
def test_download_object_to_file(self):
|
|
'''Test uploading small and large files.'''
|
|
container_name = self.getUniqueString('container')
|
|
self.addDetail('container', content.text_content(container_name))
|
|
self.addCleanup(self.user_cloud.delete_container, container_name)
|
|
self.user_cloud.create_container(container_name)
|
|
self.assertEqual(container_name,
|
|
self.user_cloud.list_containers()[0]['name'])
|
|
sizes = (
|
|
(64 * 1024, 1), # 64K, one segment
|
|
(64 * 1024, 5) # 64MB, 5 segments
|
|
)
|
|
for size, nseg in sizes:
|
|
fake_content = ''
|
|
segment_size = int(round(size / nseg))
|
|
with tempfile.NamedTemporaryFile() as fake_file:
|
|
fake_content = ''.join(random.SystemRandom().choice(
|
|
string.ascii_uppercase + string.digits)
|
|
for _ in range(size)).encode('latin-1')
|
|
|
|
fake_file.write(fake_content)
|
|
fake_file.flush()
|
|
name = 'test-%d' % size
|
|
self.addCleanup(
|
|
self.user_cloud.delete_object, container_name, name)
|
|
self.user_cloud.create_object(
|
|
container_name, name,
|
|
fake_file.name,
|
|
segment_size=segment_size,
|
|
metadata={'foo': 'bar'})
|
|
self.assertFalse(self.user_cloud.is_object_stale(
|
|
container_name, name,
|
|
fake_file.name
|
|
))
|
|
self.assertEqual(
|
|
'bar', self.user_cloud.get_object_metadata(
|
|
container_name, name)['x-object-meta-foo']
|
|
)
|
|
self.user_cloud.update_object(container=container_name, name=name,
|
|
metadata={'testk': 'testv'})
|
|
self.assertEqual(
|
|
'testv', self.user_cloud.get_object_metadata(
|
|
container_name, name)['x-object-meta-testk']
|
|
)
|
|
try:
|
|
with tempfile.NamedTemporaryFile() as fake_file:
|
|
self.user_cloud.get_object(
|
|
container_name, name, outfile=fake_file.name)
|
|
downloaded_content = open(fake_file.name, 'rb').read()
|
|
self.assertEqual(fake_content, downloaded_content)
|
|
except exc.OpenStackCloudException as e:
|
|
self.addDetail(
|
|
'failed_response',
|
|
content.text_content(str(e.response.headers)))
|
|
self.addDetail(
|
|
'failed_response',
|
|
content.text_content(e.response.text))
|
|
raise
|
|
self.assertEqual(
|
|
name,
|
|
self.user_cloud.list_objects(container_name)[0]['name'])
|
|
self.assertTrue(
|
|
self.user_cloud.delete_object(container_name, name))
|
|
self.assertEqual([], self.user_cloud.list_objects(container_name))
|
|
self.assertEqual(container_name,
|
|
self.user_cloud.list_containers()[0]['name'])
|
|
self.user_cloud.delete_container(container_name)
|