Refactor tests and add tests

Relocates some test infrastructure in preparation for
use with encryption tests, in particular moves the test
server setup code from test/unit/proxy/test_server.py
to a new helpers.py so that it can be re-used, and adds
ability to specify additional config options for the
test servers (used in encryption tests).

Adds unit test coverage for extract_swift_bytes and functional
test coverage for container listings. Adds a check on the content
and metadata of reconciled objects in probe tests.

Change-Id: I9bfbf4e47cb0eb370e7a74d18c78d67b6b9d6645
This commit is contained in:
Alistair Coles 2016-06-07 13:41:55 +01:00
parent 7a50972104
commit 928c4790eb
9 changed files with 393 additions and 254 deletions

View File

@ -585,6 +585,7 @@ class Container(Base):
file_item['name'] = file_item['name'].encode('utf-8') file_item['name'] = file_item['name'].encode('utf-8')
file_item['content_type'] = file_item['content_type'].\ file_item['content_type'] = file_item['content_type'].\
encode('utf-8') encode('utf-8')
file_item['bytes'] = int(file_item['bytes'])
return files return files
else: else:
content = self.conn.response.read() content = self.conn.response.read()

View File

@ -744,6 +744,30 @@ class TestContainer(Base):
for file_item in files: for file_item in files:
self.assertIn(file_item, self.env.files) self.assertIn(file_item, self.env.files)
def _testContainerFormattedFileList(self, format_type):
expected = {}
for name in self.env.files:
expected[name] = self.env.container.file(name).info()
file_list = self.env.container.files(parms={'format': format_type})
self.assert_status(200)
for actual in file_list:
name = actual['name']
self.assertIn(name, expected)
self.assertEqual(expected[name]['etag'], actual['hash'])
self.assertEqual(
expected[name]['content_type'], actual['content_type'])
self.assertEqual(
expected[name]['content_length'], actual['bytes'])
expected.pop(name)
self.assertFalse(expected) # sanity check
def testContainerJsonFileList(self):
self._testContainerFormattedFileList('json')
def testContainerXmlFileList(self):
self._testContainerFormattedFileList('xml')
def testMarkerLimitFileList(self): def testMarkerLimitFileList(self):
for format_type in [None, 'json', 'xml']: for format_type in [None, 'json', 'xml']:
for marker in ['0', 'A', 'I', 'R', 'Z', 'a', 'i', 'r', 'z', for marker in ['0', 'A', 'I', 'R', 'Z', 'a', 'i', 'r', 'z',

View File

@ -164,12 +164,12 @@ class BrainSplitter(object):
client.delete_container(self.url, self.token, self.container_name) client.delete_container(self.url, self.token, self.container_name)
@command @command
def put_object(self, headers=None): def put_object(self, headers=None, contents=None):
""" """
issue put for zero byte test object issue put for test object
""" """
client.put_object(self.url, self.token, self.container_name, client.put_object(self.url, self.token, self.container_name,
self.object_name, headers=headers) self.object_name, headers=headers, contents=contents)
@command @command
def delete_object(self): def delete_object(self):

View File

@ -46,6 +46,24 @@ class TestContainerMergePolicyIndex(ReplProbeTest):
self.brain = BrainSplitter(self.url, self.token, self.container_name, self.brain = BrainSplitter(self.url, self.token, self.container_name,
self.object_name, 'container') self.object_name, 'container')
def _get_object_patiently(self, policy_index):
# use proxy to access object (bad container info might be cached...)
timeout = time.time() + TIMEOUT
while time.time() < timeout:
try:
return client.get_object(self.url, self.token,
self.container_name,
self.object_name)
except ClientException as err:
if err.http_status != HTTP_NOT_FOUND:
raise
time.sleep(1)
else:
self.fail('could not HEAD /%s/%s/%s/ from policy %s '
'after %s seconds.' % (
self.account, self.container_name, self.object_name,
int(policy_index), TIMEOUT))
def test_merge_storage_policy_index(self): def test_merge_storage_policy_index(self):
# generic split brain # generic split brain
self.brain.stop_primary_half() self.brain.stop_primary_half()
@ -53,7 +71,8 @@ class TestContainerMergePolicyIndex(ReplProbeTest):
self.brain.start_primary_half() self.brain.start_primary_half()
self.brain.stop_handoff_half() self.brain.stop_handoff_half()
self.brain.put_container() self.brain.put_container()
self.brain.put_object() self.brain.put_object(headers={'x-object-meta-test': 'custom-meta'},
contents='VERIFY')
self.brain.start_handoff_half() self.brain.start_handoff_half()
# make sure we have some manner of split brain # make sure we have some manner of split brain
container_part, container_nodes = self.container_ring.get_nodes( container_part, container_nodes = self.container_ring.get_nodes(
@ -127,24 +146,10 @@ class TestContainerMergePolicyIndex(ReplProbeTest):
self.fail('Found /%s/%s/%s in %s' % ( self.fail('Found /%s/%s/%s in %s' % (
self.account, self.container_name, self.object_name, self.account, self.container_name, self.object_name,
orig_policy_index)) orig_policy_index))
# use proxy to access object (bad container info might be cached...) # verify that the object data read by external client is correct
timeout = time.time() + TIMEOUT headers, data = self._get_object_patiently(expected_policy_index)
while time.time() < timeout: self.assertEqual('VERIFY', data)
try: self.assertEqual('custom-meta', headers['x-object-meta-test'])
metadata = client.head_object(self.url, self.token,
self.container_name,
self.object_name)
except ClientException as err:
if err.http_status != HTTP_NOT_FOUND:
raise
time.sleep(1)
else:
break
else:
self.fail('could not HEAD /%s/%s/%s/ from policy %s '
'after %s seconds.' % (
self.account, self.container_name, self.object_name,
expected_policy_index, TIMEOUT))
def test_reconcile_delete(self): def test_reconcile_delete(self):
# generic split brain # generic split brain
@ -399,17 +404,18 @@ class TestContainerMergePolicyIndex(ReplProbeTest):
self.assertEqual(2, len(old_container_node_ids)) self.assertEqual(2, len(old_container_node_ids))
# hopefully memcache still has the new policy cached # hopefully memcache still has the new policy cached
self.brain.put_object() self.brain.put_object(headers={'x-object-meta-test': 'custom-meta'},
contents='VERIFY')
# double-check object correctly written to new policy # double-check object correctly written to new policy
conf_files = [] conf_files = []
for server in Manager(['container-reconciler']).servers: for server in Manager(['container-reconciler']).servers:
conf_files.extend(server.conf_files()) conf_files.extend(server.conf_files())
conf_file = conf_files[0] conf_file = conf_files[0]
client = InternalClient(conf_file, 'probe-test', 3) int_client = InternalClient(conf_file, 'probe-test', 3)
client.get_object_metadata( int_client.get_object_metadata(
self.account, self.container_name, self.object_name, self.account, self.container_name, self.object_name,
headers={'X-Backend-Storage-Policy-Index': int(new_policy)}) headers={'X-Backend-Storage-Policy-Index': int(new_policy)})
client.get_object_metadata( int_client.get_object_metadata(
self.account, self.container_name, self.object_name, self.account, self.container_name, self.object_name,
acceptable_statuses=(4,), acceptable_statuses=(4,),
headers={'X-Backend-Storage-Policy-Index': int(old_policy)}) headers={'X-Backend-Storage-Policy-Index': int(old_policy)})
@ -423,9 +429,9 @@ class TestContainerMergePolicyIndex(ReplProbeTest):
tuple(server.once(number=n + 1) for n in old_container_node_ids) tuple(server.once(number=n + 1) for n in old_container_node_ids)
# verify entry in the queue for the "misplaced" new_policy # verify entry in the queue for the "misplaced" new_policy
for container in client.iter_containers('.misplaced_objects'): for container in int_client.iter_containers('.misplaced_objects'):
for obj in client.iter_objects('.misplaced_objects', for obj in int_client.iter_objects('.misplaced_objects',
container['name']): container['name']):
expected = '%d:/%s/%s/%s' % (new_policy, self.account, expected = '%d:/%s/%s/%s' % (new_policy, self.account,
self.container_name, self.container_name,
self.object_name) self.object_name)
@ -434,12 +440,12 @@ class TestContainerMergePolicyIndex(ReplProbeTest):
Manager(['container-reconciler']).once() Manager(['container-reconciler']).once()
# verify object in old_policy # verify object in old_policy
client.get_object_metadata( int_client.get_object_metadata(
self.account, self.container_name, self.object_name, self.account, self.container_name, self.object_name,
headers={'X-Backend-Storage-Policy-Index': int(old_policy)}) headers={'X-Backend-Storage-Policy-Index': int(old_policy)})
# verify object is *not* in new_policy # verify object is *not* in new_policy
client.get_object_metadata( int_client.get_object_metadata(
self.account, self.container_name, self.object_name, self.account, self.container_name, self.object_name,
acceptable_statuses=(4,), acceptable_statuses=(4,),
headers={'X-Backend-Storage-Policy-Index': int(new_policy)}) headers={'X-Backend-Storage-Policy-Index': int(new_policy)})
@ -447,10 +453,9 @@ class TestContainerMergePolicyIndex(ReplProbeTest):
self.get_to_final_state() self.get_to_final_state()
# verify entry in the queue # verify entry in the queue
client = InternalClient(conf_file, 'probe-test', 3) for container in int_client.iter_containers('.misplaced_objects'):
for container in client.iter_containers('.misplaced_objects'): for obj in int_client.iter_objects('.misplaced_objects',
for obj in client.iter_objects('.misplaced_objects', container['name']):
container['name']):
expected = '%d:/%s/%s/%s' % (old_policy, self.account, expected = '%d:/%s/%s/%s' % (old_policy, self.account,
self.container_name, self.container_name,
self.object_name) self.object_name)
@ -459,21 +464,26 @@ class TestContainerMergePolicyIndex(ReplProbeTest):
Manager(['container-reconciler']).once() Manager(['container-reconciler']).once()
# and now it flops back # and now it flops back
client.get_object_metadata( int_client.get_object_metadata(
self.account, self.container_name, self.object_name, self.account, self.container_name, self.object_name,
headers={'X-Backend-Storage-Policy-Index': int(new_policy)}) headers={'X-Backend-Storage-Policy-Index': int(new_policy)})
client.get_object_metadata( int_client.get_object_metadata(
self.account, self.container_name, self.object_name, self.account, self.container_name, self.object_name,
acceptable_statuses=(4,), acceptable_statuses=(4,),
headers={'X-Backend-Storage-Policy-Index': int(old_policy)}) headers={'X-Backend-Storage-Policy-Index': int(old_policy)})
# make sure the queue is settled # make sure the queue is settled
self.get_to_final_state() self.get_to_final_state()
for container in client.iter_containers('.misplaced_objects'): for container in int_client.iter_containers('.misplaced_objects'):
for obj in client.iter_objects('.misplaced_objects', for obj in int_client.iter_objects('.misplaced_objects',
container['name']): container['name']):
self.fail('Found unexpected object %r in the queue' % obj) self.fail('Found unexpected object %r in the queue' % obj)
# verify that the object data read by external client is correct
headers, data = self._get_object_patiently(int(new_policy))
self.assertEqual('VERIFY', data)
self.assertEqual('custom-meta', headers['x-object-meta-test'])
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -168,3 +168,13 @@ class FakeSwift(object):
def register_responses(self, method, path, responses): def register_responses(self, method, path, responses):
self._responses[(method, path)] = list(responses) self._responses[(method, path)] = list(responses)
class FakeAppThatExcepts(object):
MESSAGE = "We take exception to that!"
def __init__(self, exception_class=Exception):
self.exception_class = exception_class
def __call__(self, env, start_response):
raise self.exception_class(self.MESSAGE)

View File

@ -27,6 +27,7 @@ from swift.common.swob import Request, Response
from swift.common import constraints from swift.common import constraints
from swift.common.storage_policy import StoragePolicy from swift.common.storage_policy import StoragePolicy
from test.unit import patch_policies from test.unit import patch_policies
from test.unit.common.middleware.helpers import FakeAppThatExcepts
class FakeApp(object): class FakeApp(object):
@ -59,12 +60,6 @@ class FakeApp(object):
return self.body return self.body
class FakeAppThatExcepts(object):
def __call__(self, env, start_response):
raise Exception("We take exception to that!")
class FakeAppNoContentLengthNoTransferEncoding(object): class FakeAppNoContentLengthNoTransferEncoding(object):
def __init__(self, body=None): def __init__(self, body=None):

View File

@ -3210,6 +3210,24 @@ cluster_dfw1 = http://dfw1.host/v1/
self.assertEqual(listing_dict['content_type'], self.assertEqual(listing_dict['content_type'],
'text/plain;hello="world"') 'text/plain;hello="world"')
def test_extract_swift_bytes(self):
scenarios = {
# maps input value -> expected returned tuple
'': ('', None),
'text/plain': ('text/plain', None),
'text/plain; other=thing': ('text/plain;other=thing', None),
'text/plain; swift_bytes=123': ('text/plain', '123'),
'text/plain; other=thing;swift_bytes=123':
('text/plain;other=thing', '123'),
'text/plain; swift_bytes=123; other=thing':
('text/plain;other=thing', '123'),
'text/plain; swift_bytes=123; swift_bytes=456':
('text/plain', '456'),
'text/plain; swift_bytes=123; other=thing;swift_bytes=456':
('text/plain;other=thing', '456')}
for test_value, expected in scenarios.items():
self.assertEqual(expected, utils.extract_swift_bytes(test_value))
def test_clean_content_type(self): def test_clean_content_type(self):
subtests = { subtests = {
'': '', 'text/plain': 'text/plain', '': '', 'text/plain': 'text/plain',

271
test/unit/helpers.py Normal file
View File

@ -0,0 +1,271 @@
# Copyright (c) 2010-2016 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Provides helper functions for unit tests.
This cannot be in test/unit/__init__.py because that module is imported by the
py34 unit test job and there are imports here that end up importing modules
that are not yet ported to py34, such wsgi.py which import mimetools.
"""
import os
from contextlib import closing
from gzip import GzipFile
from tempfile import mkdtemp
import time
from eventlet import listen, spawn, wsgi
import mock
from shutil import rmtree
import six.moves.cPickle as pickle
import swift
from swift.account import server as account_server
from swift.common import storage_policy
from swift.common.ring import RingData
from swift.common.storage_policy import StoragePolicy, ECStoragePolicy
from swift.common.middleware import proxy_logging
from swift.common import utils
from swift.common.utils import mkdirs, normalize_timestamp, NullLogger
from swift.container import server as container_server
from swift.obj import server as object_server
from swift.proxy import server as proxy_server
import swift.proxy.controllers.obj
from test.unit import write_fake_ring, DEFAULT_TEST_EC_TYPE, debug_logger, \
connect_tcp, readuntil2crlfs
def setup_servers(the_object_server=object_server, extra_conf=None):
"""
Setup proxy, account, container and object servers using a set of fake
rings and policies.
:param the_object_server: The object server module to use (optional,
defaults to swift.obj.server)
:param extra_conf: A dict of config options that will update the basic
config passed to all server instances.
:returns: A dict containing the following entries:
orig_POLICIES: the value of storage_policy.POLICIES prior to
it being patched with fake policies
orig_SysLogHandler: the value of utils.SysLogHandler prior to
it being patched
testdir: root directory used for test files
test_POLICIES: a StoragePolicyCollection of fake policies
test_servers: a tuple of test server instances
test_sockets: a tuple of sockets used by test servers
test_coros: a tuple of greenthreads in which test servers are
running
"""
context = {
"orig_POLICIES": storage_policy._POLICIES,
"orig_SysLogHandler": utils.SysLogHandler}
utils.HASH_PATH_SUFFIX = 'endcap'
utils.SysLogHandler = mock.MagicMock()
# Since we're starting up a lot here, we're going to test more than
# just chunked puts; we're also going to test parts of
# proxy_server.Application we couldn't get to easily otherwise.
context["testdir"] = _testdir = \
os.path.join(mkdtemp(), 'tmp_test_proxy_server_chunked')
mkdirs(_testdir)
rmtree(_testdir)
for drive in ('sda1', 'sdb1', 'sdc1', 'sdd1', 'sde1',
'sdf1', 'sdg1', 'sdh1', 'sdi1'):
mkdirs(os.path.join(_testdir, drive, 'tmp'))
conf = {'devices': _testdir, 'swift_dir': _testdir,
'mount_check': 'false', 'allowed_headers':
'content-encoding, x-object-manifest, content-disposition, foo',
'allow_versions': 't'}
if extra_conf:
conf.update(extra_conf)
prolis = listen(('localhost', 0))
acc1lis = listen(('localhost', 0))
acc2lis = listen(('localhost', 0))
con1lis = listen(('localhost', 0))
con2lis = listen(('localhost', 0))
obj1lis = listen(('localhost', 0))
obj2lis = listen(('localhost', 0))
obj3lis = listen(('localhost', 0))
objsocks = [obj1lis, obj2lis, obj3lis]
context["test_sockets"] = \
(prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis, obj2lis, obj3lis)
account_ring_path = os.path.join(_testdir, 'account.ring.gz')
account_devs = [
{'port': acc1lis.getsockname()[1]},
{'port': acc2lis.getsockname()[1]},
]
write_fake_ring(account_ring_path, *account_devs)
container_ring_path = os.path.join(_testdir, 'container.ring.gz')
container_devs = [
{'port': con1lis.getsockname()[1]},
{'port': con2lis.getsockname()[1]},
]
write_fake_ring(container_ring_path, *container_devs)
storage_policy._POLICIES = storage_policy.StoragePolicyCollection([
StoragePolicy(0, 'zero', True),
StoragePolicy(1, 'one', False),
StoragePolicy(2, 'two', False),
ECStoragePolicy(3, 'ec', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=2, ec_nparity=1, ec_segment_size=4096)])
obj_rings = {
0: ('sda1', 'sdb1'),
1: ('sdc1', 'sdd1'),
2: ('sde1', 'sdf1'),
# sdg1, sdh1, sdi1 taken by policy 3 (see below)
}
for policy_index, devices in obj_rings.items():
policy = storage_policy.POLICIES[policy_index]
obj_ring_path = os.path.join(_testdir, policy.ring_name + '.ring.gz')
obj_devs = [
{'port': objsock.getsockname()[1], 'device': dev}
for objsock, dev in zip(objsocks, devices)]
write_fake_ring(obj_ring_path, *obj_devs)
# write_fake_ring can't handle a 3-element ring, and the EC policy needs
# at least 3 devs to work with, so we do it manually
devs = [{'id': 0, 'zone': 0, 'device': 'sdg1', 'ip': '127.0.0.1',
'port': obj1lis.getsockname()[1]},
{'id': 1, 'zone': 0, 'device': 'sdh1', 'ip': '127.0.0.1',
'port': obj2lis.getsockname()[1]},
{'id': 2, 'zone': 0, 'device': 'sdi1', 'ip': '127.0.0.1',
'port': obj3lis.getsockname()[1]}]
pol3_replica2part2dev_id = [[0, 1, 2, 0],
[1, 2, 0, 1],
[2, 0, 1, 2]]
obj3_ring_path = os.path.join(
_testdir, storage_policy.POLICIES[3].ring_name + '.ring.gz')
part_shift = 30
with closing(GzipFile(obj3_ring_path, 'wb')) as fh:
pickle.dump(RingData(pol3_replica2part2dev_id, devs, part_shift), fh)
prosrv = proxy_server.Application(conf, logger=debug_logger('proxy'))
for policy in storage_policy.POLICIES:
# make sure all the rings are loaded
prosrv.get_object_ring(policy.idx)
# don't lose this one!
context["test_POLICIES"] = storage_policy._POLICIES
acc1srv = account_server.AccountController(
conf, logger=debug_logger('acct1'))
acc2srv = account_server.AccountController(
conf, logger=debug_logger('acct2'))
con1srv = container_server.ContainerController(
conf, logger=debug_logger('cont1'))
con2srv = container_server.ContainerController(
conf, logger=debug_logger('cont2'))
obj1srv = the_object_server.ObjectController(
conf, logger=debug_logger('obj1'))
obj2srv = the_object_server.ObjectController(
conf, logger=debug_logger('obj2'))
obj3srv = the_object_server.ObjectController(
conf, logger=debug_logger('obj3'))
context["test_servers"] = \
(prosrv, acc1srv, acc2srv, con1srv, con2srv, obj1srv, obj2srv, obj3srv)
nl = NullLogger()
logging_prosv = proxy_logging.ProxyLoggingMiddleware(prosrv, conf,
logger=prosrv.logger)
prospa = spawn(wsgi.server, prolis, logging_prosv, nl)
acc1spa = spawn(wsgi.server, acc1lis, acc1srv, nl)
acc2spa = spawn(wsgi.server, acc2lis, acc2srv, nl)
con1spa = spawn(wsgi.server, con1lis, con1srv, nl)
con2spa = spawn(wsgi.server, con2lis, con2srv, nl)
obj1spa = spawn(wsgi.server, obj1lis, obj1srv, nl)
obj2spa = spawn(wsgi.server, obj2lis, obj2srv, nl)
obj3spa = spawn(wsgi.server, obj3lis, obj3srv, nl)
context["test_coros"] = \
(prospa, acc1spa, acc2spa, con1spa, con2spa, obj1spa, obj2spa, obj3spa)
# Create account
ts = normalize_timestamp(time.time())
partition, nodes = prosrv.account_ring.get_nodes('a')
for node in nodes:
conn = swift.proxy.controllers.obj.http_connect(node['ip'],
node['port'],
node['device'],
partition, 'PUT', '/a',
{'X-Timestamp': ts,
'x-trans-id': 'test'})
resp = conn.getresponse()
assert(resp.status == 201)
# Create another account
# used for account-to-account tests
ts = normalize_timestamp(time.time())
partition, nodes = prosrv.account_ring.get_nodes('a1')
for node in nodes:
conn = swift.proxy.controllers.obj.http_connect(node['ip'],
node['port'],
node['device'],
partition, 'PUT',
'/a1',
{'X-Timestamp': ts,
'x-trans-id': 'test'})
resp = conn.getresponse()
assert(resp.status == 201)
# Create containers, 1 per test policy
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('PUT /v1/a/c HTTP/1.1\r\nHost: localhost\r\n'
'Connection: close\r\nX-Auth-Token: t\r\n'
'Content-Length: 0\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 201'
assert headers[:len(exp)] == exp, "Expected '%s', encountered '%s'" % (
exp, headers[:len(exp)])
# Create container in other account
# used for account-to-account tests
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('PUT /v1/a1/c1 HTTP/1.1\r\nHost: localhost\r\n'
'Connection: close\r\nX-Auth-Token: t\r\n'
'Content-Length: 0\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 201'
assert headers[:len(exp)] == exp, "Expected '%s', encountered '%s'" % (
exp, headers[:len(exp)])
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write(
'PUT /v1/a/c1 HTTP/1.1\r\nHost: localhost\r\n'
'Connection: close\r\nX-Auth-Token: t\r\nX-Storage-Policy: one\r\n'
'Content-Length: 0\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 201'
assert headers[:len(exp)] == exp, \
"Expected '%s', encountered '%s'" % (exp, headers[:len(exp)])
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write(
'PUT /v1/a/c2 HTTP/1.1\r\nHost: localhost\r\n'
'Connection: close\r\nX-Auth-Token: t\r\nX-Storage-Policy: two\r\n'
'Content-Length: 0\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 201'
assert headers[:len(exp)] == exp, \
"Expected '%s', encountered '%s'" % (exp, headers[:len(exp)])
return context
def teardown_servers(context):
for server in context["test_coros"]:
server.kill()
rmtree(os.path.dirname(context["testdir"]))
utils.SysLogHandler = context["orig_SysLogHandler"]
storage_policy._POLICIES = context["orig_POLICIES"]

View File

@ -20,12 +20,10 @@ import logging
import json import json
import math import math
import os import os
import pickle
import sys import sys
import traceback import traceback
import unittest import unittest
from contextlib import closing, contextmanager from contextlib import contextmanager
from gzip import GzipFile
from shutil import rmtree from shutil import rmtree
import gc import gc
import time import time
@ -55,13 +53,11 @@ from swift.common.utils import hash_path, storage_directory, \
iter_multipart_mime_documents, public iter_multipart_mime_documents, public
from test.unit import ( from test.unit import (
connect_tcp, readuntil2crlfs, FakeLogger, fake_http_connect, FakeRing, connect_tcp, readuntil2crlfs, FakeLogger, FakeRing, fake_http_connect,
FakeMemcache, debug_logger, patch_policies, write_fake_ring, FakeMemcache, debug_logger, patch_policies, write_fake_ring,
mocked_http_conn, DEFAULT_TEST_EC_TYPE) mocked_http_conn, DEFAULT_TEST_EC_TYPE)
from swift.proxy import server as proxy_server from swift.proxy import server as proxy_server
from swift.proxy.controllers.obj import ReplicatedObjectController from swift.proxy.controllers.obj import ReplicatedObjectController
from swift.account import server as account_server
from swift.container import server as container_server
from swift.obj import server as object_server from swift.obj import server as object_server
from swift.common.middleware import proxy_logging, versioned_writes, \ from swift.common.middleware import proxy_logging, versioned_writes, \
copy copy
@ -69,8 +65,7 @@ from swift.common.middleware.acl import parse_acl, format_acl
from swift.common.exceptions import ChunkReadTimeout, DiskFileNotExist, \ from swift.common.exceptions import ChunkReadTimeout, DiskFileNotExist, \
APIVersionError, ChunkWriteTimeout APIVersionError, ChunkWriteTimeout
from swift.common import utils, constraints from swift.common import utils, constraints
from swift.common.ring import RingData from swift.common.utils import mkdirs, NullLogger
from swift.common.utils import mkdirs, normalize_timestamp, NullLogger
from swift.common.wsgi import monkey_patch_mimetools, loadapp from swift.common.wsgi import monkey_patch_mimetools, loadapp
from swift.proxy.controllers import base as proxy_base from swift.proxy.controllers import base as proxy_base
from swift.proxy.controllers.base import get_cache_key, cors_validation, \ from swift.proxy.controllers.base import get_cache_key, cors_validation, \
@ -80,212 +75,31 @@ import swift.proxy.controllers.obj
from swift.common.header_key_dict import HeaderKeyDict from swift.common.header_key_dict import HeaderKeyDict
from swift.common.swob import Request, Response, HTTPUnauthorized, \ from swift.common.swob import Request, Response, HTTPUnauthorized, \
HTTPException, HTTPBadRequest HTTPException, HTTPBadRequest
from swift.common import storage_policy from swift.common.storage_policy import StoragePolicy, POLICIES
from swift.common.storage_policy import StoragePolicy, ECStoragePolicy, \
StoragePolicyCollection, POLICIES
import swift.common.request_helpers import swift.common.request_helpers
from swift.common.request_helpers import get_sys_meta_prefix from swift.common.request_helpers import get_sys_meta_prefix
from test.unit.helpers import setup_servers, teardown_servers
# mocks # mocks
logging.getLogger().addHandler(logging.StreamHandler(sys.stdout)) logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
STATIC_TIME = time.time() STATIC_TIME = time.time()
_test_coros = _test_servers = _test_sockets = _orig_container_listing_limit = \ _test_context = _test_servers = _test_sockets = _testdir = \
_testdir = _orig_SysLogHandler = _orig_POLICIES = _test_POLICIES = None _test_POLICIES = None
def do_setup(the_object_server): def do_setup(object_server):
utils.HASH_PATH_SUFFIX = 'endcap' # setup test context and break out some globals for convenience
global _testdir, _test_servers, _test_sockets, \ global _test_context, _testdir, _test_servers, _test_sockets, \
_orig_container_listing_limit, _test_coros, _orig_SysLogHandler, \ _test_POLICIES
_orig_POLICIES, _test_POLICIES
_orig_POLICIES = storage_policy._POLICIES
_orig_SysLogHandler = utils.SysLogHandler
utils.SysLogHandler = mock.MagicMock()
monkey_patch_mimetools() monkey_patch_mimetools()
# Since we're starting up a lot here, we're going to test more than _test_context = setup_servers(object_server)
# just chunked puts; we're also going to test parts of _testdir = _test_context["testdir"]
# proxy_server.Application we couldn't get to easily otherwise. _test_servers = _test_context["test_servers"]
_testdir = \ _test_sockets = _test_context["test_sockets"]
os.path.join(mkdtemp(), 'tmp_test_proxy_server_chunked') _test_POLICIES = _test_context["test_POLICIES"]
mkdirs(_testdir)
rmtree(_testdir)
for drive in ('sda1', 'sdb1', 'sdc1', 'sdd1', 'sde1',
'sdf1', 'sdg1', 'sdh1', 'sdi1'):
mkdirs(os.path.join(_testdir, drive, 'tmp'))
conf = {'devices': _testdir, 'swift_dir': _testdir,
'mount_check': 'false', 'allowed_headers':
'content-encoding, x-object-manifest, content-disposition, foo',
'allow_versions': 't'}
prolis = listen(('localhost', 0))
acc1lis = listen(('localhost', 0))
acc2lis = listen(('localhost', 0))
con1lis = listen(('localhost', 0))
con2lis = listen(('localhost', 0))
obj1lis = listen(('localhost', 0))
obj2lis = listen(('localhost', 0))
obj3lis = listen(('localhost', 0))
objsocks = [obj1lis, obj2lis, obj3lis]
_test_sockets = \
(prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis, obj2lis, obj3lis)
account_ring_path = os.path.join(_testdir, 'account.ring.gz')
account_devs = [
{'port': acc1lis.getsockname()[1]},
{'port': acc2lis.getsockname()[1]},
]
write_fake_ring(account_ring_path, *account_devs)
container_ring_path = os.path.join(_testdir, 'container.ring.gz')
container_devs = [
{'port': con1lis.getsockname()[1]},
{'port': con2lis.getsockname()[1]},
]
write_fake_ring(container_ring_path, *container_devs)
storage_policy._POLICIES = StoragePolicyCollection([
StoragePolicy(0, 'zero', True),
StoragePolicy(1, 'one', False),
StoragePolicy(2, 'two', False),
ECStoragePolicy(3, 'ec', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=2, ec_nparity=1, ec_segment_size=4096)])
obj_rings = {
0: ('sda1', 'sdb1'),
1: ('sdc1', 'sdd1'),
2: ('sde1', 'sdf1'),
# sdg1, sdh1, sdi1 taken by policy 3 (see below)
}
for policy_index, devices in obj_rings.items():
policy = POLICIES[policy_index]
obj_ring_path = os.path.join(_testdir, policy.ring_name + '.ring.gz')
obj_devs = [
{'port': objsock.getsockname()[1], 'device': dev}
for objsock, dev in zip(objsocks, devices)]
write_fake_ring(obj_ring_path, *obj_devs)
# write_fake_ring can't handle a 3-element ring, and the EC policy needs
# at least 3 devs to work with, so we do it manually
devs = [{'id': 0, 'zone': 0, 'device': 'sdg1', 'ip': '127.0.0.1',
'port': obj1lis.getsockname()[1]},
{'id': 1, 'zone': 0, 'device': 'sdh1', 'ip': '127.0.0.1',
'port': obj2lis.getsockname()[1]},
{'id': 2, 'zone': 0, 'device': 'sdi1', 'ip': '127.0.0.1',
'port': obj3lis.getsockname()[1]}]
pol3_replica2part2dev_id = [[0, 1, 2, 0],
[1, 2, 0, 1],
[2, 0, 1, 2]]
obj3_ring_path = os.path.join(_testdir, POLICIES[3].ring_name + '.ring.gz')
part_shift = 30
with closing(GzipFile(obj3_ring_path, 'wb')) as fh:
pickle.dump(RingData(pol3_replica2part2dev_id, devs, part_shift), fh)
prosrv = proxy_server.Application(conf, FakeMemcacheReturnsNone(),
logger=debug_logger('proxy'))
for policy in POLICIES:
# make sure all the rings are loaded
prosrv.get_object_ring(policy.idx)
# don't lose this one!
_test_POLICIES = storage_policy._POLICIES
acc1srv = account_server.AccountController(
conf, logger=debug_logger('acct1'))
acc2srv = account_server.AccountController(
conf, logger=debug_logger('acct2'))
con1srv = container_server.ContainerController(
conf, logger=debug_logger('cont1'))
con2srv = container_server.ContainerController(
conf, logger=debug_logger('cont2'))
obj1srv = the_object_server.ObjectController(
conf, logger=debug_logger('obj1'))
obj2srv = the_object_server.ObjectController(
conf, logger=debug_logger('obj2'))
obj3srv = the_object_server.ObjectController(
conf, logger=debug_logger('obj3'))
_test_servers = \
(prosrv, acc1srv, acc2srv, con1srv, con2srv, obj1srv, obj2srv, obj3srv)
nl = NullLogger()
logging_prosv = proxy_logging.ProxyLoggingMiddleware(prosrv, conf,
logger=prosrv.logger)
prospa = spawn(wsgi.server, prolis, logging_prosv, nl)
acc1spa = spawn(wsgi.server, acc1lis, acc1srv, nl)
acc2spa = spawn(wsgi.server, acc2lis, acc2srv, nl)
con1spa = spawn(wsgi.server, con1lis, con1srv, nl)
con2spa = spawn(wsgi.server, con2lis, con2srv, nl)
obj1spa = spawn(wsgi.server, obj1lis, obj1srv, nl)
obj2spa = spawn(wsgi.server, obj2lis, obj2srv, nl)
obj3spa = spawn(wsgi.server, obj3lis, obj3srv, nl)
_test_coros = \
(prospa, acc1spa, acc2spa, con1spa, con2spa, obj1spa, obj2spa, obj3spa)
# Create account
ts = normalize_timestamp(time.time())
partition, nodes = prosrv.account_ring.get_nodes('a')
for node in nodes:
conn = swift.proxy.controllers.obj.http_connect(node['ip'],
node['port'],
node['device'],
partition, 'PUT', '/a',
{'X-Timestamp': ts,
'x-trans-id': 'test'})
resp = conn.getresponse()
assert(resp.status == 201)
# Create another account
# used for account-to-account tests
ts = normalize_timestamp(time.time())
partition, nodes = prosrv.account_ring.get_nodes('a1')
for node in nodes:
conn = swift.proxy.controllers.obj.http_connect(node['ip'],
node['port'],
node['device'],
partition, 'PUT',
'/a1',
{'X-Timestamp': ts,
'x-trans-id': 'test'})
resp = conn.getresponse()
assert(resp.status == 201)
# Create containers, 1 per test policy
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('PUT /v1/a/c HTTP/1.1\r\nHost: localhost\r\n'
'Connection: close\r\nX-Auth-Token: t\r\n'
'Content-Length: 0\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 201'
assert headers[:len(exp)] == exp, "Expected '%s', encountered '%s'" % (
exp, headers[:len(exp)])
# Create container in other account
# used for account-to-account tests
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('PUT /v1/a1/c1 HTTP/1.1\r\nHost: localhost\r\n'
'Connection: close\r\nX-Auth-Token: t\r\n'
'Content-Length: 0\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 201'
assert headers[:len(exp)] == exp, "Expected '%s', encountered '%s'" % (
exp, headers[:len(exp)])
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write(
'PUT /v1/a/c1 HTTP/1.1\r\nHost: localhost\r\n'
'Connection: close\r\nX-Auth-Token: t\r\nX-Storage-Policy: one\r\n'
'Content-Length: 0\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 201'
assert headers[:len(exp)] == exp, \
"Expected '%s', encountered '%s'" % (exp, headers[:len(exp)])
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write(
'PUT /v1/a/c2 HTTP/1.1\r\nHost: localhost\r\n'
'Connection: close\r\nX-Auth-Token: t\r\nX-Storage-Policy: two\r\n'
'Content-Length: 0\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 201'
assert headers[:len(exp)] == exp, \
"Expected '%s', encountered '%s'" % (exp, headers[:len(exp)])
def unpatch_policies(f): def unpatch_policies(f):
@ -308,11 +122,7 @@ def setup():
def teardown(): def teardown():
for server in _test_coros: teardown_servers(_test_context)
server.kill()
rmtree(os.path.dirname(_testdir))
utils.SysLogHandler = _orig_SysLogHandler
storage_policy._POLICIES = _orig_POLICIES
def sortHeaderNames(headerNames): def sortHeaderNames(headerNames):