diff --git a/swift/common/utils.py b/swift/common/utils.py index 8ea9ea4392..fbf92eb1ec 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -43,6 +43,7 @@ from urlparse import urlparse as stdlib_urlparse, ParseResult import itertools import eventlet +import eventlet.semaphore from eventlet import GreenPool, sleep, Timeout, tpool from eventlet.green import socket, threading import netifaces @@ -409,6 +410,29 @@ def validate_device_partition(device, partition): raise ValueError('Invalid partition: %s' % quote(partition or '')) +class GreenthreadSafeIterator(object): + """ + Wrap an iterator to ensure that only one greenthread is inside its next() + method at a time. + + This is useful if an iterator's next() method may perform network IO, as + that may trigger a greenthread context switch (aka trampoline), which can + give another greenthread a chance to call next(). At that point, you get + an error like "ValueError: generator already executing". By wrapping calls + to next() with a mutex, we avoid that error. + """ + def __init__(self, unsafe_iterable): + self.unsafe_iter = iter(unsafe_iterable) + self.semaphore = eventlet.semaphore.Semaphore(value=1) + + def __iter__(self): + return self + + def next(self): + with self.semaphore: + return self.unsafe_iter.next() + + class NullLogger(): """A no-op logger for eventlet wsgi.""" diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 5c056cd3fe..e0a86a8469 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -35,7 +35,8 @@ from eventlet.timeout import Timeout from swift.common.wsgi import make_pre_authed_request from swift.common.utils import normalize_timestamp, config_true_value, \ - public, split_path, cache_from_env, list_from_csv + public, split_path, cache_from_env, list_from_csv, \ + GreenthreadSafeIterator from swift.common.bufferedhttp import http_connect from swift.common.exceptions import ChunkReadTimeout, ConnectionTimeout from swift.common.http import is_informational, is_success, is_redirection, \ @@ -557,6 +558,13 @@ class Controller(object): nodes. If a node yielded subsequently gets error limited, an extra node will be yielded to take its place. + Note that if you're going to iterate over this concurrently from + multiple greenthreads, you'll want to use a + swift.common.utils.GreenthreadSafeIterator to serialize access. + Otherwise, you may get ValueErrors from concurrent access. (You also + may not, depending on how logging is configured, the vagaries of + socket IO and eventlet, and the phase of the moon.) + :param ring: ring to get yield nodes from :param partition: ring partition to yield nodes for """ @@ -621,7 +629,7 @@ class Controller(object): :returns: a swob.Response object """ start_nodes = ring.get_part_nodes(part) - nodes = self.iter_nodes(ring, part) + nodes = GreenthreadSafeIterator(self.iter_nodes(ring, part)) pile = GreenPile(len(start_nodes)) for head in headers: pile.spawn(self._make_request, nodes, part, method, path, diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index f27bece0d7..5bdd9be362 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -37,7 +37,7 @@ from eventlet.queue import Queue from eventlet.timeout import Timeout from swift.common.utils import ContextPool, normalize_timestamp, \ - config_true_value, public, json, csv_append + config_true_value, public, json, csv_append, GreenthreadSafeIterator from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_metadata, check_object_creation, \ CONTAINER_LISTING_LIMIT, MAX_FILE_SIZE @@ -864,7 +864,8 @@ class ObjectController(Controller): else: delete_at_part = delete_at_nodes = None - node_iter = self.iter_nodes(self.app.object_ring, partition) + node_iter = GreenthreadSafeIterator( + self.iter_nodes(self.app.object_ring, partition)) pile = GreenPile(len(nodes)) chunked = req.headers.get('transfer-encoding') diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 44a741e071..90de9479d0 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -19,6 +19,7 @@ from __future__ import with_statement from test.unit import temptree import ctypes import errno +import eventlet import logging import os import random @@ -1464,6 +1465,66 @@ class TestStatsdLogging(unittest.TestCase): self.assert_(mock_controller.args[1] > 0) +class UnsafeXrange(object): + """ + Like xrange(limit), but with extra context switching to screw things up. + """ + def __init__(self, upper_bound): + self.current = 0 + self.concurrent_calls = 0 + self.upper_bound = upper_bound + + def __iter__(self): + return self + + def next(self): + if self.concurrent_calls > 0: + raise ValueError("concurrent access is bad, mmmkay? (%r)") + + self.concurrent_calls += 1 + try: + if self.current >= self.upper_bound: + raise StopIteration + else: + val = self.current + self.current += 1 + eventlet.sleep() # yield control + return val + finally: + self.concurrent_calls -= 1 + + +class TestGreenthreadSafeIterator(unittest.TestCase): + def increment(self, iterable): + plus_ones = [] + for n in iterable: + plus_ones.append(n + 1) + return plus_ones + + def test_setup_works(self): + # it should work without concurrent access + self.assertEquals([0, 1, 2, 3], list(UnsafeXrange(4))) + + iterable = UnsafeXrange(10) + pile = eventlet.GreenPile(2) + for _ in xrange(2): + pile.spawn(self.increment, iterable) + + try: + response = sorted([resp for resp in pile]) + self.assertTrue(False, "test setup is insufficiently crazy") + except ValueError: + pass + + def test_access_is_serialized(self): + pile = eventlet.GreenPile(2) + iterable = utils.GreenthreadSafeIterator(UnsafeXrange(10)) + for _ in xrange(2): + pile.spawn(self.increment, iterable) + response = sorted(sum([resp for resp in pile], [])) + self.assertEquals(range(1, 11), response) + + class TestStatsdLoggingDelegation(unittest.TestCase): def setUp(self): self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)