Merge "Allow for multiple X-(Account|Container)-* headers."
This commit is contained in:
commit
64270fab71
@ -1531,6 +1531,18 @@ def list_from_csv(comma_separated_str):
|
||||
return []
|
||||
|
||||
|
||||
def csv_append(csv_string, item):
|
||||
"""
|
||||
Appends an item to a comma-separated string.
|
||||
|
||||
If the comma-separated string is empty/None, just returns item.
|
||||
"""
|
||||
if csv_string:
|
||||
return ",".join((csv_string, item))
|
||||
else:
|
||||
return item
|
||||
|
||||
|
||||
def reiterate(iterable):
|
||||
"""
|
||||
Consume the first item from an iterator, then re-chain it to the rest of
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
from __future__ import with_statement
|
||||
|
||||
import itertools
|
||||
import os
|
||||
import time
|
||||
import traceback
|
||||
@ -88,19 +89,39 @@ class ContainerController(object):
|
||||
|
||||
def account_update(self, req, account, container, broker):
|
||||
"""
|
||||
Update the account server with latest container info.
|
||||
Update the account server(s) with latest container info.
|
||||
|
||||
:param req: swob.Request object
|
||||
:param account: account name
|
||||
:param container: container name
|
||||
:param broker: container DB broker object
|
||||
:returns: if the account request returns a 404 error code,
|
||||
:returns: if all the account requests return a 404 error code,
|
||||
HTTPNotFound response object, otherwise None.
|
||||
"""
|
||||
account_host = req.headers.get('X-Account-Host')
|
||||
account_partition = req.headers.get('X-Account-Partition')
|
||||
account_device = req.headers.get('X-Account-Device')
|
||||
if all([account_host, account_partition, account_device]):
|
||||
account_hosts = [h.strip() for h in
|
||||
req.headers.get('X-Account-Host', '').split(',')]
|
||||
account_devices = [d.strip() for d in
|
||||
req.headers.get('X-Account-Device', '').split(',')]
|
||||
account_partition = req.headers.get('X-Account-Partition', '')
|
||||
|
||||
if len(account_hosts) != len(account_devices):
|
||||
# This shouldn't happen unless there's a bug in the proxy,
|
||||
# but if there is, we want to know about it.
|
||||
self.logger.error(_('ERROR Account update failed: different '
|
||||
'numbers of hosts and devices in request: '
|
||||
'"%s" vs "%s"' %
|
||||
(req.headers.get('X-Account-Host', ''),
|
||||
req.headers.get('X-Account-Device', ''))))
|
||||
return
|
||||
|
||||
if account_partition:
|
||||
updates = zip(account_hosts, account_devices)
|
||||
else:
|
||||
updates = []
|
||||
|
||||
account_404s = 0
|
||||
|
||||
for account_host, account_device in updates:
|
||||
account_ip, account_port = account_host.rsplit(':', 1)
|
||||
new_path = '/' + '/'.join([account, container])
|
||||
info = broker.get_info()
|
||||
@ -122,7 +143,7 @@ class ContainerController(object):
|
||||
account_response = conn.getresponse()
|
||||
account_response.read()
|
||||
if account_response.status == HTTP_NOT_FOUND:
|
||||
return HTTPNotFound(request=req)
|
||||
account_404s += 1
|
||||
elif not is_success(account_response.status):
|
||||
self.logger.error(_(
|
||||
'ERROR Account update failed '
|
||||
@ -138,7 +159,10 @@ class ContainerController(object):
|
||||
'%(ip)s:%(port)s/%(device)s (will retry later)'),
|
||||
{'ip': account_ip, 'port': account_port,
|
||||
'device': account_device})
|
||||
return None
|
||||
if updates and account_404s == len(updates):
|
||||
return HTTPNotFound(req=req)
|
||||
else:
|
||||
return None
|
||||
|
||||
@public
|
||||
@timing_stats
|
||||
|
@ -18,6 +18,7 @@
|
||||
from __future__ import with_statement
|
||||
import cPickle as pickle
|
||||
import errno
|
||||
import itertools
|
||||
import os
|
||||
import time
|
||||
import traceback
|
||||
@ -489,16 +490,34 @@ class ObjectController(object):
|
||||
:param obj: object name
|
||||
:param headers_in: dictionary of headers from the original request
|
||||
:param headers_out: dictionary of headers to send in the container
|
||||
request
|
||||
request(s)
|
||||
:param objdevice: device name that the object is in
|
||||
"""
|
||||
host = headers_in.get('X-Container-Host', None)
|
||||
partition = headers_in.get('X-Container-Partition', None)
|
||||
contdevice = headers_in.get('X-Container-Device', None)
|
||||
if not all([host, partition, contdevice]):
|
||||
conthosts = [h.strip() for h in
|
||||
headers_in.get('X-Container-Host', '').split(',')]
|
||||
contdevices = [d.strip() for d in
|
||||
headers_in.get('X-Container-Device', '').split(',')]
|
||||
contpartition = headers_in.get('X-Container-Partition', '')
|
||||
|
||||
if len(conthosts) != len(contdevices):
|
||||
# This shouldn't happen unless there's a bug in the proxy,
|
||||
# but if there is, we want to know about it.
|
||||
self.logger.error(_('ERROR Container update failed: different '
|
||||
'numbers of hosts and devices in request: '
|
||||
'"%s" vs "%s"' %
|
||||
(req.headers.get('X-Container-Host', ''),
|
||||
req.headers.get('X-Container-Device', ''))))
|
||||
return
|
||||
self.async_update(op, account, container, obj, host, partition,
|
||||
contdevice, headers_out, objdevice)
|
||||
|
||||
if contpartition:
|
||||
updates = zip(conthosts, contdevices)
|
||||
else:
|
||||
updates = []
|
||||
|
||||
for conthost, contdevice in updates:
|
||||
self.async_update(op, account, container, obj, conthost,
|
||||
contpartition, contdevice, headers_out,
|
||||
objdevice)
|
||||
|
||||
def delete_at_update(self, op, delete_at, account, container, obj,
|
||||
headers_in, objdevice):
|
||||
@ -516,22 +535,33 @@ class ObjectController(object):
|
||||
# At that time, Swift will be so popular and pervasive I will have
|
||||
# created income for thousands of future programmers.
|
||||
delete_at = max(min(delete_at, 9999999999), 0)
|
||||
host = partition = contdevice = None
|
||||
updates = [(None, None)]
|
||||
|
||||
partition = None
|
||||
hosts = contdevices = [None]
|
||||
headers_out = {'x-timestamp': headers_in['x-timestamp'],
|
||||
'x-trans-id': headers_in.get('x-trans-id', '-')}
|
||||
if op != 'DELETE':
|
||||
host = headers_in.get('X-Delete-At-Host', None)
|
||||
partition = headers_in.get('X-Delete-At-Partition', None)
|
||||
contdevice = headers_in.get('X-Delete-At-Device', None)
|
||||
hosts = headers_in.get('X-Delete-At-Host', '')
|
||||
contdevices = headers_in.get('X-Delete-At-Device', '')
|
||||
updates = [upd for upd in
|
||||
zip((h.strip() for h in hosts.split(',')),
|
||||
(c.strip() for c in contdevices.split(',')))
|
||||
if all(upd) and partition]
|
||||
if not updates:
|
||||
updates = [(None, None)]
|
||||
headers_out['x-size'] = '0'
|
||||
headers_out['x-content-type'] = 'text/plain'
|
||||
headers_out['x-etag'] = 'd41d8cd98f00b204e9800998ecf8427e'
|
||||
self.async_update(
|
||||
op, self.expiring_objects_account,
|
||||
str(delete_at / self.expiring_objects_container_divisor *
|
||||
self.expiring_objects_container_divisor),
|
||||
'%s-%s/%s/%s' % (delete_at, account, container, obj),
|
||||
host, partition, contdevice, headers_out, objdevice)
|
||||
|
||||
for host, contdevice in updates:
|
||||
self.async_update(
|
||||
op, self.expiring_objects_account,
|
||||
str(delete_at / self.expiring_objects_container_divisor *
|
||||
self.expiring_objects_container_divisor),
|
||||
'%s-%s/%s/%s' % (delete_at, account, container, obj),
|
||||
host, partition, contdevice, headers_out, objdevice)
|
||||
|
||||
@public
|
||||
@timing_stats
|
||||
|
@ -28,7 +28,7 @@ import time
|
||||
from urllib import unquote
|
||||
from random import shuffle
|
||||
|
||||
from swift.common.utils import normalize_timestamp, public
|
||||
from swift.common.utils import normalize_timestamp, public, csv_append
|
||||
from swift.common.constraints import check_metadata, MAX_CONTAINER_NAME_LENGTH
|
||||
from swift.common.http import HTTP_ACCEPTED
|
||||
from swift.proxy.controllers.base import Controller, delay_denial, \
|
||||
@ -134,16 +134,8 @@ class ContainerController(Controller):
|
||||
return HTTPNotFound(request=req)
|
||||
container_partition, containers = self.app.container_ring.get_nodes(
|
||||
self.account_name, self.container_name)
|
||||
headers = []
|
||||
for account in accounts:
|
||||
nheaders = {'X-Timestamp': normalize_timestamp(time.time()),
|
||||
'x-trans-id': self.trans_id,
|
||||
'X-Account-Host': '%(ip)s:%(port)s' % account,
|
||||
'X-Account-Partition': account_partition,
|
||||
'X-Account-Device': account['device'],
|
||||
'Connection': 'close'}
|
||||
self.transfer_headers(req.headers, nheaders)
|
||||
headers.append(nheaders)
|
||||
headers = self._backend_requests(req, len(containers),
|
||||
account_partition, accounts)
|
||||
if self.app.memcache:
|
||||
cache_key = get_container_memcache_key(self.account_name,
|
||||
self.container_name)
|
||||
@ -190,14 +182,8 @@ class ContainerController(Controller):
|
||||
return HTTPNotFound(request=req)
|
||||
container_partition, containers = self.app.container_ring.get_nodes(
|
||||
self.account_name, self.container_name)
|
||||
headers = []
|
||||
for account in accounts:
|
||||
headers.append({'X-Timestamp': normalize_timestamp(time.time()),
|
||||
'X-Trans-Id': self.trans_id,
|
||||
'X-Account-Host': '%(ip)s:%(port)s' % account,
|
||||
'X-Account-Partition': account_partition,
|
||||
'X-Account-Device': account['device'],
|
||||
'Connection': 'close'})
|
||||
headers = self._backend_requests(req, len(containers),
|
||||
account_partition, accounts)
|
||||
if self.app.memcache:
|
||||
cache_key = get_container_memcache_key(self.account_name,
|
||||
self.container_name)
|
||||
@ -209,3 +195,26 @@ class ContainerController(Controller):
|
||||
if resp.status_int == HTTP_ACCEPTED:
|
||||
return HTTPNotFound(request=req)
|
||||
return resp
|
||||
|
||||
def _backend_requests(self, req, n_outgoing,
|
||||
account_partition, accounts):
|
||||
headers = [{'Connection': 'close',
|
||||
'X-Timestamp': normalize_timestamp(time.time()),
|
||||
'x-trans-id': self.trans_id}
|
||||
for _junk in range(n_outgoing)]
|
||||
|
||||
for header in headers:
|
||||
self.transfer_headers(req.headers, header)
|
||||
|
||||
for i, account in enumerate(accounts):
|
||||
i = i % len(headers)
|
||||
|
||||
headers[i]['X-Account-Partition'] = account_partition
|
||||
headers[i]['X-Account-Host'] = csv_append(
|
||||
headers[i].get('X-Account-Host'),
|
||||
'%(ip)s:%(port)s' % account)
|
||||
headers[i]['X-Account-Device'] = csv_append(
|
||||
headers[i].get('X-Account-Device'),
|
||||
account['device'])
|
||||
|
||||
return headers
|
||||
|
@ -38,7 +38,7 @@ from eventlet.queue import Queue
|
||||
from eventlet.timeout import Timeout
|
||||
|
||||
from swift.common.utils import ContextPool, normalize_timestamp, \
|
||||
config_true_value, public, json
|
||||
config_true_value, public, json, csv_append
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.constraints import check_metadata, check_object_creation, \
|
||||
CONTAINER_LISTING_LIMIT, MAX_FILE_SIZE
|
||||
@ -487,23 +487,48 @@ class ObjectController(Controller):
|
||||
partition, nodes = self.app.object_ring.get_nodes(
|
||||
self.account_name, self.container_name, self.object_name)
|
||||
req.headers['X-Timestamp'] = normalize_timestamp(time.time())
|
||||
headers = []
|
||||
for container in containers:
|
||||
nheaders = dict(req.headers.iteritems())
|
||||
nheaders['Connection'] = 'close'
|
||||
nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
|
||||
nheaders['X-Container-Partition'] = container_partition
|
||||
nheaders['X-Container-Device'] = container['device']
|
||||
if delete_at_nodes:
|
||||
node = delete_at_nodes.pop(0)
|
||||
nheaders['X-Delete-At-Host'] = '%(ip)s:%(port)s' % node
|
||||
nheaders['X-Delete-At-Partition'] = delete_at_part
|
||||
nheaders['X-Delete-At-Device'] = node['device']
|
||||
headers.append(nheaders)
|
||||
|
||||
headers = self._backend_requests(
|
||||
req, len(nodes), container_partition, containers,
|
||||
delete_at_part, delete_at_nodes)
|
||||
|
||||
resp = self.make_requests(req, self.app.object_ring, partition,
|
||||
'POST', req.path_info, headers)
|
||||
return resp
|
||||
|
||||
def _backend_requests(self, req, n_outgoing,
|
||||
container_partition, containers,
|
||||
delete_at_partition=None, delete_at_nodes=None):
|
||||
headers = [dict(req.headers.iteritems())
|
||||
for _junk in range(n_outgoing)]
|
||||
|
||||
for header in headers:
|
||||
header['Connection'] = 'close'
|
||||
|
||||
for i, container in enumerate(containers):
|
||||
i = i % len(headers)
|
||||
|
||||
headers[i]['X-Container-Partition'] = container_partition
|
||||
headers[i]['X-Container-Host'] = csv_append(
|
||||
headers[i].get('X-Container-Host'),
|
||||
'%(ip)s:%(port)s' % container)
|
||||
headers[i]['X-Container-Device'] = csv_append(
|
||||
headers[i].get('X-Container-Device'),
|
||||
container['device'])
|
||||
|
||||
for i, node in enumerate(delete_at_nodes or []):
|
||||
i = i % len(headers)
|
||||
|
||||
headers[i]['X-Delete-At-Partition'] = delete_at_partition
|
||||
headers[i]['X-Delete-At-Host'] = csv_append(
|
||||
headers[i].get('X-Delete-At-Host'),
|
||||
'%(ip)s:%(port)s' % node)
|
||||
headers[i]['X-Delete-At-Device'] = csv_append(
|
||||
headers[i].get('X-Delete-At-Device'),
|
||||
node['device'])
|
||||
|
||||
return headers
|
||||
|
||||
def _send_file(self, conn, path):
|
||||
"""Method for a file PUT coro"""
|
||||
while True:
|
||||
@ -719,22 +744,18 @@ class ObjectController(Controller):
|
||||
node_iter = self.iter_nodes(partition, nodes, self.app.object_ring)
|
||||
pile = GreenPile(len(nodes))
|
||||
chunked = req.headers.get('transfer-encoding')
|
||||
for container in containers:
|
||||
nheaders = dict(req.headers.iteritems())
|
||||
nheaders['Connection'] = 'close'
|
||||
nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
|
||||
nheaders['X-Container-Partition'] = container_partition
|
||||
nheaders['X-Container-Device'] = container['device']
|
||||
|
||||
outgoing_headers = self._backend_requests(
|
||||
req, len(nodes), container_partition, containers,
|
||||
delete_at_part, delete_at_nodes)
|
||||
|
||||
for nheaders in outgoing_headers:
|
||||
# RFC2616:8.2.3 disallows 100-continue without a body
|
||||
if (req.content_length > 0) or chunked:
|
||||
nheaders['Expect'] = '100-continue'
|
||||
if delete_at_nodes:
|
||||
node = delete_at_nodes.pop(0)
|
||||
nheaders['X-Delete-At-Host'] = '%(ip)s:%(port)s' % node
|
||||
nheaders['X-Delete-At-Partition'] = delete_at_part
|
||||
nheaders['X-Delete-At-Device'] = node['device']
|
||||
pile.spawn(self._connect_put_node, node_iter, partition,
|
||||
req.path_info, nheaders, self.app.logger.thread_locals)
|
||||
|
||||
conns = [conn for conn in pile if conn]
|
||||
if len(conns) <= len(nodes) / 2:
|
||||
self.app.logger.error(
|
||||
@ -929,14 +950,9 @@ class ObjectController(Controller):
|
||||
'was %r' % req.headers['x-timestamp'])
|
||||
else:
|
||||
req.headers['X-Timestamp'] = normalize_timestamp(time.time())
|
||||
headers = []
|
||||
for container in containers:
|
||||
nheaders = dict(req.headers.iteritems())
|
||||
nheaders['Connection'] = 'close'
|
||||
nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
|
||||
nheaders['X-Container-Partition'] = container_partition
|
||||
nheaders['X-Container-Device'] = container['device']
|
||||
headers.append(nheaders)
|
||||
|
||||
headers = self._backend_requests(
|
||||
req, len(nodes), container_partition, containers)
|
||||
resp = self.make_requests(req, self.app.object_ring,
|
||||
partition, 'DELETE', req.path_info, headers)
|
||||
return resp
|
||||
|
@ -13,6 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import operator
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
@ -1079,6 +1080,77 @@ class TestContainerController(unittest.TestCase):
|
||||
self.assertEquals(resp.content_type, 'application/xml')
|
||||
self.assertEquals(resp.charset, 'utf-8')
|
||||
|
||||
def test_updating_multiple_container_servers(self):
|
||||
http_connect_args = []
|
||||
def fake_http_connect(ipaddr, port, device, partition, method, path,
|
||||
headers=None, query_string=None, ssl=False):
|
||||
class SuccessfulFakeConn(object):
|
||||
@property
|
||||
def status(self):
|
||||
return 200
|
||||
|
||||
def getresponse(self):
|
||||
return self
|
||||
|
||||
def read(self):
|
||||
return ''
|
||||
|
||||
captured_args = {'ipaddr': ipaddr, 'port': port,
|
||||
'device': device, 'partition': partition,
|
||||
'method': method, 'path': path, 'ssl': ssl,
|
||||
'headers': headers, 'query_string': query_string}
|
||||
|
||||
http_connect_args.append(
|
||||
dict((k,v) for k,v in captured_args.iteritems()
|
||||
if v is not None))
|
||||
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': '12345',
|
||||
'X-Account-Partition': '30',
|
||||
'X-Account-Host': '1.2.3.4:5, 6.7.8.9:10',
|
||||
'X-Account-Device': 'sdb1, sdf1'})
|
||||
|
||||
orig_http_connect = container_server.http_connect
|
||||
try:
|
||||
container_server.http_connect = fake_http_connect
|
||||
self.controller.PUT(req)
|
||||
finally:
|
||||
container_server.http_connect = orig_http_connect
|
||||
|
||||
http_connect_args.sort(key=operator.itemgetter('ipaddr'))
|
||||
|
||||
self.assertEquals(len(http_connect_args), 2)
|
||||
self.assertEquals(
|
||||
http_connect_args[0],
|
||||
{'ipaddr': '1.2.3.4',
|
||||
'port': '5',
|
||||
'path': '/a/c',
|
||||
'device': 'sdb1',
|
||||
'partition': '30',
|
||||
'method': 'PUT',
|
||||
'ssl': False,
|
||||
'headers': {'x-bytes-used': 0,
|
||||
'x-delete-timestamp': '0',
|
||||
'x-object-count': 0,
|
||||
'x-put-timestamp': '0000012345.00000',
|
||||
'x-trans-id': '-'}})
|
||||
self.assertEquals(
|
||||
http_connect_args[1],
|
||||
{'ipaddr': '6.7.8.9',
|
||||
'port': '10',
|
||||
'path': '/a/c',
|
||||
'device': 'sdf1',
|
||||
'partition': '30',
|
||||
'method': 'PUT',
|
||||
'ssl': False,
|
||||
'headers': {'x-bytes-used': 0,
|
||||
'x-delete-timestamp': '0',
|
||||
'x-object-count': 0,
|
||||
'x-put-timestamp': '0000012345.00000',
|
||||
'x-trans-id': '-'}})
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -16,6 +16,7 @@
|
||||
""" Tests for swift.object_server """
|
||||
|
||||
import cPickle as pickle
|
||||
import operator
|
||||
import os
|
||||
import unittest
|
||||
import email
|
||||
@ -1635,6 +1636,177 @@ class TestObjectController(unittest.TestCase):
|
||||
self.assertEquals(given_args, ['127.0.0.1', '1234', 'sdc1', 1, 'PUT',
|
||||
'/a/c/o', {'x-timestamp': '1', 'x-out': 'set'}])
|
||||
|
||||
|
||||
def test_updating_multiple_delete_at_container_servers(self):
|
||||
self.object_controller.expiring_objects_account = 'exp'
|
||||
self.object_controller.expiring_objects_container_divisor = 60
|
||||
|
||||
http_connect_args = []
|
||||
def fake_http_connect(ipaddr, port, device, partition, method, path,
|
||||
headers=None, query_string=None, ssl=False):
|
||||
class SuccessfulFakeConn(object):
|
||||
@property
|
||||
def status(self):
|
||||
return 200
|
||||
|
||||
def getresponse(self):
|
||||
return self
|
||||
|
||||
def read(self):
|
||||
return ''
|
||||
|
||||
captured_args = {'ipaddr': ipaddr, 'port': port,
|
||||
'device': device, 'partition': partition,
|
||||
'method': method, 'path': path, 'ssl': ssl,
|
||||
'headers': headers, 'query_string': query_string}
|
||||
|
||||
http_connect_args.append(
|
||||
dict((k,v) for k,v in captured_args.iteritems()
|
||||
if v is not None))
|
||||
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': '12345',
|
||||
'Content-Type': 'application/burrito',
|
||||
'Content-Length': '0',
|
||||
'X-Container-Partition': '20',
|
||||
'X-Container-Host': '1.2.3.4:5',
|
||||
'X-Container-Device': 'sdb1',
|
||||
'X-Delete-At': 9999999999,
|
||||
'X-Delete-At-Host': "10.1.1.1:6001,10.2.2.2:6002",
|
||||
'X-Delete-At-Partition': '6237',
|
||||
'X-Delete-At-Device': 'sdp,sdq'})
|
||||
|
||||
orig_http_connect = object_server.http_connect
|
||||
try:
|
||||
object_server.http_connect = fake_http_connect
|
||||
resp = self.object_controller.PUT(req)
|
||||
finally:
|
||||
object_server.http_connect = orig_http_connect
|
||||
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
|
||||
http_connect_args.sort(key=operator.itemgetter('ipaddr'))
|
||||
|
||||
self.assertEquals(len(http_connect_args), 3)
|
||||
self.assertEquals(
|
||||
http_connect_args[0],
|
||||
{'ipaddr': '1.2.3.4',
|
||||
'port': '5',
|
||||
'path': '/a/c/o',
|
||||
'device': 'sdb1',
|
||||
'partition': '20',
|
||||
'method': 'PUT',
|
||||
'ssl': False,
|
||||
'headers': {'x-content-type': 'application/burrito',
|
||||
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
|
||||
'x-size': '0',
|
||||
'x-timestamp': '12345',
|
||||
'x-trans-id': '-'}})
|
||||
self.assertEquals(
|
||||
http_connect_args[1],
|
||||
{'ipaddr': '10.1.1.1',
|
||||
'port': '6001',
|
||||
'path': '/exp/9999999960/9999999999-a/c/o',
|
||||
'device': 'sdp',
|
||||
'partition': '6237',
|
||||
'method': 'PUT',
|
||||
'ssl': False,
|
||||
'headers': {'x-content-type': 'text/plain',
|
||||
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
|
||||
'x-size': '0',
|
||||
'x-timestamp': '12345',
|
||||
'x-trans-id': '-'}})
|
||||
self.assertEquals(
|
||||
http_connect_args[2],
|
||||
{'ipaddr': '10.2.2.2',
|
||||
'port': '6002',
|
||||
'path': '/exp/9999999960/9999999999-a/c/o',
|
||||
'device': 'sdq',
|
||||
'partition': '6237',
|
||||
'method': 'PUT',
|
||||
'ssl': False,
|
||||
'headers': {'x-content-type': 'text/plain',
|
||||
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
|
||||
'x-size': '0',
|
||||
'x-timestamp': '12345',
|
||||
'x-trans-id': '-'}})
|
||||
|
||||
def test_updating_multiple_container_servers(self):
|
||||
http_connect_args = []
|
||||
def fake_http_connect(ipaddr, port, device, partition, method, path,
|
||||
headers=None, query_string=None, ssl=False):
|
||||
class SuccessfulFakeConn(object):
|
||||
@property
|
||||
def status(self):
|
||||
return 200
|
||||
|
||||
def getresponse(self):
|
||||
return self
|
||||
|
||||
def read(self):
|
||||
return ''
|
||||
|
||||
captured_args = {'ipaddr': ipaddr, 'port': port,
|
||||
'device': device, 'partition': partition,
|
||||
'method': method, 'path': path, 'ssl': ssl,
|
||||
'headers': headers, 'query_string': query_string}
|
||||
|
||||
http_connect_args.append(
|
||||
dict((k,v) for k,v in captured_args.iteritems()
|
||||
if v is not None))
|
||||
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': '12345',
|
||||
'Content-Type': 'application/burrito',
|
||||
'Content-Length': '0',
|
||||
'X-Container-Partition': '20',
|
||||
'X-Container-Host': '1.2.3.4:5, 6.7.8.9:10',
|
||||
'X-Container-Device': 'sdb1, sdf1'})
|
||||
|
||||
orig_http_connect = object_server.http_connect
|
||||
try:
|
||||
object_server.http_connect = fake_http_connect
|
||||
self.object_controller.PUT(req)
|
||||
finally:
|
||||
object_server.http_connect = orig_http_connect
|
||||
|
||||
http_connect_args.sort(key=operator.itemgetter('ipaddr'))
|
||||
|
||||
self.assertEquals(len(http_connect_args), 2)
|
||||
self.assertEquals(
|
||||
http_connect_args[0],
|
||||
{'ipaddr': '1.2.3.4',
|
||||
'port': '5',
|
||||
'path': '/a/c/o',
|
||||
'device': 'sdb1',
|
||||
'partition': '20',
|
||||
'method': 'PUT',
|
||||
'ssl': False,
|
||||
'headers': {'x-content-type': 'application/burrito',
|
||||
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
|
||||
'x-size': '0',
|
||||
'x-timestamp': '12345',
|
||||
'x-trans-id': '-'}})
|
||||
self.assertEquals(
|
||||
http_connect_args[1],
|
||||
{'ipaddr': '6.7.8.9',
|
||||
'port': '10',
|
||||
'path': '/a/c/o',
|
||||
'device': 'sdf1',
|
||||
'partition': '20',
|
||||
'method': 'PUT',
|
||||
'ssl': False,
|
||||
'headers': {'x-content-type': 'application/burrito',
|
||||
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
|
||||
'x-size': '0',
|
||||
'x-timestamp': '12345',
|
||||
'x-trans-id': '-'}})
|
||||
|
||||
def test_async_update_saves_on_exception(self):
|
||||
|
||||
def fake_http_connect(*args):
|
||||
|
@ -315,27 +315,35 @@ def fake_http_connect(*code_iter, **kwargs):
|
||||
|
||||
class FakeRing(object):
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, replicas=3):
|
||||
# 9 total nodes (6 more past the initial 3) is the cap, no matter if
|
||||
# this is set higher.
|
||||
# this is set higher, or R^2 for R replicas
|
||||
self.replicas = replicas
|
||||
self.max_more_nodes = 0
|
||||
self.devs = {}
|
||||
|
||||
def set_replicas(self, replicas):
|
||||
self.replicas = replicas
|
||||
self.devs = {}
|
||||
|
||||
def get_nodes(self, account, container=None, obj=None):
|
||||
devs = []
|
||||
for x in xrange(3):
|
||||
for x in xrange(self.replicas):
|
||||
devs.append(self.devs.get(x))
|
||||
if devs[x] is None:
|
||||
self.devs[x] = devs[x] = \
|
||||
{'ip': '10.0.0.%s' % x, 'port': 1000 + x, 'device': 'sda'}
|
||||
{'ip': '10.0.0.%s' % x,
|
||||
'port': 1000 + x,
|
||||
'device': 'sd' + (chr(ord('a') + x))}
|
||||
return 1, devs
|
||||
|
||||
def get_part_nodes(self, part):
|
||||
return self.get_nodes('blah')[1]
|
||||
|
||||
def get_more_nodes(self, nodes):
|
||||
# 9 is the true cap
|
||||
for x in xrange(3, min(3 + self.max_more_nodes, 9)):
|
||||
# replicas^2 is the true cap
|
||||
for x in xrange(self.replicas, min(self.replicas + self.max_more_nodes,
|
||||
self.replicas * self.replicas)):
|
||||
yield {'ip': '10.0.0.%s' % x, 'port': 1000 + x, 'device': 'sda'}
|
||||
|
||||
|
||||
@ -754,6 +762,11 @@ class TestObjectController(unittest.TestCase):
|
||||
object_ring=FakeRing())
|
||||
monkey_patch_mimetools()
|
||||
|
||||
def tearDown(self):
|
||||
self.app.account_ring.set_replicas(3)
|
||||
self.app.container_ring.set_replicas(3)
|
||||
self.app.object_ring.set_replicas(3)
|
||||
|
||||
def assert_status_map(self, method, statuses, expected, raise_exc=False):
|
||||
with save_globals():
|
||||
kwargs = {}
|
||||
@ -3829,6 +3842,187 @@ class TestObjectController(unittest.TestCase):
|
||||
'x-trans-id', 'x-object-meta-color'])
|
||||
self.assertEquals(expected_exposed, exposed)
|
||||
|
||||
def _gather_x_container_headers(self, controller_call, req, *connect_args,
|
||||
**kwargs):
|
||||
header_list = kwargs.pop('header_list', ['X-Container-Partition',
|
||||
'X-Container-Host',
|
||||
'X-Container-Device'])
|
||||
seen_headers = []
|
||||
def capture_headers(ipaddr, port, device, partition, method,
|
||||
path, headers=None, query_string=None):
|
||||
captured = {}
|
||||
for header in header_list:
|
||||
captured[header] = headers.get(header)
|
||||
seen_headers.append(captured)
|
||||
|
||||
with save_globals():
|
||||
self.app.allow_account_management = True
|
||||
|
||||
set_http_connect(*connect_args, give_connect=capture_headers,
|
||||
**kwargs)
|
||||
resp = controller_call(req)
|
||||
self.assertEqual(2, resp.status_int // 100) # sanity check
|
||||
|
||||
# don't care about the account/container HEADs, so chuck
|
||||
# the first two requests
|
||||
return sorted(seen_headers[2:],
|
||||
key=lambda d: d.get(header_list[0]) or 'Z')
|
||||
|
||||
def test_PUT_x_container_headers_with_equal_replicas(self):
|
||||
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Content-Length': '5'}, body='12345')
|
||||
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
|
||||
seen_headers = self._gather_x_container_headers(
|
||||
controller.PUT, req,
|
||||
200, 200, 201, 201, 201) # HEAD HEAD PUT PUT PUT
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Container-Host': '10.0.0.0:1000',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sda'},
|
||||
{'X-Container-Host': '10.0.0.1:1001',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sdb'},
|
||||
{'X-Container-Host': '10.0.0.2:1002',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sdc'}])
|
||||
|
||||
def test_PUT_x_container_headers_with_fewer_container_replicas(self):
|
||||
self.app.container_ring.set_replicas(2)
|
||||
|
||||
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Content-Length': '5'}, body='12345')
|
||||
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
|
||||
seen_headers = self._gather_x_container_headers(
|
||||
controller.PUT, req,
|
||||
200, 200, 201, 201, 201) # HEAD HEAD PUT PUT PUT
|
||||
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Container-Host': '10.0.0.0:1000',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sda'},
|
||||
{'X-Container-Host': '10.0.0.1:1001',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sdb'},
|
||||
{'X-Container-Host': None,
|
||||
'X-Container-Partition': None,
|
||||
'X-Container-Device': None}])
|
||||
|
||||
def test_PUT_x_container_headers_with_more_container_replicas(self):
|
||||
self.app.container_ring.set_replicas(4)
|
||||
|
||||
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Content-Length': '5'}, body='12345')
|
||||
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
|
||||
seen_headers = self._gather_x_container_headers(
|
||||
controller.PUT, req,
|
||||
200, 200, 201, 201, 201) # HEAD HEAD PUT PUT PUT
|
||||
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Container-Host': '10.0.0.0:1000,10.0.0.3:1003',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sda,sdd'},
|
||||
{'X-Container-Host': '10.0.0.1:1001',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sdb'},
|
||||
{'X-Container-Host': '10.0.0.2:1002',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sdc'}])
|
||||
|
||||
def test_POST_x_container_headers_with_more_container_replicas(self):
|
||||
self.app.container_ring.set_replicas(4)
|
||||
self.app.object_post_as_copy = False
|
||||
|
||||
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'Content-Type': 'application/stuff'})
|
||||
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
|
||||
seen_headers = self._gather_x_container_headers(
|
||||
controller.POST, req,
|
||||
200, 200, 200, 200, 200) # HEAD HEAD POST POST POST
|
||||
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Container-Host': '10.0.0.0:1000,10.0.0.3:1003',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sda,sdd'},
|
||||
{'X-Container-Host': '10.0.0.1:1001',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sdb'},
|
||||
{'X-Container-Host': '10.0.0.2:1002',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sdc'}])
|
||||
|
||||
def test_DELETE_x_container_headers_with_more_container_replicas(self):
|
||||
self.app.container_ring.set_replicas(4)
|
||||
|
||||
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'DELETE'},
|
||||
headers={'Content-Type': 'application/stuff'})
|
||||
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
|
||||
seen_headers = self._gather_x_container_headers(
|
||||
controller.DELETE, req,
|
||||
200, 200, 200, 200, 200) # HEAD HEAD DELETE DELETE DELETE
|
||||
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Container-Host': '10.0.0.0:1000,10.0.0.3:1003',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sda,sdd'},
|
||||
{'X-Container-Host': '10.0.0.1:1001',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sdb'},
|
||||
{'X-Container-Host': '10.0.0.2:1002',
|
||||
'X-Container-Partition': 1,
|
||||
'X-Container-Device': 'sdc'}])
|
||||
|
||||
def test_PUT_x_delete_at_with_fewer_container_replicas(self):
|
||||
self.app.container_ring.set_replicas(2)
|
||||
|
||||
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Content-Type': 'application/stuff',
|
||||
'Content-Length': '0',
|
||||
'X-Delete-At': int(time()) + 100000})
|
||||
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
|
||||
seen_headers = self._gather_x_container_headers(
|
||||
controller.PUT, req,
|
||||
200, 200, 201, 201, 201, # HEAD HEAD PUT PUT PUT
|
||||
header_list=('X-Delete-At-Host', 'X-Delete-At-Device',
|
||||
'X-Delete-At-Partition'))
|
||||
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Delete-At-Host': '10.0.0.0:1000',
|
||||
'X-Delete-At-Partition': 1,
|
||||
'X-Delete-At-Device': 'sda'},
|
||||
{'X-Delete-At-Host': '10.0.0.1:1001',
|
||||
'X-Delete-At-Partition': 1,
|
||||
'X-Delete-At-Device': 'sdb'},
|
||||
{'X-Delete-At-Host': None,
|
||||
'X-Delete-At-Partition': None,
|
||||
'X-Delete-At-Device': None}])
|
||||
|
||||
|
||||
def test_PUT_x_delete_at_with_more_container_replicas(self):
|
||||
self.app.container_ring.set_replicas(4)
|
||||
self.app.expiring_objects_account = 'expires'
|
||||
self.app.expiring_objects_container_divisor = 60
|
||||
|
||||
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Content-Type': 'application/stuff',
|
||||
'Content-Length': 0,
|
||||
'X-Delete-At': int(time()) + 100000})
|
||||
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
|
||||
seen_headers = self._gather_x_container_headers(
|
||||
controller.PUT, req,
|
||||
200, 200, 201, 201, 201, # HEAD HEAD PUT PUT PUT
|
||||
header_list=('X-Delete-At-Host', 'X-Delete-At-Device',
|
||||
'X-Delete-At-Partition'))
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Delete-At-Host': '10.0.0.0:1000,10.0.0.3:1003',
|
||||
'X-Delete-At-Partition': 1,
|
||||
'X-Delete-At-Device': 'sda,sdd'},
|
||||
{'X-Delete-At-Host': '10.0.0.1:1001',
|
||||
'X-Delete-At-Partition': 1,
|
||||
'X-Delete-At-Device': 'sdb'},
|
||||
{'X-Delete-At-Host': '10.0.0.2:1002',
|
||||
'X-Delete-At-Partition': 1,
|
||||
'X-Delete-At-Device': 'sdc'}])
|
||||
|
||||
|
||||
class TestContainerController(unittest.TestCase):
|
||||
"Test swift.proxy_server.ContainerController"
|
||||
@ -4501,6 +4695,108 @@ class TestContainerController(unittest.TestCase):
|
||||
'x-trans-id', 'x-container-meta-color'])
|
||||
self.assertEquals(expected_exposed, exposed)
|
||||
|
||||
def _gather_x_account_headers(self, controller_call, req, *connect_args,
|
||||
**kwargs):
|
||||
seen_headers = []
|
||||
to_capture = ('X-Account-Partition', 'X-Account-Host',
|
||||
'X-Account-Device')
|
||||
|
||||
def capture_headers(ipaddr, port, device, partition, method,
|
||||
path, headers=None, query_string=None):
|
||||
captured = {}
|
||||
for header in to_capture:
|
||||
captured[header] = headers.get(header)
|
||||
seen_headers.append(captured)
|
||||
|
||||
with save_globals():
|
||||
self.app.allow_account_management = True
|
||||
|
||||
set_http_connect(*connect_args, give_connect=capture_headers,
|
||||
**kwargs)
|
||||
resp = controller_call(req)
|
||||
self.assertEqual(2, resp.status_int // 100) # sanity check
|
||||
|
||||
# don't care about the account HEAD, so throw away the
|
||||
# first element
|
||||
return sorted(seen_headers[1:],
|
||||
key=lambda d: d['X-Account-Host'] or 'Z')
|
||||
|
||||
def test_PUT_x_account_headers_with_fewer_account_replicas(self):
|
||||
self.app.account_ring.set_replicas(2)
|
||||
req = Request.blank('/a/c', headers={'': ''})
|
||||
controller = proxy_server.ContainerController(self.app, 'a', 'c')
|
||||
|
||||
seen_headers = self._gather_x_account_headers(
|
||||
controller.PUT, req,
|
||||
200, 201, 201, 201) # HEAD PUT PUT PUT
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Account-Host': '10.0.0.0:1000',
|
||||
'X-Account-Partition': 1,
|
||||
'X-Account-Device': 'sda'},
|
||||
{'X-Account-Host': '10.0.0.1:1001',
|
||||
'X-Account-Partition': 1,
|
||||
'X-Account-Device': 'sdb'},
|
||||
{'X-Account-Host': None,
|
||||
'X-Account-Partition': None,
|
||||
'X-Account-Device': None}])
|
||||
|
||||
def test_PUT_x_account_headers_with_more_account_replicas(self):
|
||||
self.app.account_ring.set_replicas(4)
|
||||
req = Request.blank('/a/c', headers={'': ''})
|
||||
controller = proxy_server.ContainerController(self.app, 'a', 'c')
|
||||
|
||||
seen_headers = self._gather_x_account_headers(
|
||||
controller.PUT, req,
|
||||
200, 201, 201, 201) # HEAD PUT PUT PUT
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Account-Host': '10.0.0.0:1000,10.0.0.3:1003',
|
||||
'X-Account-Partition': 1,
|
||||
'X-Account-Device': 'sda,sdd'},
|
||||
{'X-Account-Host': '10.0.0.1:1001',
|
||||
'X-Account-Partition': 1,
|
||||
'X-Account-Device': 'sdb'},
|
||||
{'X-Account-Host': '10.0.0.2:1002',
|
||||
'X-Account-Partition': 1,
|
||||
'X-Account-Device': 'sdc'}])
|
||||
|
||||
def test_DELETE_x_account_headers_with_fewer_account_replicas(self):
|
||||
self.app.account_ring.set_replicas(2)
|
||||
req = Request.blank('/a/c', headers={'': ''})
|
||||
controller = proxy_server.ContainerController(self.app, 'a', 'c')
|
||||
|
||||
seen_headers = self._gather_x_account_headers(
|
||||
controller.DELETE, req,
|
||||
200, 204, 204, 204) # HEAD DELETE DELETE DELETE
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Account-Host': '10.0.0.0:1000',
|
||||
'X-Account-Partition': 1,
|
||||
'X-Account-Device': 'sda'},
|
||||
{'X-Account-Host': '10.0.0.1:1001',
|
||||
'X-Account-Partition': 1,
|
||||
'X-Account-Device': 'sdb'},
|
||||
{'X-Account-Host': None,
|
||||
'X-Account-Partition': None,
|
||||
'X-Account-Device': None}])
|
||||
|
||||
def test_DELETE_x_account_headers_with_more_account_replicas(self):
|
||||
self.app.account_ring.set_replicas(4)
|
||||
req = Request.blank('/a/c', headers={'': ''})
|
||||
controller = proxy_server.ContainerController(self.app, 'a', 'c')
|
||||
|
||||
seen_headers = self._gather_x_account_headers(
|
||||
controller.DELETE, req,
|
||||
200, 204, 204, 204) # HEAD DELETE DELETE DELETE
|
||||
self.assertEqual(seen_headers, [
|
||||
{'X-Account-Host': '10.0.0.0:1000,10.0.0.3:1003',
|
||||
'X-Account-Partition': 1,
|
||||
'X-Account-Device': 'sda,sdd'},
|
||||
{'X-Account-Host': '10.0.0.1:1001',
|
||||
'X-Account-Partition': 1,
|
||||
'X-Account-Device': 'sdb'},
|
||||
{'X-Account-Host': '10.0.0.2:1002',
|
||||
'X-Account-Partition': 1,
|
||||
'X-Account-Device': 'sdc'}])
|
||||
|
||||
|
||||
class TestAccountController(unittest.TestCase):
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user