Merge "timing-based affinity sorting for primary replicas"

This commit is contained in:
Jenkins 2013-02-27 01:30:15 +00:00 committed by Gerrit Code Review
commit 1dc38b4672
7 changed files with 100 additions and 20 deletions

@ -87,6 +87,13 @@ use = egg:swift#proxy
# Once segment rate-limiting kicks in for an object, limit segments served
# to N per second.
# rate_limit_segments_per_sec = 1
# Storage nodes can be chosen at random (shuffle) or by using timing
# measurements. Using timing measurements may allow for lower overall latency.
# The valid values for sorting_method are "shuffle" and "timing"
# sorting_method = shuffle
# If the timing sorting_method is used, the timings will only be valid for
# the number of seconds configured by timing_expiry.
# timing_expiry = 300
[filter:tempauth]
use = egg:swift#tempauth

@ -26,7 +26,6 @@
import time
from urllib import unquote
from random import shuffle
from swift.common.utils import normalize_timestamp, public
from swift.common.constraints import check_metadata, MAX_ACCOUNT_NAME_LENGTH
@ -49,7 +48,7 @@ class AccountController(Controller):
def GETorHEAD(self, req):
"""Handler for HTTP GET/HEAD requests."""
partition, nodes = self.app.account_ring.get_nodes(self.account_name)
shuffle(nodes)
nodes = self.app.sort_nodes(nodes)
resp = self.GETorHEAD_base(
req, _('Account'), partition, nodes, req.path_info.rstrip('/'),
len(nodes))

@ -400,10 +400,12 @@ class Controller(object):
break
attempts_left -= 1
try:
start_node_timing = time.time()
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], partition, 'HEAD',
path, headers)
self.app.set_node_timing(node, time.time() - start_node_timing)
with Timeout(self.app.node_timeout):
resp = conn.getresponse()
resp.read()
@ -491,10 +493,12 @@ class Controller(object):
headers = {'x-trans-id': self.trans_id, 'Connection': 'close'}
for node in self.iter_nodes(part, nodes, self.app.container_ring):
try:
start_node_timing = time.time()
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], part, 'HEAD',
path, headers)
self.app.set_node_timing(node, time.time() - start_node_timing)
with Timeout(self.app.node_timeout):
resp = conn.getresponse()
resp.read()
@ -558,11 +562,13 @@ class Controller(object):
self.app.logger.thread_locals = logger_thread_locals
for node in nodes:
try:
start_node_timing = time.time()
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], part, method, path,
headers=headers, query_string=query)
conn.node = node
self.app.set_node_timing(node, time.time() - start_node_timing)
with Timeout(self.app.node_timeout):
resp = conn.getresponse()
if not is_informational(resp.status) and \
@ -767,6 +773,7 @@ class Controller(object):
break
if self.error_limited(node):
continue
start_node_timing = time.time()
try:
with ConnectionTimeout(self.app.conn_timeout):
headers = dict(req.headers)
@ -775,6 +782,7 @@ class Controller(object):
node['ip'], node['port'], node['device'], partition,
req.method, path, headers=headers,
query_string=req.query_string)
self.app.set_node_timing(node, time.time() - start_node_timing)
with Timeout(self.app.node_timeout):
possible_source = conn.getresponse()
# See NOTE: swift_conn at top of file about this.

@ -26,7 +26,6 @@
import time
from urllib import unquote
from random import shuffle
from swift.common.utils import normalize_timestamp, public, csv_append
from swift.common.constraints import check_metadata, MAX_CONTAINER_NAME_LENGTH
@ -69,7 +68,7 @@ class ContainerController(Controller):
return HTTPNotFound(request=req)
part, nodes = self.app.container_ring.get_nodes(
self.account_name, self.container_name)
shuffle(nodes)
nodes = self.app.sort_nodes(nodes)
resp = self.GETorHEAD_base(
req, _('Container'), part, nodes, req.path_info, len(nodes))
if self.app.memcache:

@ -31,7 +31,6 @@ import time
from datetime import datetime
from urllib import unquote, quote
from hashlib import md5
from random import shuffle
from eventlet import sleep, GreenPile
from eventlet.queue import Queue
@ -123,7 +122,7 @@ class SegmentedIterable(object):
sleep(max(self.next_get_time - time.time(), 0))
self.next_get_time = time.time() + \
1.0 / self.controller.app.rate_limit_segments_per_sec
shuffle(nodes)
nodes = self.controller.app.sort_nodes(nodes)
resp = self.controller.GETorHEAD_base(
req, _('Object'), partition,
self.controller.iter_nodes(partition, nodes,
@ -271,7 +270,7 @@ class ObjectController(Controller):
lreq.environ['QUERY_STRING'] = \
'format=json&prefix=%s&marker=%s' % (quote(lprefix),
quote(marker))
shuffle(lnodes)
nodes = self.app.sort_nodes(lnodes)
lresp = self.GETorHEAD_base(
lreq, _('Container'), lpartition, lnodes, lreq.path_info,
len(lnodes))
@ -337,7 +336,7 @@ class ObjectController(Controller):
partition, nodes = self.app.object_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
shuffle(nodes)
nodes = self.app.sort_nodes(nodes)
resp = self.GETorHEAD_base(
req, _('Object'), partition,
self.iter_nodes(partition, nodes, self.app.object_ring),
@ -558,10 +557,12 @@ class ObjectController(Controller):
self.app.logger.thread_locals = logger_thread_locals
for node in nodes:
try:
start_time = time.time()
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(
node['ip'], node['port'], node['device'], part, 'PUT',
path, headers)
self.app.set_node_timing(node, time.time() - start_time)
with Timeout(self.app.node_timeout):
resp = conn.getexpect()
if resp.status == HTTP_CONTINUE:

@ -28,6 +28,8 @@ import mimetypes
import os
from ConfigParser import ConfigParser
import uuid
from random import shuffle
from time import time
from eventlet import Timeout
@ -108,6 +110,9 @@ class Application(object):
a.strip()
for a in conf.get('cors_allow_origin', '').split(',')
if a.strip()]
self.node_timings = {}
self.timing_expiry = int(conf.get('timing_expiry', 300))
self.sorting_method = conf.get('sorting_method', 'shuffle').lower()
def get_controller(self, path):
"""
@ -242,6 +247,33 @@ class Application(object):
self.logger.exception(_('ERROR Unhandled exception in request'))
return HTTPServerError(request=req)
def sort_nodes(self, nodes):
'''
Sorts nodes in-place (and returns the sorted list) according to
the configured strategy. The default "sorting" is to randomly
shuffle the nodes. If the "timing" strategy is chosen, the nodes
are sorted according to the stored timing data.
'''
# In the case of timing sorting, shuffling ensures that close timings
# (ie within the rounding resolution) won't prefer one over another.
# Python's sort is stable (http://wiki.python.org/moin/HowTo/Sorting/)
shuffle(nodes)
if self.sorting_method == 'timing':
now = time()
def key_func(node):
timing, expires = self.node_timings.get(node['ip'], (-1.0, 0))
return timing if expires > now else -1.0
nodes.sort(key=key_func)
return nodes
def set_node_timing(self, node, timing):
if self.sorting_method != 'timing':
return
now = time()
timing = round(timing, 3) # sort timings to the millisecond
self.node_timings[node['ip']] = (timing, now + self.timing_expiry)
def app_factory(global_conf, **local_conf):
"""paste.deploy app factory for creating WSGI proxy apps."""

@ -32,6 +32,7 @@ import time
from urllib import unquote, quote
from hashlib import md5
from tempfile import mkdtemp
import random
import eventlet
from eventlet import sleep, spawn, Timeout, util, wsgi, listen
@ -334,7 +335,8 @@ class FakeRing(object):
self.devs[x] = devs[x] = \
{'ip': '10.0.0.%s' % x,
'port': 1000 + x,
'device': 'sd' + (chr(ord('a') + x))}
'device': 'sd' + (chr(ord('a') + x)),
'id': x}
return 1, devs
def get_part_nodes(self, part):
@ -410,15 +412,6 @@ def set_http_connect(*args, **kwargs):
swift.proxy.controllers.container.http_connect = new_connect
def set_shuffle():
shuffle = lambda l: None
proxy_server.shuffle = shuffle
swift.proxy.controllers.base.shuffle = shuffle
swift.proxy.controllers.obj.shuffle = shuffle
swift.proxy.controllers.account.shuffle = shuffle
swift.proxy.controllers.container.shuffle = shuffle
# tests
class TestController(unittest.TestCase):
@ -763,6 +756,41 @@ class TestProxyServer(unittest.TestCase):
finally:
rmtree(swift_dir, ignore_errors=True)
def test_node_timing(self):
baseapp = proxy_server.Application({'sorting_method': 'timing'},
FakeMemcache(),
container_ring=FakeRing(),
object_ring=FakeRing(),
account_ring=FakeRing())
self.assertEquals(baseapp.node_timings, {})
req = Request.blank('/v1/account', environ={'REQUEST_METHOD': 'HEAD'})
baseapp.update_request(req)
resp = baseapp.handle_request(req)
self.assertEquals(resp.status_int, 503) # couldn't connect to anything
exp_timings = {}
self.assertEquals(baseapp.node_timings, exp_timings)
proxy_server.time = lambda: times.pop(0)
try:
times = [time.time()]
exp_timings = {'127.0.0.1': (0.1,
times[0] + baseapp.timing_expiry)}
baseapp.set_node_timing({'ip': '127.0.0.1'}, 0.1)
self.assertEquals(baseapp.node_timings, exp_timings)
finally:
proxy_server.time = time.time
proxy_server.shuffle = lambda l: l
try:
nodes = [{'ip': '127.0.0.1'}, {'ip': '127.0.0.2'}, {'ip': '127.0.0.3'}]
res = baseapp.sort_nodes(nodes)
exp_sorting = [{'ip': '127.0.0.2'}, {'ip': '127.0.0.3'},
{'ip': '127.0.0.1'}]
self.assertEquals(res, exp_sorting)
finally:
proxy_server.shuffle = random.shuffle
class TestObjectController(unittest.TestCase):
@ -1759,9 +1787,9 @@ class TestObjectController(unittest.TestCase):
def test_error_limiting(self):
with save_globals():
set_shuffle()
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
controller.app.sort_nodes = lambda l: l
self.assert_status_map(controller.HEAD, (200, 200, 503, 200, 200),
200)
self.assertEquals(controller.app.object_ring.devs[0]['errors'], 2)
@ -4224,9 +4252,9 @@ class TestContainerController(unittest.TestCase):
def test_error_limiting(self):
with save_globals():
set_shuffle()
controller = proxy_server.ContainerController(self.app, 'account',
'container')
controller.app.sort_nodes = lambda l: l
self.assert_status_map(controller.HEAD, (200, 503, 200, 200), 200,
missing_container=False)
self.assertEquals(
@ -5222,6 +5250,12 @@ class FakeObjectController(object):
for node in ring.get_more_nodes(partition):
yield node
def sort_nodes(self, nodes):
return nodes
def set_node_timing(self, node, timing):
return
class Stub(object):
pass