Allow for multiple X-(Account|Container)-* headers.
When the number of account/container or container/object replicas are different, Swift had a few misbehaviors. This commit fixes them. * On an object PUT/POST/DELETE, if there were 3 object replicas and only 2 container replicas, then only 2 requests would be made to object servers. Now, 3 requests will be made, but the third won't have any X-Container-* headers in it. * On an object PUT/POST/DELETE, if there were 3 object replicas and 4 container replicas, then only 3/4 container servers would receive immediate updates; the fourth would be ignored. Now one of the object servers will receive multiple (comma-separated) values in the X-Container-* headers and it will attempt to contact both of them. One side effect is that multiple async_pendings may be written for updates to the same object. They'll have differing timestamps, though, so all but the newest will be deleted unread. To trigger this behavior, you have to have more container replicas than object replicas, 2 or more of the container servers must be down, and the headers sent to one object server must reference 2 or more down container servers; it's unlikely enough and the consequences are so minor that it didn't seem worth fixing. The situation with account/containers is analogous, only without the async_pendings. Change-Id: I98bc2de93fb6b2346d6de1d764213d7563653e8d
This commit is contained in:
parent
7e62056fd5
commit
6ff644b945
@ -1531,6 +1531,18 @@ def list_from_csv(comma_separated_str):
|
||||
return []
|
||||
|
||||
|
||||
def csv_append(csv_string, item):
|
||||
"""
|
||||
Appends an item to a comma-separated string.
|
||||
|
||||
If the comma-separated string is empty/None, just returns item.
|
||||
"""
|
||||
if csv_string:
|
||||
return ",".join((csv_string, item))
|
||||
else:
|
||||
return item
|
||||
|
||||
|
||||
def reiterate(iterable):
|
||||
"""
|
||||
Consume the first item from an iterator, then re-chain it to the rest of
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
from __future__ import with_statement
|
||||
|
||||
import itertools
|
||||
import os
|
||||
import time
|
||||
import traceback
|
||||
@ -88,19 +89,39 @@ class ContainerController(object):
|
||||
|
||||
def account_update(self, req, account, container, broker):
|
||||
"""
|
||||
Update the account server with latest container info.
|
||||
Update the account server(s) with latest container info.
|
||||
|
||||
:param req: swob.Request object
|
||||
:param account: account name
|
||||
:param container: container name
|
||||
:param broker: container DB broker object
|
||||
:returns: if the account request returns a 404 error code,
|
||||
:returns: if all the account requests return a 404 error code,
|
||||
HTTPNotFound response object, otherwise None.
|
||||
"""
|
||||
account_host = req.headers.get('X-Account-Host')
|
||||
account_partition = req.headers.get('X-Account-Partition')
|
||||
account_device = req.headers.get('X-Account-Device')
|
||||
if all([account_host, account_partition, account_device]):
|
||||
account_hosts = [h.strip() for h in
|
||||
req.headers.get('X-Account-Host', '').split(',')]
|
||||
account_devices = [d.strip() for d in
|
||||
req.headers.get('X-Account-Device', '').split(',')]
|
||||
account_partition = req.headers.get('X-Account-Partition', '')
|
||||
|
||||
if len(account_hosts) != len(account_devices):
|
||||
# This shouldn't happen unless there's a bug in the proxy,
|
||||
# but if there is, we want to know about it.
|
||||
self.logger.error(_('ERROR Account update failed: different '
|
||||
'numbers of hosts and devices in request: '
|
||||
'"%s" vs "%s"' %
|
||||
(req.headers.get('X-Account-Host', ''),
|
||||
req.headers.get('X-Account-Device', ''))))
|
||||
return
|
||||
|
||||
if account_partition:
|
||||
updates = zip(account_hosts, account_devices)
|
||||
else:
|
||||
updates = []
|
||||
|
||||
account_404s = 0
|
||||
|
||||
for account_host, account_device in updates:
|
||||
account_ip, account_port = account_host.rsplit(':', 1)
|
||||
new_path = '/' + '/'.join([account, container])
|
||||
info = broker.get_info()
|
||||
@ -122,7 +143,7 @@ class ContainerController(object):
|
||||
account_response = conn.getresponse()
|
||||
account_response.read()
|
||||
if account_response.status == HTTP_NOT_FOUND:
|
||||
return HTTPNotFound(request=req)
|
||||
account_404s += 1
|
||||
elif not is_success(account_response.status):
|
||||
self.logger.error(_(
|
||||
'ERROR Account update failed '
|
||||
@ -138,7 +159,10 @@ class ContainerController(object):
|
||||
'%(ip)s:%(port)s/%(device)s (will retry later)'),
|
||||
{'ip': account_ip, 'port': account_port,
|
||||
'device': account_device})
|
||||
return None
|
||||
if updates and account_404s == len(updates):
|
||||
return HTTPNotFound(req=req)
|
||||
else:
|
||||
return None
|
||||
|
||||
@public
|
||||
@timing_stats
|
||||
|
@ -18,6 +18,7 @@
|
||||
from __future__ import with_statement
|
||||
import cPickle as pickle
|
||||
import errno
|
||||
import itertools
|
||||
import os
|
||||
import time
|
||||
import traceback
|
||||
@ -489,16 +490,34 @@ class ObjectController(object):
|
||||
:param obj: object name
|
||||
:param headers_in: dictionary of headers from the original request
|
||||
:param headers_out: dictionary of headers to send in the container
|
||||
request
|
||||
request(s)
|
||||
:param objdevice: device name that the object is in
|
||||
"""
|
||||
host = headers_in.get('X-Container-Host', None)
|
||||
partition = headers_in.get('X-Container-Partition', None)
|
||||
contdevice = headers_in.get('X-Container-Device', None)
|
||||
if not all([host, partition, contdevice]):
|
||||
conthosts = [h.strip() for h in
|
||||
headers_in.get('X-Container-Host', '').split(',')]
|
||||
contdevices = [d.strip() for d in
|
||||
headers_in.get('X-Container-Device', '').split(',')]
|
||||
contpartition = headers_in.get('X-Container-Partition', '')
|
||||
|
||||
if len(conthosts) != len(contdevices):
|
||||
# This shouldn't happen unless there's a bug in the proxy,
|
||||
# but if there is, we want to know about it.
|
||||
self.logger.error(_('ERROR Container update failed: different '
|
||||
'numbers of hosts and devices in request: '
|
||||
'"%s" vs "%s"' %
|
||||
(req.headers.get('X-Container-Host', ''),
|
||||
req.headers.get('X-Container-Device', ''))))
|
||||
return
|
||||
self.async_update(op, account, container, obj, host, partition,
|
||||
contdevice, headers_out, objdevice)
|
||||
|
||||
if contpartition:
|
||||
updates = zip(conthosts, contdevices)
|
||||
else:
|
||||
updates = []
|
||||
|
||||
for conthost, contdevice in updates:
|
||||
self.async_update(op, account, container, obj, conthost,
|
||||
contpartition, contdevice, headers_out,
|
||||
objdevice)
|
||||
|
||||
def delete_at_update(self, op, delete_at, account, container, obj,
|
||||
headers_in, objdevice):
|
||||
@ -516,22 +535,33 @@ class ObjectController(object):
|
||||
# At that time, Swift will be so popular and pervasive I will have
|
||||
# created income for thousands of future programmers.
|
||||
delete_at = max(min(delete_at, 9999999999), 0)
|
||||
host = partition = contdevice = None
|
||||
updates = [(None, None)]
|
||||
|
||||
partition = None
|
||||
hosts = contdevices = [None]
|
||||
headers_out = {'x-timestamp': headers_in['x-timestamp'],
|
||||
'x-trans-id': headers_in.get('x-trans-id', '-')}
|
||||
if op != 'DELETE':
|
||||
host = headers_in.get('X-Delete-At-Host', None)
|
||||
partition = headers_in.get('X-Delete-At-Partition', None)
|
||||
contdevice = headers_in.get('X-Delete-At-Device', None)
|
||||
hosts = headers_in.get('X-Delete-At-Host', '')
|
||||
contdevices = headers_in.get('X-Delete-At-Device', '')
|
||||
updates = [upd for upd in
|
||||
zip((h.strip() for h in hosts.split(',')),
|
||||
(c.strip() for c in contdevices.split(',')))
|
||||
if all(upd) and partition]
|
||||
if not updates:
|
||||
updates = [(None, None)]
|
||||
headers_out['x-size'] = '0'
|
||||
headers_out['x-content-type'] = 'text/plain'
|
||||
headers_out['x-etag'] = 'd41d8cd98f00b204e9800998ecf8427e'
|
||||
self.async_update(
|
||||
op, self.expiring_objects_account,
|
||||
str(delete_at / self.expiring_objects_container_divisor *
|
||||
self.expiring_objects_container_divisor),
|
||||
'%s-%s/%s/%s' % (delete_at, account, container, obj),
|
||||
host, partition, contdevice, headers_out, objdevice)
|
||||
|
||||
for host, contdevice in updates:
|
||||
self.async_update(
|
||||
op, self.expiring_objects_account,
|
||||
str(delete_at / self.expiring_objects_container_divisor *
|
||||
self.expiring_objects_container_divisor),
|
||||
'%s-%s/%s/%s' % (delete_at, account, container, obj),
|
||||
host, partition, contdevice, headers_out, objdevice)
|
||||
|
||||
@public
|
||||
@timing_stats
|
||||
|
@ -28,7 +28,7 @@ import time
|
||||
from urllib import unquote
|
||||
from random import shuffle
|
||||
|
||||
from swift.common.utils import normalize_timestamp, public
|
||||
from swift.common.utils import normalize_timestamp, public, csv_append
|
||||
from swift.common.constraints import check_metadata, MAX_CONTAINER_NAME_LENGTH
|
||||
from swift.common.http import HTTP_ACCEPTED
|
||||
from swift.proxy.controllers.base import Controller, delay_denial, \
|
||||
@ -131,16 +131,8 @@ class ContainerController(Controller):
|
||||
return HTTPNotFound(request=req)
|
||||
container_partition, containers = self.app.container_ring.get_nodes(
|
||||
self.account_name, self.container_name)
|
||||
headers = []
|
||||
for account in accounts:
|
||||
nheaders = {'X-Timestamp': normalize_timestamp(time.time()),
|
||||
'x-trans-id': self.trans_id,
|
||||
'X-Account-Host': '%(ip)s:%(port)s' % account,
|
||||
'X-Account-Partition': account_partition,
|
||||
'X-Account-Device': account['device'],
|
||||
'Connection': 'close'}
|
||||
self.transfer_headers(req.headers, nheaders)
|
||||
headers.append(nheaders)
|
||||
headers = self._backend_requests(req, len(containers),
|
||||
account_partition, accounts)
|
||||
if self.app.memcache:
|
||||
cache_key = get_container_memcache_key(self.account_name,
|
||||
self.container_name)
|
||||
@ -185,14 +177,8 @@ class ContainerController(Controller):
|
||||
return HTTPNotFound(request=req)
|
||||
container_partition, containers = self.app.container_ring.get_nodes(
|
||||
self.account_name, self.container_name)
|
||||
headers = []
|
||||
for account in accounts:
|
||||
headers.append({'X-Timestamp': normalize_timestamp(time.time()),
|
||||
'X-Trans-Id': self.trans_id,
|
||||
'X-Account-Host': '%(ip)s:%(port)s' % account,
|
||||
'X-Account-Partition': account_partition,
|
||||
'X-Account-Device': account['device'],
|
||||
'Connection': 'close'})
|
||||
headers = self._backend_requests(req, len(containers),
|
||||
account_partition, accounts)
|
||||
if self.app.memcache:
|
||||
cache_key = get_container_memcache_key(self.account_name,
|
||||
self.container_name)
|
||||
@ -204,3 +190,26 @@ class ContainerController(Controller):
|
||||
if resp.status_int == HTTP_ACCEPTED:
|
||||
return HTTPNotFound(request=req)
|
||||
return resp
|
||||
|
||||
def _backend_requests(self, req, n_outgoing,
|
||||
account_partition, accounts):
|
||||
headers = [{'Connection': 'close',
|
||||
'X-Timestamp': normalize_timestamp(time.time()),
|
||||
'x-trans-id': self.trans_id}
|
||||
for _junk in range(n_outgoing)]
|
||||
|
||||
for header in headers:
|
||||
self.transfer_headers(req.headers, header)
|
||||
|
||||
for i, account in enumerate(accounts):
|
||||
i = i % len(headers)
|
||||
|
||||
headers[i]['X-Account-Partition'] = account_partition
|
||||
headers[i]['X-Account-Host'] = csv_append(
|
||||
headers[i].get('X-Account-Host'),
|
||||
'%(ip)s:%(port)s' % account)
|
||||
headers[i]['X-Account-Device'] = csv_append(
|
||||
headers[i].get('X-Account-Device'),
|
||||
account['device'])
|
||||
|
||||
return headers
|
||||
|
@ -38,7 +38,7 @@ from eventlet.queue import Queue
|
||||
from eventlet.timeout import Timeout
|
||||
|
||||
from swift.common.utils import ContextPool, normalize_timestamp, \
|
||||
config_true_value, public, json
|
||||
config_true_value, public, json, csv_append
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.constraints import check_metadata, check_object_creation, \
|
||||
CONTAINER_LISTING_LIMIT, MAX_FILE_SIZE
|
||||
@ -483,23 +483,48 @@ class ObjectController(Controller):
|
||||
partition, nodes = self.app.object_ring.get_nodes(
|
||||
self.account_name, self.container_name, self.object_name)
|
||||
req.headers['X-Timestamp'] = normalize_timestamp(time.time())
|
||||
headers = []
|
||||
for container in containers:
|
||||
nheaders = dict(req.headers.iteritems())
|
||||
nheaders['Connection'] = 'close'
|
||||
nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
|
||||
nheaders['X-Container-Partition'] = container_partition
|
||||
nheaders['X-Container-Device'] = container['device']
|
||||
if delete_at_nodes:
|
||||
node = delete_at_nodes.pop(0)
|
||||
nheaders['X-Delete-At-Host'] = '%(ip)s:%(port)s' % node
|
||||
nheaders['X-Delete-At-Partition'] = delete_at_part
|
||||
nheaders['X-Delete-At-Device'] = node['device']
|
||||
headers.append(nheaders)
|
||||
|
||||
headers = self._backend_requests(
|
||||
req, len(nodes), container_partition, containers,
|
||||
delete_at_part, delete_at_nodes)
|
||||
|
||||
resp = self.make_requests(req, self.app.object_ring, partition,
|
||||
'POST', req.path_info, headers)
|
||||
return resp
|
||||
|
||||
def _backend_requests(self, req, n_outgoing,
|
||||
container_partition, containers,
|
||||
delete_at_partition=None, delete_at_nodes=None):
|
||||
headers = [dict(req.headers.iteritems())
|
||||
for _junk in range(n_outgoing)]
|
||||
|
||||
for header in headers:
|
||||
header['Connection'] = 'close'
|
||||
|
||||
for i, container in enumerate(containers):
|
||||
i = i % len(headers)
|
||||
|
||||
headers[i]['X-Container-Partition'] = container_partition
|
||||
headers[i]['X-Container-Host'] = csv_append(
|
||||
headers[i].get('X-Container-Host'),
|
||||
'%(ip)s:%(port)s' % container)
|
||||
headers[i]['X-Container-Device'] = csv_append(
|
||||
headers[i].get('X-Container-Device'),
|
||||
container['device'])
|
||||
|
||||
for i, node in enumerate(delete_at_nodes or []):
|
||||
i = i % len(headers)
|
||||
|
||||
headers[i]['X-Delete-At-Partition'] = delete_at_partition
|
||||
headers[i]['X-Delete-At-Host'] = csv_append(
|
||||
headers[i].get('X-Delete-At-Host'),
|
||||
'%(ip)s:%(port)s' % node)
|
||||
headers[i]['X-Delete-At-Device'] = csv_append(
|
||||
headers[i].get('X-Delete-At-Device'),
|
||||
node['device'])
|
||||
|
||||
return headers
|
||||
|
||||
def _send_file(self, conn, path):
|
||||
"""Method for a file PUT coro"""
|
||||
while True:
|
||||
@ -714,22 +739,18 @@ class ObjectController(Controller):
|
||||
node_iter = self.iter_nodes(partition, nodes, self.app.object_ring)
|
||||
pile = GreenPile(len(nodes))
|
||||
chunked = req.headers.get('transfer-encoding')
|
||||
for container in containers:
|
||||
nheaders = dict(req.headers.iteritems())
|
||||
nheaders['Connection'] = 'close'
|
||||
nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
|
||||
nheaders['X-Container-Partition'] = container_partition
|
||||
nheaders['X-Container-Device'] = container['device']
|
||||
|
||||
outgoing_headers = self._backend_requests(
|
||||
req, len(nodes), container_partition, containers,
|
||||
delete_at_part, delete_at_nodes)
|
||||
|
||||
for nheaders in outgoing_headers:
|
||||
# RFC2616:8.2.3 disallows 100-continue without a body
|
||||
if (req.content_length > 0) or chunked:
|
||||
nheaders['Expect'] = '100-continue'
|
||||
if delete_at_nodes:
|
||||
node = delete_at_nodes.pop(0)
|
||||
nheaders['X-Delete-At-Host'] = '%(ip)s:%(port)s' % node
|
||||
nheaders['X-Delete-At-Partition'] = delete_at_part
|
||||
nheaders['X-Delete-At-Device'] = node['device']
|
||||
pile.spawn(self._connect_put_node, node_iter, partition,
|
||||
req.path_info, nheaders, self.app.logger.thread_locals)
|
||||
|
||||
conns = [conn for conn in pile if conn]
|
||||
if len(conns) <= len(nodes) / 2:
|
||||
self.app.logger.error(
|
||||
@ -923,14 +944,9 @@ class ObjectController(Controller):
|
||||
'was %r' % req.headers['x-timestamp'])
|
||||
else:
|
||||
req.headers['X-Timestamp'] = normalize_timestamp(time.time())
|
||||
headers = []
|
||||
for container in containers:
|
||||
nheaders = dict(req.headers.iteritems())
|
||||
nheaders['Connection'] = 'close'
|
||||
nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
|
||||
nheaders['X-Container-Partition'] = container_partition
|
||||
nheaders['X-Container-Device'] = container['device']
|
||||
headers.append(nheaders)
|
||||
|
||||
headers = self._backend_requests(
|
||||
req, len(nodes), container_partition, containers)
|
||||
resp = self.make_requests(req, self.app.object_ring,
|
||||
partition, 'DELETE', req.path_info, headers)
|
||||
return resp
|
||||
|
@ -13,6 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import operator
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
@ -1079,6 +1080,77 @@ class TestContainerController(unittest.TestCase):
|
||||
self.assertEquals(resp.content_type, 'application/xml')
|
||||
self.assertEquals(resp.charset, 'utf-8')
|
||||
|
||||
def test_updating_multiple_container_servers(self):
|
||||
http_connect_args = []
|
||||
def fake_http_connect(ipaddr, port, device, partition, method, path,
|
||||
headers=None, query_string=None, ssl=False):
|
||||
class SuccessfulFakeConn(object):
|
||||
@property
|
||||
def status(self):
|
||||
return 200
|
||||
|
||||
def getresponse(self):
|
||||
return self
|
||||
|
||||
def read(self):
|
||||
return ''
|
||||
|
||||
captured_args = {'ipaddr': ipaddr, 'port': port,
|
||||
'device': device, 'partition': partition,
|
||||
'method': method, 'path': path, 'ssl': ssl,
|
||||
'headers': headers, 'query_string': query_string}
|
||||
|
||||
http_connect_args.append(
|
||||
dict((k,v) for k,v in captured_args.iteritems()
|
||||
if v is not None))
|
||||
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': '12345',
|
||||
'X-Account-Partition': '30',
|
||||
'X-Account-Host': '1.2.3.4:5, 6.7.8.9:10',
|
||||
'X-Account-Device': 'sdb1, sdf1'})
|
||||
|
||||
orig_http_connect = container_server.http_connect
|
||||
try:
|
||||
container_server.http_connect = fake_http_connect
|
||||
self.controller.PUT(req)
|
||||
finally:
|
||||
container_server.http_connect = orig_http_connect
|
||||
|
||||
http_connect_args.sort(key=operator.itemgetter('ipaddr'))
|
||||
|
||||
self.assertEquals(len(http_connect_args), 2)
|
||||
self.assertEquals(
|
||||
http_connect_args[0],
|
||||
{'ipaddr': '1.2.3.4',
|
||||
'port': '5',
|
||||
'path': '/a/c',
|
||||
'device': 'sdb1',
|
||||
'partition': '30',
|
||||
'method': 'PUT',
|
||||
'ssl': False,
|
||||
'headers': {'x-bytes-used': 0,
|
||||
'x-delete-timestamp': '0',
|
||||
'x-object-count': 0,
|
||||
'x-put-timestamp': '0000012345.00000',
|
||||
'x-trans-id': '-'}})
|
||||
self.assertEquals(
|
||||
http_connect_args[1],
|
||||
{'ipaddr': '6.7.8.9',
|
||||
'port': '10',
|
||||
'path': '/a/c',
|
||||
'device': 'sdf1',
|
||||
'partition': '30',
|
||||
'method': 'PUT',
|
||||
'ssl': False,
|
||||
'headers': {'x-bytes-used': 0,
|
||||
'x-delete-timestamp': '0',
|
||||
'x-object-count': 0,
|
||||
'x-put-timestamp': '0000012345.00000',
|
||||
'x-trans-id': '-'}})
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -16,6 +16,7 @@
|
||||
""" Tests for swift.object_server """
|
||||
|
||||
import cPickle as pickle
|
||||
import operator
|
||||
import os
|
||||
import unittest
|
||||
import email
|
||||
@ -1635,6 +1636,177 @@ class TestObjectController(unittest.TestCase):
|
||||
self.assertEquals(given_args, ['127.0.0.1', '1234', 'sdc1', 1, 'PUT',
|
||||
'/a/c/o', {'x-timestamp': '1', 'x-out': 'set'}])
|
||||
|
||||
|
||||
def test_updating_multiple_delete_at_container_servers(self):
|
||||
self.object_controller.expiring_objects_account = 'exp'
|
||||
self.object_controller.expiring_objects_container_divisor = 60
|
||||
|
||||
http_connect_args = []
|
||||
def fake_http_connect(ipaddr, port, device, partition, method, path,
|
||||
headers=None, query_string=None, ssl=False):
|
||||
class SuccessfulFakeConn(object):
|
||||
@property
|
||||
def status(self):
|
||||
return 200
|
||||
|
||||
def getresponse(self):
|
||||
return self
|
||||
|
||||
def read(self):
|
||||
return ''
|
||||
|
||||
captured_args = {'ipaddr': ipaddr, 'port': port,
|
||||
'device': device, 'partition': partition,
|
||||
'method': method, 'path': path, 'ssl': ssl,
|
||||
'headers': headers, 'query_string': query_string}
|
||||
|
||||
http_connect_args.append(
|
||||
dict((k,v) for k,v in captured_args.iteritems()
|
||||
if v is not None))
|
||||
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': '12345',
|
||||
'Content-Type': 'application/burrito',
|
||||
'Content-Length': '0',
|
||||
'X-Container-Partition': '20',
|
||||
'X-Container-Host': '1.2.3.4:5',
|
||||
'X-Container-Device': 'sdb1',
|
||||
'X-Delete-At': 9999999999,
|
||||
'X-Delete-At-Host': "10.1.1.1:6001,10.2.2.2:6002",
|
||||
'X-Delete-At-Partition': '6237',
|
||||
'X-Delete-At-Device': 'sdp,sdq'})
|
||||
|
||||
orig_http_connect = object_server.http_connect
|
||||
try:
|
||||
object_server.http_connect = fake_http_connect
|
||||
resp = self.object_controller.PUT(req)
|
||||
finally:
|
||||
object_server.http_connect = orig_http_connect
|
||||
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
|
||||
http_connect_args.sort(key=operator.itemgetter('ipaddr'))
|
||||
|
||||
self.assertEquals(len(http_connect_args), 3)
|
||||
self.assertEquals(
|
||||
http_connect_args[0],
|
||||
{'ipaddr': '1.2.3.4',
|
||||
'port': '5',
|
||||
'path': '/a/c/o',
|
||||
'device': 'sdb1',
|
||||
'partition': '20',
|
||||
'method': 'PUT',
|
||||
'ssl': False,
|
||||
'headers': {'x-content-type': 'application/burrito',
|
||||
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
|
||||
'x-size': '0',
|
||||
'x-timestamp': '12345',
|
||||
'x-trans-id': '-'}})
|
||||
self.assertEquals(
|
||||
http_connect_args[1],
|
||||
{'ipaddr': '10.1.1.1',
|
||||
'port': '6001',
|
||||
'path': '/exp/9999999960/9999999999-a/c/o',
|
||||
'device': 'sdp',
|
||||
'partition': '6237',
|
||||
'method': 'PUT',
|
||||
'ssl': False,
|
||||
'headers': {'x-content-type': 'text/plain',
|
||||
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
|
||||
'x-size': '0',
|
||||
'x-timestamp': '12345',
|
||||
'x-trans-id': '-'}})
|
||||
self.assertEquals(
|
||||
http_connect_args[2],
|
||||
{'ipaddr': '10.2.2.2',
|
||||
'port': '6002',
|
||||
'path': '/exp/9999999960/9999999999-a/c/o',
|
||||
'device': 'sdq',
|
||||
'partition': '6237',
|
||||
'method': 'PUT',
|
||||
'ssl': False,
|
||||
'headers': {'x-content-type': 'text/plain',
|
||||
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
|
||||
'x-size': '0',
|
||||
'x-timestamp': '12345',
|
||||
'x-trans-id': '-'}})
|
||||
|
||||
def test_updating_multiple_container_servers(self):
|
||||
http_connect_args = []
|
||||
def fake_http_connect(ipaddr, port, device, partition, method, path,
|
||||
headers=None, query_string=None, ssl=False):
|
||||
class SuccessfulFakeConn(object):
|
||||
@property
|
||||
def status(self):
|
||||
return 200
|
||||
|
||||
def getresponse(self):
|
||||
return self
|
||||
|
||||
def read(self):
|
||||
return ''
|
||||
|
||||
captured_args = {'ipaddr': ipaddr, 'port': port,
|
||||
'device': device, 'partition': partition,
|
||||
'method': method, 'path': path, 'ssl': ssl,
|
||||
'headers': headers, 'query_string': query_string}
|
||||
|
||||
http_connect_args.append(
|
||||
dict((k,v) for k,v in captured_args.iteritems()
|
||||
if v is not None))
|
||||
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': '12345',
|
||||
'Content-Type': 'application/burrito',
|
||||
'Content-Length': '0',
|
||||
'X-Container-Partition': '20',
|
||||
'X-Container-Host': '1.2.3.4:5, 6.7.8.9:10',
|
||||
'X-Container-Device': 'sdb1, sdf1'})
|
||||
|
||||
orig_http_connect = object_server.http_connect
|
||||
try:
|
||||
object_server.http_connect = fake_http_connect
|
||||
self.object_controller.PUT(req)
|
||||
finally:
|
||||
object_server.http_connect = orig_http_connect
|
||||
|
||||
http_connect_args.sort(key=operator.itemgetter('ipaddr'))
|
||||
|
||||
self.assertEquals(len(http_connect_args), 2)
|
||||
self.assertEquals(
|
||||
http_connect_args[0],
|
||||
{'ipaddr': '1.2.3.4',
|
||||
'port': '5',
|
||||
'path': '/a/c/o',
|
||||
'device': 'sdb1',
|
||||
'partition': '20',
|
||||
'method': 'PUT',
|
||||
'ssl': False,
|
||||
'headers': {'x-content-type': 'application/burrito',
|
||||
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
|
||||
'x-size': '0',
|
||||
'x-timestamp': '12345',
|
||||
'x-trans-id': '-'}})
|
||||
self.assertEquals(
|
||||
http_connect_args[1],
|
||||
{'ipaddr': '6.7.8.9',
|
||||
'port': '10',
|
||||
'path': '/a/c/o',
|
||||
'device': 'sdf1',
|
||||
'partition': '20',
|
||||
'method': 'PUT',
|
||||
'ssl': False,
|
||||
'headers': {'x-content-type': 'application/burrito',
|
||||
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
|
||||
'x-size': '0',
|
||||
'x-timestamp': '12345',
|
||||
'x-trans-id': '-'}})
|
||||
|
||||
def test_async_update_saves_on_exception(self):
|
||||
|
||||
def fake_http_connect(*args):
|
||||
|
@ -304,27 +304,35 @@ def fake_http_connect(*code_iter, **kwargs):
|
||||
|
||||
class FakeRing(object):
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, replicas=3):
|
||||
# 9 total nodes (6 more past the initial 3) is the cap, no matter if
|
||||
# this is set higher.
|
||||
# this is set higher, or R^2 for R replicas
|
||||
self.replicas = replicas
|
||||
self.max_more_nodes = 0
|
||||
self.devs = {}
|
||||
|
||||
def set_replicas(self, replicas):
|
||||
self.replicas = replicas
|
||||
self.devs = {}
|
||||
|
||||
def get_nodes(self, account, container=None, obj=None):
|
||||
devs = []
|
||||
for x in xrange(3):
|
||||
for x in xrange(self.replicas):
|
||||
devs.append(self.devs.get(x))
|
||||
if devs[x] is None:
|
||||
self.devs[x] = devs[x] = \
|
||||
{'ip': '10.0.0.%s' % x, 'port': 1000 + x, 'device': 'sda'}
|
||||
{'ip': '10.0.0.%s' % x,
|
||||
'port': 1000 + x,
|
||||
'device': 'sd' + (chr(ord('a') + x))}
|
||||
return 1, devs
|
||||
|
||||
def get_part_nodes(self, part):
|
||||
return self.get_nodes('blah')[1]
|
||||
|
||||
def get_more_nodes(self, nodes):
|
||||
# 9 is the true cap
|
||||
for x in xrange(3, min(3 + self.max_more_nodes, 9)):
|
||||
# replicas^2 is the true cap
|
||||
for x in xrange(self.replicas, min(self.replicas + self.max_more_nodes,
|
||||
self.replicas * self.replicas)):
|
||||
yield {'ip': '10.0.0.%s' % x, 'port': 1000 + x, 'device': 'sda'}
|
||||
|
||||
|
||||
@ -743,6 +751,11 @@ class TestObjectController(unittest.TestCase):
|
||||
object_ring=FakeRing())
|
||||
monkey_patch_mimetools()
|
||||
|
||||
def tearDown(self):
|
||||
self.app.account_ring.set_replicas(3)
|
||||
self.app.container_ring.set_replicas(3)
|
||||
self.app.object_ring.set_replicas(3)
|
||||
|
||||
def assert_status_map(self, method, statuses, expected, raise_exc=False):
|
||||
with save_globals():
|
||||
kwargs = {}
|
||||
@ -3753,6 +3766,187 @@ class TestObjectController(unittest.TestCase):
|
||||
'x-foo',
|
||||
resp.headers['access-control-allow-headers'])
|
||||
|
||||
def _gather_x_container_headers(self, controller_call, req, *connect_args,
|
||||
**kwargs):
|
||||
header_list = kwargs.pop('header_list', ['X-Container-Partition',
|
||||
'X-Container-Host',
|
||||
'X-Container-Device'])
|
||||
seen_headers = []
|
||||
def capture_headers(ipaddr, port, device, partition, method,
|
||||
path, headers=None, query_string=None):
|
||||
captured = {}
|
||||
for header in header_list:
|
||||
captured[header] = headers.get(header)
|
||||
seen_headers.append(captured)
|
||||
|
||||
with save_globals():
|
||||
self.app.allow_account_management = True
|
||||
|
||||
set_http_connect(*connect_args, give_connect=capture_headers,
|
||||
**kwargs)
|
||||
resp = controller_call(req)
|
||||
self.assertEqual(2, resp.status_int // 100) # sanity check
|
||||
|
||||
# don't care about the account/container HEADs, so chuck
|
||||
# the first two requests
|
||||
return sorted(seen_headers[2:],
|
||||
key=lambda d: d.get(header_list[0]) or 'Z')
|
||||
|
||||
def test_PUT_x_container_headers_with_equal_replicas(self):
|
||||
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Content-Length': '5'}, body='12345')
|
||||
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
|
||||
seen_headers = self._gather_x_container_headers(
|
||||
controller.PUT, req,
|
||||
200, 200, 201, 201, 201) # HEAD HEAD PUT PUT PUT
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Container-Host': '10.0.0.0:1000',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sda'},
|
||||
{'X-Container-Host': '10.0.0.1:1001',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sdb'},
|
||||
{'X-Container-Host': '10.0.0.2:1002',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sdc'}])
|
||||
|
||||
def test_PUT_x_container_headers_with_fewer_container_replicas(self):
|
||||
self.app.container_ring.set_replicas(2)
|
||||
|
||||
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Content-Length': '5'}, body='12345')
|
||||
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
|
||||
seen_headers = self._gather_x_container_headers(
|
||||
controller.PUT, req,
|
||||
200, 200, 201, 201, 201) # HEAD HEAD PUT PUT PUT
|
||||
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Container-Host': '10.0.0.0:1000',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sda'},
|
||||
{'X-Container-Host': '10.0.0.1:1001',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sdb'},
|
||||
{'X-Container-Host': None,
|
||||
'X-Container-Partition': None,
|
||||
'X-Container-Device': None}])
|
||||
|
||||
def test_PUT_x_container_headers_with_more_container_replicas(self):
|
||||
self.app.container_ring.set_replicas(4)
|
||||
|
||||
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Content-Length': '5'}, body='12345')
|
||||
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
|
||||
seen_headers = self._gather_x_container_headers(
|
||||
controller.PUT, req,
|
||||
200, 200, 201, 201, 201) # HEAD HEAD PUT PUT PUT
|
||||
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Container-Host': '10.0.0.0:1000,10.0.0.3:1003',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sda,sdd'},
|
||||
{'X-Container-Host': '10.0.0.1:1001',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sdb'},
|
||||
{'X-Container-Host': '10.0.0.2:1002',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sdc'}])
|
||||
|
||||
def test_POST_x_container_headers_with_more_container_replicas(self):
|
||||
self.app.container_ring.set_replicas(4)
|
||||
self.app.object_post_as_copy = False
|
||||
|
||||
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'Content-Type': 'application/stuff'})
|
||||
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
|
||||
seen_headers = self._gather_x_container_headers(
|
||||
controller.POST, req,
|
||||
200, 200, 200, 200, 200) # HEAD HEAD POST POST POST
|
||||
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Container-Host': '10.0.0.0:1000,10.0.0.3:1003',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sda,sdd'},
|
||||
{'X-Container-Host': '10.0.0.1:1001',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sdb'},
|
||||
{'X-Container-Host': '10.0.0.2:1002',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sdc'}])
|
||||
|
||||
def test_DELETE_x_container_headers_with_more_container_replicas(self):
|
||||
self.app.container_ring.set_replicas(4)
|
||||
|
||||
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'DELETE'},
|
||||
headers={'Content-Type': 'application/stuff'})
|
||||
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
|
||||
seen_headers = self._gather_x_container_headers(
|
||||
controller.DELETE, req,
|
||||
200, 200, 200, 200, 200) # HEAD HEAD DELETE DELETE DELETE
|
||||
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Container-Host': '10.0.0.0:1000,10.0.0.3:1003',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sda,sdd'},
|
||||
{'X-Container-Host': '10.0.0.1:1001',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sdb'},
|
||||
{'X-Container-Host': '10.0.0.2:1002',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sdc'}])
|
||||
|
||||
def test_PUT_x_delete_at_with_fewer_container_replicas(self):
|
||||
self.app.container_ring.set_replicas(2)
|
||||
|
||||
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Content-Type': 'application/stuff',
|
||||
'Content-Length': '0',
|
||||
'X-Delete-At': int(time()) + 100000})
|
||||
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
|
||||
seen_headers = self._gather_x_container_headers(
|
||||
controller.PUT, req,
|
||||
200, 200, 201, 201, 201, # HEAD HEAD PUT PUT PUT
|
||||
header_list=('X-Delete-At-Host', 'X-Delete-At-Device',
|
||||
'X-Delete-At-Partition'))
|
||||
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Delete-At-Host': '10.0.0.0:1000',
|
||||
'X-Delete-At-Partition': 1,
|
||||
'X-Delete-At-Device': 'sda'},
|
||||
{'X-Delete-At-Host': '10.0.0.1:1001',
|
||||
'X-Delete-At-Partition': 1,
|
||||
'X-Delete-At-Device': 'sdb'},
|
||||
{'X-Delete-At-Host': None,
|
||||
'X-Delete-At-Partition': None,
|
||||
'X-Delete-At-Device': None}])
|
||||
|
||||
|
||||
def test_PUT_x_delete_at_with_more_container_replicas(self):
|
||||
self.app.container_ring.set_replicas(4)
|
||||
self.app.expiring_objects_account = 'expires'
|
||||
self.app.expiring_objects_container_divisor = 60
|
||||
|
||||
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Content-Type': 'application/stuff',
|
||||
'Content-Length': 0,
|
||||
'X-Delete-At': int(time()) + 100000})
|
||||
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
|
||||
seen_headers = self._gather_x_container_headers(
|
||||
controller.PUT, req,
|
||||
200, 200, 201, 201, 201, # HEAD HEAD PUT PUT PUT
|
||||
header_list=('X-Delete-At-Host', 'X-Delete-At-Device',
|
||||
'X-Delete-At-Partition'))
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Delete-At-Host': '10.0.0.0:1000,10.0.0.3:1003',
|
||||
'X-Delete-At-Partition': 1,
|
||||
'X-Delete-At-Device': 'sda,sdd'},
|
||||
{'X-Delete-At-Host': '10.0.0.1:1001',
|
||||
'X-Delete-At-Partition': 1,
|
||||
'X-Delete-At-Device': 'sdb'},
|
||||
{'X-Delete-At-Host': '10.0.0.2:1002',
|
||||
'X-Delete-At-Partition': 1,
|
||||
'X-Delete-At-Device': 'sdc'}])
|
||||
|
||||
|
||||
class TestContainerController(unittest.TestCase):
|
||||
"Test swift.proxy_server.ContainerController"
|
||||
@ -4360,6 +4554,108 @@ class TestContainerController(unittest.TestCase):
|
||||
'x-foo',
|
||||
resp.headers['access-control-allow-headers'])
|
||||
|
||||
def _gather_x_account_headers(self, controller_call, req, *connect_args,
|
||||
**kwargs):
|
||||
seen_headers = []
|
||||
to_capture = ('X-Account-Partition', 'X-Account-Host',
|
||||
'X-Account-Device')
|
||||
|
||||
def capture_headers(ipaddr, port, device, partition, method,
|
||||
path, headers=None, query_string=None):
|
||||
captured = {}
|
||||
for header in to_capture:
|
||||
captured[header] = headers.get(header)
|
||||
seen_headers.append(captured)
|
||||
|
||||
with save_globals():
|
||||
self.app.allow_account_management = True
|
||||
|
||||
set_http_connect(*connect_args, give_connect=capture_headers,
|
||||
**kwargs)
|
||||
resp = controller_call(req)
|
||||
self.assertEqual(2, resp.status_int // 100) # sanity check
|
||||
|
||||
# don't care about the account HEAD, so throw away the
|
||||
# first element
|
||||
return sorted(seen_headers[1:],
|
||||
key=lambda d: d['X-Account-Host'] or 'Z')
|
||||
|
||||
def test_PUT_x_account_headers_with_fewer_account_replicas(self):
|
||||
self.app.account_ring.set_replicas(2)
|
||||
req = Request.blank('/a/c', headers={'': ''})
|
||||
controller = proxy_server.ContainerController(self.app, 'a', 'c')
|
||||
|
||||
seen_headers = self._gather_x_account_headers(
|
||||
controller.PUT, req,
|
||||
200, 201, 201, 201) # HEAD PUT PUT PUT
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Account-Host': '10.0.0.0:1000',
|
||||
'X-Account-Partition': 1,
|
||||
'X-Account-Device': 'sda'},
|
||||
{'X-Account-Host': '10.0.0.1:1001',
|
||||
'X-Account-Partition': 1,
|
||||
'X-Account-Device': 'sdb'},
|
||||
{'X-Account-Host': None,
|
||||
'X-Account-Partition': None,
|
||||
'X-Account-Device': None}])
|
||||
|
||||
def test_PUT_x_account_headers_with_more_account_replicas(self):
|
||||
self.app.account_ring.set_replicas(4)
|
||||
req = Request.blank('/a/c', headers={'': ''})
|
||||
controller = proxy_server.ContainerController(self.app, 'a', 'c')
|
||||
|
||||
seen_headers = self._gather_x_account_headers(
|
||||
controller.PUT, req,
|
||||
200, 201, 201, 201) # HEAD PUT PUT PUT
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Account-Host': '10.0.0.0:1000,10.0.0.3:1003',
|
||||
'X-Account-Partition': 1,
|
||||
'X-Account-Device': 'sda,sdd'},
|
||||
{'X-Account-Host': '10.0.0.1:1001',
|
||||
'X-Account-Partition': 1,
|
||||
'X-Account-Device': 'sdb'},
|
||||
{'X-Account-Host': '10.0.0.2:1002',
|
||||
'X-Account-Partition': 1,
|
||||
'X-Account-Device': 'sdc'}])
|
||||
|
||||
def test_DELETE_x_account_headers_with_fewer_account_replicas(self):
|
||||
self.app.account_ring.set_replicas(2)
|
||||
req = Request.blank('/a/c', headers={'': ''})
|
||||
controller = proxy_server.ContainerController(self.app, 'a', 'c')
|
||||
|
||||
seen_headers = self._gather_x_account_headers(
|
||||
controller.DELETE, req,
|
||||
200, 204, 204, 204) # HEAD DELETE DELETE DELETE
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Account-Host': '10.0.0.0:1000',
|
||||
'X-Account-Partition': 1,
|
||||
'X-Account-Device': 'sda'},
|
||||
{'X-Account-Host': '10.0.0.1:1001',
|
||||
'X-Account-Partition': 1,
|
||||
'X-Account-Device': 'sdb'},
|
||||
{'X-Account-Host': None,
|
||||
'X-Account-Partition': None,
|
||||
'X-Account-Device': None}])
|
||||
|
||||
def test_DELETE_x_account_headers_with_more_account_replicas(self):
|
||||
self.app.account_ring.set_replicas(4)
|
||||
req = Request.blank('/a/c', headers={'': ''})
|
||||
controller = proxy_server.ContainerController(self.app, 'a', 'c')
|
||||
|
||||
seen_headers = self._gather_x_account_headers(
|
||||
controller.DELETE, req,
|
||||
200, 204, 204, 204) # HEAD DELETE DELETE DELETE
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Account-Host': '10.0.0.0:1000,10.0.0.3:1003',
|
||||
'X-Account-Partition': 1,
|
||||
'X-Account-Device': 'sda,sdd'},
|
||||
{'X-Account-Host': '10.0.0.1:1001',
|
||||
'X-Account-Partition': 1,
|
||||
'X-Account-Device': 'sdb'},
|
||||
{'X-Account-Host': '10.0.0.2:1002',
|
||||
'X-Account-Partition': 1,
|
||||
'X-Account-Device': 'sdc'}])
|
||||
|
||||
|
||||
class TestAccountController(unittest.TestCase):
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user