From 2289137164231d7872731c2cf3d81b86f34f01a4 Mon Sep 17 00:00:00 2001 From: John Dickinson Date: Sat, 23 May 2015 15:40:03 -0700 Subject: [PATCH] do container listing updates in another (green)thread The actual server-side changes are simple. The tests are a different matter. Many changes were needed to the object server tests to handle the now-async calls to the container server. In an effort to test this properly, some drive-by changes were made to improve tests. I tested this patch by doing zero-byte object writes to one container as fast as possible. Then I did it again while also saturating 2 of the container replica's disks. The results are linked below. https://gist.github.com/notmyname/2bb85acfd8fbc7fc312a DocImpact Change-Id: I737bd0af3f124a4ce3e0862a155e97c1f0ac3e52 --- doc/manpages/object-server.conf.5 | 2 + doc/source/deployment_guide.rst | 150 +++++++------ etc/object-server.conf-sample | 2 + swift/obj/server.py | 37 +++- test/unit/obj/test_server.py | 348 +++++++++++++++++++++--------- 5 files changed, 360 insertions(+), 179 deletions(-) diff --git a/doc/manpages/object-server.conf.5 b/doc/manpages/object-server.conf.5 index fb2297421a..518e72586e 100644 --- a/doc/manpages/object-server.conf.5 +++ b/doc/manpages/object-server.conf.5 @@ -129,6 +129,8 @@ Logging address. The default is /dev/log. Request timeout to external services. The default is 3 seconds. .IP \fBconn_timeout\fR Connection timeout to external services. The default is 0.5 seconds. +.IP \fBcontainer_update_timeout\fR +Request timeout to do a container update on an object update. The default is 1 second. .RE .PD diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst index b26f3ceff1..bec3f55ecd 100644 --- a/doc/source/deployment_guide.rst +++ b/doc/source/deployment_guide.rst @@ -405,76 +405,86 @@ The following configuration options are available: [DEFAULT] -=================== ========== ============================================= -Option Default Description -------------------- ---------- --------------------------------------------- -swift_dir /etc/swift Swift configuration directory -devices /srv/node Parent directory of where devices are mounted -mount_check true Whether or not check if the devices are - mounted to prevent accidentally writing - to the root device -bind_ip 0.0.0.0 IP Address for server to bind to -bind_port 6000 Port for server to bind to -bind_timeout 30 Seconds to attempt bind before giving up -workers auto Override the number of pre-forked workers - that will accept connections. If set it - should be an integer, zero means no fork. If - unset, it will try to default to the number - of effective cpu cores and fallback to one. - Increasing the number of workers helps slow - filesystem operations in one request from - negatively impacting other requests, but only - the :ref:`servers_per_port - ` - option provides complete I/O isolation with - no measurable overhead. -servers_per_port 0 If each disk in each storage policy ring has - unique port numbers for its "ip" value, you - can use this setting to have each - object-server worker only service requests - for the single disk matching the port in the - ring. The value of this setting determines - how many worker processes run for each port - (disk) in the ring. If you have 24 disks - per server, and this setting is 4, then - each storage node will have 1 + (24 * 4) = - 97 total object-server processes running. - This gives complete I/O isolation, drastically - reducing the impact of slow disks on storage - node performance. The object-replicator and - object-reconstructor need to see this setting - too, so it must be in the [DEFAULT] section. - See :ref:`server-per-port-configuration`. -max_clients 1024 Maximum number of clients one worker can - process simultaneously (it will actually - accept(2) N + 1). Setting this to one (1) - will only handle one request at a time, - without accepting another request - concurrently. -disable_fallocate false Disable "fast fail" fallocate checks if the - underlying filesystem does not support it. -log_max_line_length 0 Caps the length of log lines to the - value given; no limit if set to 0, the - default. -log_custom_handlers None Comma-separated list of functions to call - to setup custom log handlers. -eventlet_debug false If true, turn on debug logging for eventlet -fallocate_reserve 0 You can set fallocate_reserve to the number of - bytes you'd like fallocate to reserve, whether - there is space for the given file size or not. - This is useful for systems that behave badly - when they completely run out of space; you can - make the services pretend they're out of space - early. -conn_timeout 0.5 Time to wait while attempting to connect to - another backend node. -node_timeout 3 Time to wait while sending each chunk of data - to another backend node. -client_timeout 60 Time to wait while receiving each chunk of - data from a client or another backend node. -network_chunk_size 65536 Size of chunks to read/write over the network -disk_chunk_size 65536 Size of chunks to read/write to disk -=================== ========== ============================================= +======================== ========== ========================================== +Option Default Description +------------------------ ---------- ------------------------------------------ +swift_dir /etc/swift Swift configuration directory +devices /srv/node Parent directory of where devices are + mounted +mount_check true Whether or not check if the devices are + mounted to prevent accidentally writing + to the root device +bind_ip 0.0.0.0 IP Address for server to bind to +bind_port 6000 Port for server to bind to +bind_timeout 30 Seconds to attempt bind before giving up +workers auto Override the number of pre-forked workers + that will accept connections. If set it + should be an integer, zero means no fork. + If unset, it will try to default to the + number of effective cpu cores and fallback + to one. Increasing the number of workers + helps slow filesystem operations in one + request from negatively impacting other + requests, but only the + :ref:`servers_per_port + ` option + provides complete I/O isolation with no + measurable overhead. +servers_per_port 0 If each disk in each storage policy ring + has unique port numbers for its "ip" + value, you can use this setting to have + each object-server worker only service + requests for the single disk matching the + port in the ring. The value of this + setting determines how many worker + processes run for each port (disk) in the + ring. If you have 24 disks per server, and + this setting is 4, then each storage node + will have 1 + (24 * 4) = 97 total + object-server processes running. This + gives complete I/O isolation, drastically + reducing the impact of slow disks on + storage node performance. The + object-replicator and object-reconstructor + need to see this setting too, so it must + be in the [DEFAULT] section. + See :ref:`server-per-port-configuration`. +max_clients 1024 Maximum number of clients one worker can + process simultaneously (it will actually + accept(2) N + 1). Setting this to one (1) + will only handle one request at a time, + without accepting another request + concurrently. +disable_fallocate false Disable "fast fail" fallocate checks if + the underlying filesystem does not support + it. +log_max_line_length 0 Caps the length of log lines to the + value given; no limit if set to 0, the + default. +log_custom_handlers None Comma-separated list of functions to call + to setup custom log handlers. +eventlet_debug false If true, turn on debug logging for + eventlet +fallocate_reserve 0 You can set fallocate_reserve to the + number of bytes you'd like fallocate to + reserve, whether there is space for the + given file size or not. This is useful for + systems that behave badly when they + completely run out of space; you can + make the services pretend they're out of + space early. +conn_timeout 0.5 Time to wait while attempting to connect + to another backend node. +node_timeout 3 Time to wait while sending each chunk of + data to another backend node. +client_timeout 60 Time to wait while receiving each chunk of + data from a client or another backend node +network_chunk_size 65536 Size of chunks to read/write over the + network +disk_chunk_size 65536 Size of chunks to read/write to disk +container_update_timeout 1 Time to wait while sending a container + update on object update. +======================== ========== ========================================== .. _object-server-options: diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index b36ec29aa6..31bd160a3e 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -60,6 +60,8 @@ bind_port = 6000 # conn_timeout = 0.5 # Time to wait while sending each chunk of data to another backend node. # node_timeout = 3 +# Time to wait while sending a container update on object update. +# container_update_timeout = 1.0 # Time to wait while receiving each chunk of data from a client or another # backend node. # client_timeout = 60 diff --git a/swift/obj/server.py b/swift/obj/server.py index 85c85544e4..fbe534ac60 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -28,6 +28,7 @@ from swift import gettext_ as _ from hashlib import md5 from eventlet import sleep, wsgi, Timeout +from eventlet.greenthread import spawn from swift.common.utils import public, get_logger, \ config_true_value, timing_stats, replication, \ @@ -108,7 +109,9 @@ class ObjectController(BaseStorageServer): """ super(ObjectController, self).__init__(conf) self.logger = logger or get_logger(conf, log_route='object-server') - self.node_timeout = int(conf.get('node_timeout', 3)) + self.node_timeout = float(conf.get('node_timeout', 3)) + self.container_update_timeout = float( + conf.get('container_update_timeout', 1)) self.conn_timeout = float(conf.get('conn_timeout', 0.5)) self.client_timeout = int(conf.get('client_timeout', 60)) self.disk_chunk_size = int(conf.get('disk_chunk_size', 65536)) @@ -198,7 +201,8 @@ class ObjectController(BaseStorageServer): device, partition, account, container, obj, policy, **kwargs) def async_update(self, op, account, container, obj, host, partition, - contdevice, headers_out, objdevice, policy): + contdevice, headers_out, objdevice, policy, + logger_thread_locals=None): """ Sends or saves an async update. @@ -213,7 +217,12 @@ class ObjectController(BaseStorageServer): request :param objdevice: device name that the object is in :param policy: the associated BaseStoragePolicy instance + :param logger_thread_locals: The thread local values to be set on the + self.logger to retain transaction + logging information. """ + if logger_thread_locals: + self.logger.thread_locals = logger_thread_locals headers_out['user-agent'] = 'object-server %s' % os.getpid() full_path = '/%s/%s/%s' % (account, container, obj) if all([host, partition, contdevice]): @@ -285,10 +294,28 @@ class ObjectController(BaseStorageServer): headers_out['x-trans-id'] = headers_in.get('x-trans-id', '-') headers_out['referer'] = request.as_referer() headers_out['X-Backend-Storage-Policy-Index'] = int(policy) + update_greenthreads = [] for conthost, contdevice in updates: - self.async_update(op, account, container, obj, conthost, - contpartition, contdevice, headers_out, - objdevice, policy) + gt = spawn(self.async_update, op, account, container, obj, + conthost, contpartition, contdevice, headers_out, + objdevice, policy, + logger_thread_locals=self.logger.thread_locals) + update_greenthreads.append(gt) + # Wait a little bit to see if the container updates are successful. + # If we immediately return after firing off the greenthread above, then + # we're more likely to confuse the end-user who does a listing right + # after getting a successful response to the object create. The + # `container_update_timeout` bounds the length of time we wait so that + # one slow container server doesn't make the entire request lag. + try: + with Timeout(self.container_update_timeout): + for gt in update_greenthreads: + gt.wait() + except Timeout: + # updates didn't go through, log it and return + self.logger.debug( + 'Container update timeout (%.4fs) waiting for %s', + self.container_update_timeout, updates) def delete_at_update(self, op, delete_at, account, container, obj, request, objdevice, policy): diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index bff913cb57..1e7a303ea4 100755 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -33,8 +33,9 @@ from tempfile import mkdtemp from hashlib import md5 import itertools import tempfile +from contextlib import contextmanager -from eventlet import sleep, spawn, wsgi, listen, Timeout, tpool +from eventlet import sleep, spawn, wsgi, listen, Timeout, tpool, greenthread from eventlet.green import httplib from nose import SkipTest @@ -67,6 +68,35 @@ test_policies = [ ] +@contextmanager +def fake_spawn(): + """ + Spawn and capture the result so we can later wait on it. This means we can + test code executing in a greenthread but still wait() on the result to + ensure that the method has completed. + """ + + orig = object_server.spawn + greenlets = [] + + def _inner_fake_spawn(func, *a, **kw): + gt = greenthread.spawn(func, *a, **kw) + greenlets.append(gt) + return gt + + object_server.spawn = _inner_fake_spawn + + try: + yield + finally: + for gt in greenlets: + try: + gt.wait() + except: # noqa + pass # real spawn won't do anything but pollute logs + object_server.spawn = orig + + @patch_policies(test_policies) class TestObjectController(unittest.TestCase): """Test swift.obj.server.ObjectController""" @@ -371,55 +401,54 @@ class TestObjectController(unittest.TestCase): return lambda *args, **kwargs: FakeConn(response, with_exc) - old_http_connect = object_server.http_connect - try: - ts = time() - timestamp = normalize_timestamp(ts) - req = Request.blank( - '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Timestamp': timestamp, - 'Content-Type': 'text/plain', - 'Content-Length': '0'}) + ts = time() + timestamp = normalize_timestamp(ts) + req = Request.blank( + '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': timestamp, + 'Content-Type': 'text/plain', + 'Content-Length': '0'}) + resp = req.get_response(self.object_controller) + self.assertEquals(resp.status_int, 201) + req = Request.blank( + '/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'POST'}, + headers={'X-Timestamp': normalize_timestamp(ts + 1), + 'X-Container-Host': '1.2.3.4:0', + 'X-Container-Partition': '3', + 'X-Container-Device': 'sda1', + 'X-Container-Timestamp': '1', + 'Content-Type': 'application/new1'}) + with mock.patch.object(object_server, 'http_connect', + mock_http_connect(202)): resp = req.get_response(self.object_controller) - self.assertEquals(resp.status_int, 201) - req = Request.blank( - '/sda1/p/a/c/o', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Timestamp': normalize_timestamp(ts + 1), - 'X-Container-Host': '1.2.3.4:0', - 'X-Container-Partition': '3', - 'X-Container-Device': 'sda1', - 'X-Container-Timestamp': '1', - 'Content-Type': 'application/new1'}) - object_server.http_connect = mock_http_connect(202) + self.assertEquals(resp.status_int, 202) + req = Request.blank( + '/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'POST'}, + headers={'X-Timestamp': normalize_timestamp(ts + 2), + 'X-Container-Host': '1.2.3.4:0', + 'X-Container-Partition': '3', + 'X-Container-Device': 'sda1', + 'X-Container-Timestamp': '1', + 'Content-Type': 'application/new1'}) + with mock.patch.object(object_server, 'http_connect', + mock_http_connect(202, with_exc=True)): resp = req.get_response(self.object_controller) - self.assertEquals(resp.status_int, 202) - req = Request.blank( - '/sda1/p/a/c/o', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Timestamp': normalize_timestamp(ts + 2), - 'X-Container-Host': '1.2.3.4:0', - 'X-Container-Partition': '3', - 'X-Container-Device': 'sda1', - 'X-Container-Timestamp': '1', - 'Content-Type': 'application/new1'}) - object_server.http_connect = mock_http_connect(202, with_exc=True) + self.assertEquals(resp.status_int, 202) + req = Request.blank( + '/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'POST'}, + headers={'X-Timestamp': normalize_timestamp(ts + 3), + 'X-Container-Host': '1.2.3.4:0', + 'X-Container-Partition': '3', + 'X-Container-Device': 'sda1', + 'X-Container-Timestamp': '1', + 'Content-Type': 'application/new2'}) + with mock.patch.object(object_server, 'http_connect', + mock_http_connect(500)): resp = req.get_response(self.object_controller) - self.assertEquals(resp.status_int, 202) - req = Request.blank( - '/sda1/p/a/c/o', - environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Timestamp': normalize_timestamp(ts + 3), - 'X-Container-Host': '1.2.3.4:0', - 'X-Container-Partition': '3', - 'X-Container-Device': 'sda1', - 'X-Container-Timestamp': '1', - 'Content-Type': 'application/new2'}) - object_server.http_connect = mock_http_connect(500) - resp = req.get_response(self.object_controller) - self.assertEquals(resp.status_int, 202) - finally: - object_server.http_connect = old_http_connect + self.assertEquals(resp.status_int, 202) def test_POST_quarantine_zbyte(self): timestamp = normalize_timestamp(time()) @@ -1219,52 +1248,54 @@ class TestObjectController(unittest.TestCase): return lambda *args, **kwargs: FakeConn(response, with_exc) - old_http_connect = object_server.http_connect - try: - timestamp = normalize_timestamp(time()) - req = Request.blank( - '/sda1/p/a/c/o', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Timestamp': timestamp, - 'X-Container-Host': '1.2.3.4:0', - 'X-Container-Partition': '3', - 'X-Container-Device': 'sda1', - 'X-Container-Timestamp': '1', - 'Content-Type': 'application/new1', - 'Content-Length': '0'}) - object_server.http_connect = mock_http_connect(201) - resp = req.get_response(self.object_controller) - self.assertEquals(resp.status_int, 201) - timestamp = normalize_timestamp(time()) - req = Request.blank( - '/sda1/p/a/c/o', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Timestamp': timestamp, - 'X-Container-Host': '1.2.3.4:0', - 'X-Container-Partition': '3', - 'X-Container-Device': 'sda1', - 'X-Container-Timestamp': '1', - 'Content-Type': 'application/new1', - 'Content-Length': '0'}) - object_server.http_connect = mock_http_connect(500) - resp = req.get_response(self.object_controller) - self.assertEquals(resp.status_int, 201) - timestamp = normalize_timestamp(time()) - req = Request.blank( - '/sda1/p/a/c/o', - environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Timestamp': timestamp, - 'X-Container-Host': '1.2.3.4:0', - 'X-Container-Partition': '3', - 'X-Container-Device': 'sda1', - 'X-Container-Timestamp': '1', - 'Content-Type': 'application/new1', - 'Content-Length': '0'}) - object_server.http_connect = mock_http_connect(500, with_exc=True) - resp = req.get_response(self.object_controller) - self.assertEquals(resp.status_int, 201) - finally: - object_server.http_connect = old_http_connect + timestamp = normalize_timestamp(time()) + req = Request.blank( + '/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': timestamp, + 'X-Container-Host': '1.2.3.4:0', + 'X-Container-Partition': '3', + 'X-Container-Device': 'sda1', + 'X-Container-Timestamp': '1', + 'Content-Type': 'application/new1', + 'Content-Length': '0'}) + with mock.patch.object(object_server, 'http_connect', + mock_http_connect(201)): + with fake_spawn(): + resp = req.get_response(self.object_controller) + self.assertEquals(resp.status_int, 201) + timestamp = normalize_timestamp(time()) + req = Request.blank( + '/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': timestamp, + 'X-Container-Host': '1.2.3.4:0', + 'X-Container-Partition': '3', + 'X-Container-Device': 'sda1', + 'X-Container-Timestamp': '1', + 'Content-Type': 'application/new1', + 'Content-Length': '0'}) + with mock.patch.object(object_server, 'http_connect', + mock_http_connect(500)): + with fake_spawn(): + resp = req.get_response(self.object_controller) + self.assertEquals(resp.status_int, 201) + timestamp = normalize_timestamp(time()) + req = Request.blank( + '/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': timestamp, + 'X-Container-Host': '1.2.3.4:0', + 'X-Container-Partition': '3', + 'X-Container-Device': 'sda1', + 'X-Container-Timestamp': '1', + 'Content-Type': 'application/new1', + 'Content-Length': '0'}) + with mock.patch.object(object_server, 'http_connect', + mock_http_connect(500, with_exc=True)): + with fake_spawn(): + resp = req.get_response(self.object_controller) + self.assertEquals(resp.status_int, 201) def test_PUT_ssync_multi_frag(self): timestamp = utils.Timestamp(time()).internal @@ -2407,7 +2438,8 @@ class TestObjectController(unittest.TestCase): 'Content-Type': 'text/plain'}) with mocked_http_conn( 200, give_connect=capture_updates) as fake_conn: - resp = req.get_response(self.object_controller) + with fake_spawn(): + resp = req.get_response(self.object_controller) self.assertRaises(StopIteration, fake_conn.code_iter.next) self.assertEqual(resp.status_int, 201) self.assertEquals(1, len(container_updates)) @@ -2446,7 +2478,8 @@ class TestObjectController(unittest.TestCase): 'Content-Type': 'text/html'}) with mocked_http_conn( 200, give_connect=capture_updates) as fake_conn: - resp = req.get_response(self.object_controller) + with fake_spawn(): + resp = req.get_response(self.object_controller) self.assertRaises(StopIteration, fake_conn.code_iter.next) self.assertEqual(resp.status_int, 201) self.assertEquals(1, len(container_updates)) @@ -2484,7 +2517,8 @@ class TestObjectController(unittest.TestCase): 'Content-Type': 'text/enriched'}) with mocked_http_conn( 200, give_connect=capture_updates) as fake_conn: - resp = req.get_response(self.object_controller) + with fake_spawn(): + resp = req.get_response(self.object_controller) self.assertRaises(StopIteration, fake_conn.code_iter.next) self.assertEqual(resp.status_int, 201) self.assertEquals(1, len(container_updates)) @@ -2522,7 +2556,8 @@ class TestObjectController(unittest.TestCase): 'X-Container-Partition': 'p'}) with mocked_http_conn( 200, give_connect=capture_updates) as fake_conn: - resp = req.get_response(self.object_controller) + with fake_spawn(): + resp = req.get_response(self.object_controller) self.assertRaises(StopIteration, fake_conn.code_iter.next) self.assertEqual(resp.status_int, 204) self.assertEquals(1, len(container_updates)) @@ -2553,7 +2588,8 @@ class TestObjectController(unittest.TestCase): 'X-Container-Partition': 'p'}) with mocked_http_conn( 200, give_connect=capture_updates) as fake_conn: - resp = req.get_response(self.object_controller) + with fake_spawn(): + resp = req.get_response(self.object_controller) self.assertRaises(StopIteration, fake_conn.code_iter.next) self.assertEqual(resp.status_int, 404) self.assertEquals(1, len(container_updates)) @@ -3022,7 +3058,8 @@ class TestObjectController(unittest.TestCase): with mock.patch.object(object_server, 'http_connect', fake_http_connect): - resp = req.get_response(self.object_controller) + with fake_spawn(): + resp = req.get_response(self.object_controller) self.assertEqual(resp.status_int, 201) @@ -3135,7 +3172,8 @@ class TestObjectController(unittest.TestCase): with mock.patch.object(object_server, 'http_connect', fake_http_connect): - req.get_response(self.object_controller) + with fake_spawn(): + req.get_response(self.object_controller) http_connect_args.sort(key=operator.itemgetter('ipaddr')) @@ -3212,7 +3250,8 @@ class TestObjectController(unittest.TestCase): '/sda1/p/a/c/o', method='PUT', body='', headers=headers) with mocked_http_conn( 500, 500, give_connect=capture_updates) as fake_conn: - resp = req.get_response(self.object_controller) + with fake_spawn(): + resp = req.get_response(self.object_controller) self.assertRaises(StopIteration, fake_conn.code_iter.next) self.assertEqual(resp.status_int, 201) self.assertEquals(2, len(container_updates)) @@ -3448,7 +3487,8 @@ class TestObjectController(unittest.TestCase): 'Content-Type': 'text/plain'}, body='') with mocked_http_conn( 200, give_connect=capture_updates) as fake_conn: - resp = req.get_response(self.object_controller) + with fake_spawn(): + resp = req.get_response(self.object_controller) self.assertRaises(StopIteration, fake_conn.code_iter.next) self.assertEqual(resp.status_int, 201) self.assertEqual(len(container_updates), 1) @@ -3489,7 +3529,8 @@ class TestObjectController(unittest.TestCase): headers=headers, body='') with mocked_http_conn( 200, give_connect=capture_updates) as fake_conn: - resp = req.get_response(self.object_controller) + with fake_spawn(): + resp = req.get_response(self.object_controller) self.assertRaises(StopIteration, fake_conn.code_iter.next) self.assertEqual(resp.status_int, 201) self.assertEqual(len(container_updates), 1) @@ -3529,7 +3570,8 @@ class TestObjectController(unittest.TestCase): diskfile_mgr = self.object_controller._diskfile_router[policy] diskfile_mgr.pickle_async_update = fake_pickle_async_update with mocked_http_conn(500) as fake_conn: - resp = req.get_response(self.object_controller) + with fake_spawn(): + resp = req.get_response(self.object_controller) self.assertRaises(StopIteration, fake_conn.code_iter.next) self.assertEqual(resp.status_int, 201) self.assertEqual(len(given_args), 7) @@ -3556,6 +3598,104 @@ class TestObjectController(unittest.TestCase): 'container': 'c', 'op': 'PUT'}) + def test_container_update_as_greenthread(self): + greenthreads = [] + saved_spawn_calls = [] + called_async_update_args = [] + + def local_fake_spawn(func, *a, **kw): + saved_spawn_calls.append((func, a, kw)) + return mock.MagicMock() + + def local_fake_async_update(*a, **kw): + # just capture the args to see that we would have called + called_async_update_args.append([a, kw]) + + req = Request.blank( + '/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': '12345', + 'Content-Type': 'application/burrito', + 'Content-Length': '0', + 'X-Backend-Storage-Policy-Index': 0, + 'X-Container-Partition': '20', + 'X-Container-Host': '1.2.3.4:5', + 'X-Container-Device': 'sdb1'}) + with mock.patch.object(object_server, 'spawn', + local_fake_spawn): + with mock.patch.object(self.object_controller, + 'async_update', + local_fake_async_update): + resp = req.get_response(self.object_controller) + # check the response is completed and successful + self.assertEqual(resp.status_int, 201) + # check that async_update hasn't been called + self.assertFalse(len(called_async_update_args)) + # now do the work in greenthreads + for func, a, kw in saved_spawn_calls: + gt = spawn(func, *a, **kw) + greenthreads.append(gt) + # wait for the greenthreads to finish + for gt in greenthreads: + gt.wait() + # check that the calls to async_update have happened + headers_out = {'X-Size': '0', + 'X-Content-Type': 'application/burrito', + 'X-Timestamp': '0000012345.00000', + 'X-Trans-Id': '-', + 'Referer': 'PUT http://localhost/sda1/p/a/c/o', + 'X-Backend-Storage-Policy-Index': '0', + 'X-Etag': 'd41d8cd98f00b204e9800998ecf8427e'} + expected = [('PUT', 'a', 'c', 'o', '1.2.3.4:5', '20', 'sdb1', + headers_out, 'sda1', POLICIES[0]), + {'logger_thread_locals': (None, None)}] + self.assertEqual(called_async_update_args, [expected]) + + def test_container_update_as_greenthread_with_timeout(self): + ''' + give it one container to update (for only one greenthred) + fake the greenthred so it will raise a timeout + test that the right message is logged and the method returns None + ''' + called_async_update_args = [] + + def local_fake_spawn(func, *a, **kw): + m = mock.MagicMock() + + def wait_with_error(): + raise Timeout() + m.wait = wait_with_error # because raise can't be in a lambda + return m + + def local_fake_async_update(*a, **kw): + # just capture the args to see that we would have called + called_async_update_args.append([a, kw]) + + req = Request.blank( + '/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': '12345', + 'Content-Type': 'application/burrito', + 'Content-Length': '0', + 'X-Backend-Storage-Policy-Index': 0, + 'X-Container-Partition': '20', + 'X-Container-Host': '1.2.3.4:5', + 'X-Container-Device': 'sdb1'}) + with mock.patch.object(object_server, 'spawn', + local_fake_spawn): + with mock.patch.object(self.object_controller, + 'container_update_timeout', + 1.414213562): + resp = req.get_response(self.object_controller) + # check the response is completed and successful + self.assertEqual(resp.status_int, 201) + # check that the timeout was logged + expected_logged_error = "Container update timeout (1.4142s) " \ + "waiting for [('1.2.3.4:5', 'sdb1')]" + self.assertTrue( + expected_logged_error in + self.object_controller.logger.get_lines_for_level('debug')) + def test_container_update_bad_args(self): policy = random.choice(list(POLICIES)) given_args = []