Merge "do container listing updates in another (green)thread"
This commit is contained in:
commit
4e92d5e7b7
@ -129,6 +129,8 @@ Logging address. The default is /dev/log.
|
||||
Request timeout to external services. The default is 3 seconds.
|
||||
.IP \fBconn_timeout\fR
|
||||
Connection timeout to external services. The default is 0.5 seconds.
|
||||
.IP \fBcontainer_update_timeout\fR
|
||||
Request timeout to do a container update on an object update. The default is 1 second.
|
||||
.RE
|
||||
.PD
|
||||
|
||||
|
@ -412,76 +412,86 @@ The following configuration options are available:
|
||||
|
||||
[DEFAULT]
|
||||
|
||||
=================== ========== =============================================
|
||||
Option Default Description
|
||||
------------------- ---------- ---------------------------------------------
|
||||
swift_dir /etc/swift Swift configuration directory
|
||||
devices /srv/node Parent directory of where devices are mounted
|
||||
mount_check true Whether or not check if the devices are
|
||||
mounted to prevent accidentally writing
|
||||
to the root device
|
||||
bind_ip 0.0.0.0 IP Address for server to bind to
|
||||
bind_port 6000 Port for server to bind to
|
||||
bind_timeout 30 Seconds to attempt bind before giving up
|
||||
workers auto Override the number of pre-forked workers
|
||||
that will accept connections. If set it
|
||||
should be an integer, zero means no fork. If
|
||||
unset, it will try to default to the number
|
||||
of effective cpu cores and fallback to one.
|
||||
Increasing the number of workers helps slow
|
||||
filesystem operations in one request from
|
||||
negatively impacting other requests, but only
|
||||
the :ref:`servers_per_port
|
||||
<server-per-port-configuration>`
|
||||
option provides complete I/O isolation with
|
||||
no measurable overhead.
|
||||
servers_per_port 0 If each disk in each storage policy ring has
|
||||
unique port numbers for its "ip" value, you
|
||||
can use this setting to have each
|
||||
object-server worker only service requests
|
||||
for the single disk matching the port in the
|
||||
ring. The value of this setting determines
|
||||
how many worker processes run for each port
|
||||
(disk) in the ring. If you have 24 disks
|
||||
per server, and this setting is 4, then
|
||||
each storage node will have 1 + (24 * 4) =
|
||||
97 total object-server processes running.
|
||||
This gives complete I/O isolation, drastically
|
||||
reducing the impact of slow disks on storage
|
||||
node performance. The object-replicator and
|
||||
object-reconstructor need to see this setting
|
||||
too, so it must be in the [DEFAULT] section.
|
||||
See :ref:`server-per-port-configuration`.
|
||||
max_clients 1024 Maximum number of clients one worker can
|
||||
process simultaneously (it will actually
|
||||
accept(2) N + 1). Setting this to one (1)
|
||||
will only handle one request at a time,
|
||||
without accepting another request
|
||||
concurrently.
|
||||
disable_fallocate false Disable "fast fail" fallocate checks if the
|
||||
underlying filesystem does not support it.
|
||||
log_max_line_length 0 Caps the length of log lines to the
|
||||
value given; no limit if set to 0, the
|
||||
default.
|
||||
log_custom_handlers None Comma-separated list of functions to call
|
||||
to setup custom log handlers.
|
||||
eventlet_debug false If true, turn on debug logging for eventlet
|
||||
fallocate_reserve 0 You can set fallocate_reserve to the number of
|
||||
bytes you'd like fallocate to reserve, whether
|
||||
there is space for the given file size or not.
|
||||
This is useful for systems that behave badly
|
||||
when they completely run out of space; you can
|
||||
make the services pretend they're out of space
|
||||
early.
|
||||
conn_timeout 0.5 Time to wait while attempting to connect to
|
||||
another backend node.
|
||||
node_timeout 3 Time to wait while sending each chunk of data
|
||||
to another backend node.
|
||||
client_timeout 60 Time to wait while receiving each chunk of
|
||||
data from a client or another backend node.
|
||||
network_chunk_size 65536 Size of chunks to read/write over the network
|
||||
disk_chunk_size 65536 Size of chunks to read/write to disk
|
||||
=================== ========== =============================================
|
||||
======================== ========== ==========================================
|
||||
Option Default Description
|
||||
------------------------ ---------- ------------------------------------------
|
||||
swift_dir /etc/swift Swift configuration directory
|
||||
devices /srv/node Parent directory of where devices are
|
||||
mounted
|
||||
mount_check true Whether or not check if the devices are
|
||||
mounted to prevent accidentally writing
|
||||
to the root device
|
||||
bind_ip 0.0.0.0 IP Address for server to bind to
|
||||
bind_port 6000 Port for server to bind to
|
||||
bind_timeout 30 Seconds to attempt bind before giving up
|
||||
workers auto Override the number of pre-forked workers
|
||||
that will accept connections. If set it
|
||||
should be an integer, zero means no fork.
|
||||
If unset, it will try to default to the
|
||||
number of effective cpu cores and fallback
|
||||
to one. Increasing the number of workers
|
||||
helps slow filesystem operations in one
|
||||
request from negatively impacting other
|
||||
requests, but only the
|
||||
:ref:`servers_per_port
|
||||
<server-per-port-configuration>` option
|
||||
provides complete I/O isolation with no
|
||||
measurable overhead.
|
||||
servers_per_port 0 If each disk in each storage policy ring
|
||||
has unique port numbers for its "ip"
|
||||
value, you can use this setting to have
|
||||
each object-server worker only service
|
||||
requests for the single disk matching the
|
||||
port in the ring. The value of this
|
||||
setting determines how many worker
|
||||
processes run for each port (disk) in the
|
||||
ring. If you have 24 disks per server, and
|
||||
this setting is 4, then each storage node
|
||||
will have 1 + (24 * 4) = 97 total
|
||||
object-server processes running. This
|
||||
gives complete I/O isolation, drastically
|
||||
reducing the impact of slow disks on
|
||||
storage node performance. The
|
||||
object-replicator and object-reconstructor
|
||||
need to see this setting too, so it must
|
||||
be in the [DEFAULT] section.
|
||||
See :ref:`server-per-port-configuration`.
|
||||
max_clients 1024 Maximum number of clients one worker can
|
||||
process simultaneously (it will actually
|
||||
accept(2) N + 1). Setting this to one (1)
|
||||
will only handle one request at a time,
|
||||
without accepting another request
|
||||
concurrently.
|
||||
disable_fallocate false Disable "fast fail" fallocate checks if
|
||||
the underlying filesystem does not support
|
||||
it.
|
||||
log_max_line_length 0 Caps the length of log lines to the
|
||||
value given; no limit if set to 0, the
|
||||
default.
|
||||
log_custom_handlers None Comma-separated list of functions to call
|
||||
to setup custom log handlers.
|
||||
eventlet_debug false If true, turn on debug logging for
|
||||
eventlet
|
||||
fallocate_reserve 0 You can set fallocate_reserve to the
|
||||
number of bytes you'd like fallocate to
|
||||
reserve, whether there is space for the
|
||||
given file size or not. This is useful for
|
||||
systems that behave badly when they
|
||||
completely run out of space; you can
|
||||
make the services pretend they're out of
|
||||
space early.
|
||||
conn_timeout 0.5 Time to wait while attempting to connect
|
||||
to another backend node.
|
||||
node_timeout 3 Time to wait while sending each chunk of
|
||||
data to another backend node.
|
||||
client_timeout 60 Time to wait while receiving each chunk of
|
||||
data from a client or another backend node
|
||||
network_chunk_size 65536 Size of chunks to read/write over the
|
||||
network
|
||||
disk_chunk_size 65536 Size of chunks to read/write to disk
|
||||
container_update_timeout 1 Time to wait while sending a container
|
||||
update on object update.
|
||||
======================== ========== ==========================================
|
||||
|
||||
.. _object-server-options:
|
||||
|
||||
|
@ -60,6 +60,8 @@ bind_port = 6000
|
||||
# conn_timeout = 0.5
|
||||
# Time to wait while sending each chunk of data to another backend node.
|
||||
# node_timeout = 3
|
||||
# Time to wait while sending a container update on object update.
|
||||
# container_update_timeout = 1.0
|
||||
# Time to wait while receiving each chunk of data from a client or another
|
||||
# backend node.
|
||||
# client_timeout = 60
|
||||
|
@ -28,6 +28,7 @@ from swift import gettext_ as _
|
||||
from hashlib import md5
|
||||
|
||||
from eventlet import sleep, wsgi, Timeout
|
||||
from eventlet.greenthread import spawn
|
||||
|
||||
from swift.common.utils import public, get_logger, \
|
||||
config_true_value, timing_stats, replication, \
|
||||
@ -108,7 +109,9 @@ class ObjectController(BaseStorageServer):
|
||||
"""
|
||||
super(ObjectController, self).__init__(conf)
|
||||
self.logger = logger or get_logger(conf, log_route='object-server')
|
||||
self.node_timeout = int(conf.get('node_timeout', 3))
|
||||
self.node_timeout = float(conf.get('node_timeout', 3))
|
||||
self.container_update_timeout = float(
|
||||
conf.get('container_update_timeout', 1))
|
||||
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
|
||||
self.client_timeout = int(conf.get('client_timeout', 60))
|
||||
self.disk_chunk_size = int(conf.get('disk_chunk_size', 65536))
|
||||
@ -198,7 +201,8 @@ class ObjectController(BaseStorageServer):
|
||||
device, partition, account, container, obj, policy, **kwargs)
|
||||
|
||||
def async_update(self, op, account, container, obj, host, partition,
|
||||
contdevice, headers_out, objdevice, policy):
|
||||
contdevice, headers_out, objdevice, policy,
|
||||
logger_thread_locals=None):
|
||||
"""
|
||||
Sends or saves an async update.
|
||||
|
||||
@ -213,7 +217,12 @@ class ObjectController(BaseStorageServer):
|
||||
request
|
||||
:param objdevice: device name that the object is in
|
||||
:param policy: the associated BaseStoragePolicy instance
|
||||
:param logger_thread_locals: The thread local values to be set on the
|
||||
self.logger to retain transaction
|
||||
logging information.
|
||||
"""
|
||||
if logger_thread_locals:
|
||||
self.logger.thread_locals = logger_thread_locals
|
||||
headers_out['user-agent'] = 'object-server %s' % os.getpid()
|
||||
full_path = '/%s/%s/%s' % (account, container, obj)
|
||||
if all([host, partition, contdevice]):
|
||||
@ -285,10 +294,28 @@ class ObjectController(BaseStorageServer):
|
||||
headers_out['x-trans-id'] = headers_in.get('x-trans-id', '-')
|
||||
headers_out['referer'] = request.as_referer()
|
||||
headers_out['X-Backend-Storage-Policy-Index'] = int(policy)
|
||||
update_greenthreads = []
|
||||
for conthost, contdevice in updates:
|
||||
self.async_update(op, account, container, obj, conthost,
|
||||
contpartition, contdevice, headers_out,
|
||||
objdevice, policy)
|
||||
gt = spawn(self.async_update, op, account, container, obj,
|
||||
conthost, contpartition, contdevice, headers_out,
|
||||
objdevice, policy,
|
||||
logger_thread_locals=self.logger.thread_locals)
|
||||
update_greenthreads.append(gt)
|
||||
# Wait a little bit to see if the container updates are successful.
|
||||
# If we immediately return after firing off the greenthread above, then
|
||||
# we're more likely to confuse the end-user who does a listing right
|
||||
# after getting a successful response to the object create. The
|
||||
# `container_update_timeout` bounds the length of time we wait so that
|
||||
# one slow container server doesn't make the entire request lag.
|
||||
try:
|
||||
with Timeout(self.container_update_timeout):
|
||||
for gt in update_greenthreads:
|
||||
gt.wait()
|
||||
except Timeout:
|
||||
# updates didn't go through, log it and return
|
||||
self.logger.debug(
|
||||
'Container update timeout (%.4fs) waiting for %s',
|
||||
self.container_update_timeout, updates)
|
||||
|
||||
def delete_at_update(self, op, delete_at, account, container, obj,
|
||||
request, objdevice, policy):
|
||||
|
@ -34,8 +34,9 @@ from tempfile import mkdtemp
|
||||
from hashlib import md5
|
||||
import itertools
|
||||
import tempfile
|
||||
from contextlib import contextmanager
|
||||
|
||||
from eventlet import sleep, spawn, wsgi, listen, Timeout, tpool
|
||||
from eventlet import sleep, spawn, wsgi, listen, Timeout, tpool, greenthread
|
||||
from eventlet.green import httplib
|
||||
|
||||
from nose import SkipTest
|
||||
@ -68,6 +69,35 @@ test_policies = [
|
||||
]
|
||||
|
||||
|
||||
@contextmanager
|
||||
def fake_spawn():
|
||||
"""
|
||||
Spawn and capture the result so we can later wait on it. This means we can
|
||||
test code executing in a greenthread but still wait() on the result to
|
||||
ensure that the method has completed.
|
||||
"""
|
||||
|
||||
orig = object_server.spawn
|
||||
greenlets = []
|
||||
|
||||
def _inner_fake_spawn(func, *a, **kw):
|
||||
gt = greenthread.spawn(func, *a, **kw)
|
||||
greenlets.append(gt)
|
||||
return gt
|
||||
|
||||
object_server.spawn = _inner_fake_spawn
|
||||
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
for gt in greenlets:
|
||||
try:
|
||||
gt.wait()
|
||||
except: # noqa
|
||||
pass # real spawn won't do anything but pollute logs
|
||||
object_server.spawn = orig
|
||||
|
||||
|
||||
@patch_policies(test_policies)
|
||||
class TestObjectController(unittest.TestCase):
|
||||
"""Test swift.obj.server.ObjectController"""
|
||||
@ -372,55 +402,54 @@ class TestObjectController(unittest.TestCase):
|
||||
|
||||
return lambda *args, **kwargs: FakeConn(response, with_exc)
|
||||
|
||||
old_http_connect = object_server.http_connect
|
||||
try:
|
||||
ts = time()
|
||||
timestamp = normalize_timestamp(ts)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': timestamp,
|
||||
'Content-Type': 'text/plain',
|
||||
'Content-Length': '0'})
|
||||
ts = time()
|
||||
timestamp = normalize_timestamp(ts)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': timestamp,
|
||||
'Content-Type': 'text/plain',
|
||||
'Content-Length': '0'})
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Timestamp': normalize_timestamp(ts + 1),
|
||||
'X-Container-Host': '1.2.3.4:0',
|
||||
'X-Container-Partition': '3',
|
||||
'X-Container-Device': 'sda1',
|
||||
'X-Container-Timestamp': '1',
|
||||
'Content-Type': 'application/new1'})
|
||||
with mock.patch.object(object_server, 'http_connect',
|
||||
mock_http_connect(202)):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Timestamp': normalize_timestamp(ts + 1),
|
||||
'X-Container-Host': '1.2.3.4:0',
|
||||
'X-Container-Partition': '3',
|
||||
'X-Container-Device': 'sda1',
|
||||
'X-Container-Timestamp': '1',
|
||||
'Content-Type': 'application/new1'})
|
||||
object_server.http_connect = mock_http_connect(202)
|
||||
self.assertEquals(resp.status_int, 202)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Timestamp': normalize_timestamp(ts + 2),
|
||||
'X-Container-Host': '1.2.3.4:0',
|
||||
'X-Container-Partition': '3',
|
||||
'X-Container-Device': 'sda1',
|
||||
'X-Container-Timestamp': '1',
|
||||
'Content-Type': 'application/new1'})
|
||||
with mock.patch.object(object_server, 'http_connect',
|
||||
mock_http_connect(202, with_exc=True)):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEquals(resp.status_int, 202)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Timestamp': normalize_timestamp(ts + 2),
|
||||
'X-Container-Host': '1.2.3.4:0',
|
||||
'X-Container-Partition': '3',
|
||||
'X-Container-Device': 'sda1',
|
||||
'X-Container-Timestamp': '1',
|
||||
'Content-Type': 'application/new1'})
|
||||
object_server.http_connect = mock_http_connect(202, with_exc=True)
|
||||
self.assertEquals(resp.status_int, 202)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Timestamp': normalize_timestamp(ts + 3),
|
||||
'X-Container-Host': '1.2.3.4:0',
|
||||
'X-Container-Partition': '3',
|
||||
'X-Container-Device': 'sda1',
|
||||
'X-Container-Timestamp': '1',
|
||||
'Content-Type': 'application/new2'})
|
||||
with mock.patch.object(object_server, 'http_connect',
|
||||
mock_http_connect(500)):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEquals(resp.status_int, 202)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Timestamp': normalize_timestamp(ts + 3),
|
||||
'X-Container-Host': '1.2.3.4:0',
|
||||
'X-Container-Partition': '3',
|
||||
'X-Container-Device': 'sda1',
|
||||
'X-Container-Timestamp': '1',
|
||||
'Content-Type': 'application/new2'})
|
||||
object_server.http_connect = mock_http_connect(500)
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEquals(resp.status_int, 202)
|
||||
finally:
|
||||
object_server.http_connect = old_http_connect
|
||||
self.assertEquals(resp.status_int, 202)
|
||||
|
||||
def test_POST_quarantine_zbyte(self):
|
||||
timestamp = normalize_timestamp(time())
|
||||
@ -1295,52 +1324,54 @@ class TestObjectController(unittest.TestCase):
|
||||
|
||||
return lambda *args, **kwargs: FakeConn(response, with_exc)
|
||||
|
||||
old_http_connect = object_server.http_connect
|
||||
try:
|
||||
timestamp = normalize_timestamp(time())
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': timestamp,
|
||||
'X-Container-Host': '1.2.3.4:0',
|
||||
'X-Container-Partition': '3',
|
||||
'X-Container-Device': 'sda1',
|
||||
'X-Container-Timestamp': '1',
|
||||
'Content-Type': 'application/new1',
|
||||
'Content-Length': '0'})
|
||||
object_server.http_connect = mock_http_connect(201)
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
timestamp = normalize_timestamp(time())
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': timestamp,
|
||||
'X-Container-Host': '1.2.3.4:0',
|
||||
'X-Container-Partition': '3',
|
||||
'X-Container-Device': 'sda1',
|
||||
'X-Container-Timestamp': '1',
|
||||
'Content-Type': 'application/new1',
|
||||
'Content-Length': '0'})
|
||||
object_server.http_connect = mock_http_connect(500)
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
timestamp = normalize_timestamp(time())
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': timestamp,
|
||||
'X-Container-Host': '1.2.3.4:0',
|
||||
'X-Container-Partition': '3',
|
||||
'X-Container-Device': 'sda1',
|
||||
'X-Container-Timestamp': '1',
|
||||
'Content-Type': 'application/new1',
|
||||
'Content-Length': '0'})
|
||||
object_server.http_connect = mock_http_connect(500, with_exc=True)
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
finally:
|
||||
object_server.http_connect = old_http_connect
|
||||
timestamp = normalize_timestamp(time())
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': timestamp,
|
||||
'X-Container-Host': '1.2.3.4:0',
|
||||
'X-Container-Partition': '3',
|
||||
'X-Container-Device': 'sda1',
|
||||
'X-Container-Timestamp': '1',
|
||||
'Content-Type': 'application/new1',
|
||||
'Content-Length': '0'})
|
||||
with mock.patch.object(object_server, 'http_connect',
|
||||
mock_http_connect(201)):
|
||||
with fake_spawn():
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
timestamp = normalize_timestamp(time())
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': timestamp,
|
||||
'X-Container-Host': '1.2.3.4:0',
|
||||
'X-Container-Partition': '3',
|
||||
'X-Container-Device': 'sda1',
|
||||
'X-Container-Timestamp': '1',
|
||||
'Content-Type': 'application/new1',
|
||||
'Content-Length': '0'})
|
||||
with mock.patch.object(object_server, 'http_connect',
|
||||
mock_http_connect(500)):
|
||||
with fake_spawn():
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
timestamp = normalize_timestamp(time())
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': timestamp,
|
||||
'X-Container-Host': '1.2.3.4:0',
|
||||
'X-Container-Partition': '3',
|
||||
'X-Container-Device': 'sda1',
|
||||
'X-Container-Timestamp': '1',
|
||||
'Content-Type': 'application/new1',
|
||||
'Content-Length': '0'})
|
||||
with mock.patch.object(object_server, 'http_connect',
|
||||
mock_http_connect(500, with_exc=True)):
|
||||
with fake_spawn():
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEquals(resp.status_int, 201)
|
||||
|
||||
def test_PUT_ssync_multi_frag(self):
|
||||
timestamp = utils.Timestamp(time()).internal
|
||||
@ -2481,7 +2512,8 @@ class TestObjectController(unittest.TestCase):
|
||||
'Content-Type': 'text/plain'})
|
||||
with mocked_http_conn(
|
||||
200, give_connect=capture_updates) as fake_conn:
|
||||
resp = req.get_response(self.object_controller)
|
||||
with fake_spawn():
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertRaises(StopIteration, fake_conn.code_iter.next)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
self.assertEquals(1, len(container_updates))
|
||||
@ -2520,7 +2552,8 @@ class TestObjectController(unittest.TestCase):
|
||||
'Content-Type': 'text/html'})
|
||||
with mocked_http_conn(
|
||||
200, give_connect=capture_updates) as fake_conn:
|
||||
resp = req.get_response(self.object_controller)
|
||||
with fake_spawn():
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertRaises(StopIteration, fake_conn.code_iter.next)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
self.assertEquals(1, len(container_updates))
|
||||
@ -2558,7 +2591,8 @@ class TestObjectController(unittest.TestCase):
|
||||
'Content-Type': 'text/enriched'})
|
||||
with mocked_http_conn(
|
||||
200, give_connect=capture_updates) as fake_conn:
|
||||
resp = req.get_response(self.object_controller)
|
||||
with fake_spawn():
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertRaises(StopIteration, fake_conn.code_iter.next)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
self.assertEquals(1, len(container_updates))
|
||||
@ -2596,7 +2630,8 @@ class TestObjectController(unittest.TestCase):
|
||||
'X-Container-Partition': 'p'})
|
||||
with mocked_http_conn(
|
||||
200, give_connect=capture_updates) as fake_conn:
|
||||
resp = req.get_response(self.object_controller)
|
||||
with fake_spawn():
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertRaises(StopIteration, fake_conn.code_iter.next)
|
||||
self.assertEqual(resp.status_int, 204)
|
||||
self.assertEquals(1, len(container_updates))
|
||||
@ -2627,7 +2662,8 @@ class TestObjectController(unittest.TestCase):
|
||||
'X-Container-Partition': 'p'})
|
||||
with mocked_http_conn(
|
||||
200, give_connect=capture_updates) as fake_conn:
|
||||
resp = req.get_response(self.object_controller)
|
||||
with fake_spawn():
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertRaises(StopIteration, fake_conn.code_iter.next)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
self.assertEquals(1, len(container_updates))
|
||||
@ -3096,7 +3132,8 @@ class TestObjectController(unittest.TestCase):
|
||||
|
||||
with mock.patch.object(object_server, 'http_connect',
|
||||
fake_http_connect):
|
||||
resp = req.get_response(self.object_controller)
|
||||
with fake_spawn():
|
||||
resp = req.get_response(self.object_controller)
|
||||
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
@ -3209,7 +3246,8 @@ class TestObjectController(unittest.TestCase):
|
||||
|
||||
with mock.patch.object(object_server, 'http_connect',
|
||||
fake_http_connect):
|
||||
req.get_response(self.object_controller)
|
||||
with fake_spawn():
|
||||
req.get_response(self.object_controller)
|
||||
|
||||
http_connect_args.sort(key=operator.itemgetter('ipaddr'))
|
||||
|
||||
@ -3286,7 +3324,8 @@ class TestObjectController(unittest.TestCase):
|
||||
'/sda1/p/a/c/o', method='PUT', body='', headers=headers)
|
||||
with mocked_http_conn(
|
||||
500, 500, give_connect=capture_updates) as fake_conn:
|
||||
resp = req.get_response(self.object_controller)
|
||||
with fake_spawn():
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertRaises(StopIteration, fake_conn.code_iter.next)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
self.assertEquals(2, len(container_updates))
|
||||
@ -3522,7 +3561,8 @@ class TestObjectController(unittest.TestCase):
|
||||
'Content-Type': 'text/plain'}, body='')
|
||||
with mocked_http_conn(
|
||||
200, give_connect=capture_updates) as fake_conn:
|
||||
resp = req.get_response(self.object_controller)
|
||||
with fake_spawn():
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertRaises(StopIteration, fake_conn.code_iter.next)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
self.assertEqual(len(container_updates), 1)
|
||||
@ -3563,7 +3603,8 @@ class TestObjectController(unittest.TestCase):
|
||||
headers=headers, body='')
|
||||
with mocked_http_conn(
|
||||
200, give_connect=capture_updates) as fake_conn:
|
||||
resp = req.get_response(self.object_controller)
|
||||
with fake_spawn():
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertRaises(StopIteration, fake_conn.code_iter.next)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
self.assertEqual(len(container_updates), 1)
|
||||
@ -3603,7 +3644,8 @@ class TestObjectController(unittest.TestCase):
|
||||
diskfile_mgr = self.object_controller._diskfile_router[policy]
|
||||
diskfile_mgr.pickle_async_update = fake_pickle_async_update
|
||||
with mocked_http_conn(500) as fake_conn:
|
||||
resp = req.get_response(self.object_controller)
|
||||
with fake_spawn():
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertRaises(StopIteration, fake_conn.code_iter.next)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
self.assertEqual(len(given_args), 7)
|
||||
@ -3630,6 +3672,104 @@ class TestObjectController(unittest.TestCase):
|
||||
'container': 'c',
|
||||
'op': 'PUT'})
|
||||
|
||||
def test_container_update_as_greenthread(self):
|
||||
greenthreads = []
|
||||
saved_spawn_calls = []
|
||||
called_async_update_args = []
|
||||
|
||||
def local_fake_spawn(func, *a, **kw):
|
||||
saved_spawn_calls.append((func, a, kw))
|
||||
return mock.MagicMock()
|
||||
|
||||
def local_fake_async_update(*a, **kw):
|
||||
# just capture the args to see that we would have called
|
||||
called_async_update_args.append([a, kw])
|
||||
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': '12345',
|
||||
'Content-Type': 'application/burrito',
|
||||
'Content-Length': '0',
|
||||
'X-Backend-Storage-Policy-Index': 0,
|
||||
'X-Container-Partition': '20',
|
||||
'X-Container-Host': '1.2.3.4:5',
|
||||
'X-Container-Device': 'sdb1'})
|
||||
with mock.patch.object(object_server, 'spawn',
|
||||
local_fake_spawn):
|
||||
with mock.patch.object(self.object_controller,
|
||||
'async_update',
|
||||
local_fake_async_update):
|
||||
resp = req.get_response(self.object_controller)
|
||||
# check the response is completed and successful
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
# check that async_update hasn't been called
|
||||
self.assertFalse(len(called_async_update_args))
|
||||
# now do the work in greenthreads
|
||||
for func, a, kw in saved_spawn_calls:
|
||||
gt = spawn(func, *a, **kw)
|
||||
greenthreads.append(gt)
|
||||
# wait for the greenthreads to finish
|
||||
for gt in greenthreads:
|
||||
gt.wait()
|
||||
# check that the calls to async_update have happened
|
||||
headers_out = {'X-Size': '0',
|
||||
'X-Content-Type': 'application/burrito',
|
||||
'X-Timestamp': '0000012345.00000',
|
||||
'X-Trans-Id': '-',
|
||||
'Referer': 'PUT http://localhost/sda1/p/a/c/o',
|
||||
'X-Backend-Storage-Policy-Index': '0',
|
||||
'X-Etag': 'd41d8cd98f00b204e9800998ecf8427e'}
|
||||
expected = [('PUT', 'a', 'c', 'o', '1.2.3.4:5', '20', 'sdb1',
|
||||
headers_out, 'sda1', POLICIES[0]),
|
||||
{'logger_thread_locals': (None, None)}]
|
||||
self.assertEqual(called_async_update_args, [expected])
|
||||
|
||||
def test_container_update_as_greenthread_with_timeout(self):
|
||||
'''
|
||||
give it one container to update (for only one greenthred)
|
||||
fake the greenthred so it will raise a timeout
|
||||
test that the right message is logged and the method returns None
|
||||
'''
|
||||
called_async_update_args = []
|
||||
|
||||
def local_fake_spawn(func, *a, **kw):
|
||||
m = mock.MagicMock()
|
||||
|
||||
def wait_with_error():
|
||||
raise Timeout()
|
||||
m.wait = wait_with_error # because raise can't be in a lambda
|
||||
return m
|
||||
|
||||
def local_fake_async_update(*a, **kw):
|
||||
# just capture the args to see that we would have called
|
||||
called_async_update_args.append([a, kw])
|
||||
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': '12345',
|
||||
'Content-Type': 'application/burrito',
|
||||
'Content-Length': '0',
|
||||
'X-Backend-Storage-Policy-Index': 0,
|
||||
'X-Container-Partition': '20',
|
||||
'X-Container-Host': '1.2.3.4:5',
|
||||
'X-Container-Device': 'sdb1'})
|
||||
with mock.patch.object(object_server, 'spawn',
|
||||
local_fake_spawn):
|
||||
with mock.patch.object(self.object_controller,
|
||||
'container_update_timeout',
|
||||
1.414213562):
|
||||
resp = req.get_response(self.object_controller)
|
||||
# check the response is completed and successful
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
# check that the timeout was logged
|
||||
expected_logged_error = "Container update timeout (1.4142s) " \
|
||||
"waiting for [('1.2.3.4:5', 'sdb1')]"
|
||||
self.assertTrue(
|
||||
expected_logged_error in
|
||||
self.object_controller.logger.get_lines_for_level('debug'))
|
||||
|
||||
def test_container_update_bad_args(self):
|
||||
policy = random.choice(list(POLICIES))
|
||||
given_args = []
|
||||
|
Loading…
x
Reference in New Issue
Block a user