Merge "Local write affinity for object PUT requests."
This commit is contained in:
commit
edf4068c8b
@ -282,6 +282,127 @@ allows it to be more easily consumed by third party utilities::
|
||||
{"object": {"retries:": 0, "missing_two": 0, "copies_found": 7863, "missing_one": 0, "copies_expected": 7863, "pct_found": 100.0, "overlapping": 0, "missing_all": 0}, "container": {"retries:": 0, "missing_two": 0, "copies_found": 12534, "missing_one": 0, "copies_expected": 12534, "pct_found": 100.0, "overlapping": 15, "missing_all": 0}}
|
||||
|
||||
|
||||
-----------------------------------
|
||||
Geographically Distributed Clusters
|
||||
-----------------------------------
|
||||
|
||||
Swift's default configuration is currently designed to work in a
|
||||
single region, where a region is defined as a group of machines with
|
||||
high-bandwidth, low-latency links between them. However, configuration
|
||||
options exist that make running a performant multi-region Swift
|
||||
cluster possible.
|
||||
|
||||
For the rest of this section, we will assume a two-region Swift
|
||||
cluster: region 1 in San Francisco (SF), and region 2 in New York
|
||||
(NY). Each region shall contain within it 3 zones, numbered 1, 2, and
|
||||
3, for a total of 6 zones.
|
||||
|
||||
~~~~~~~~~~~~~
|
||||
read_affinity
|
||||
~~~~~~~~~~~~~
|
||||
|
||||
This setting makes the proxy server prefer local backend servers for
|
||||
GET and HEAD requests over non-local ones. For example, it is
|
||||
preferable for an SF proxy server to service object GET requests
|
||||
by talking to SF object servers, as the client will receive lower
|
||||
latency and higher throughput.
|
||||
|
||||
By default, Swift randomly chooses one of the three replicas to give
|
||||
to the client, thereby spreading the load evenly. In the case of a
|
||||
geographically-distributed cluster, the administrator is likely to
|
||||
prioritize keeping traffic local over even distribution of results.
|
||||
This is where the read_affinity setting comes in.
|
||||
|
||||
Example::
|
||||
|
||||
[app:proxy-server]
|
||||
read_affinity = r1=100
|
||||
|
||||
This will make the proxy attempt to service GET and HEAD requests from
|
||||
backends in region 1 before contacting any backends in region 2.
|
||||
However, if no region 1 backends are available (due to replica
|
||||
placement, failed hardware, or other reasons), then the proxy will
|
||||
fall back to backend servers in other regions.
|
||||
|
||||
Example::
|
||||
|
||||
[app:proxy-server]
|
||||
read_affinity = r1z1=100, r1=200
|
||||
|
||||
This will make the proxy attempt to service GET and HEAD requests from
|
||||
backends in region 1 zone 1, then backends in region 1, then any other
|
||||
backends. If a proxy is physically close to a particular zone or
|
||||
zones, this can provide bandwidth savings. For example, if a zone
|
||||
corresponds to servers in a particular rack, and the proxy server is
|
||||
in that same rack, then setting read_affinity to prefer reads from
|
||||
within the rack will result in less traffic between the top-of-rack
|
||||
switches.
|
||||
|
||||
The read_affinity setting may contain any number of region/zone
|
||||
specifiers; the priority number (after the equals sign) determines the
|
||||
ordering in which backend servers will be contacted. A lower number
|
||||
means higher priority.
|
||||
|
||||
Note that read_affinity only affects the ordering of primary nodes
|
||||
(see ring docs for definition of primary node), not the ordering of
|
||||
handoff nodes.
|
||||
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
write_affinity and write_affinity_node_count
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
This setting makes the proxy server prefer local backend servers for
|
||||
object PUT requests over non-local ones. For example, it may be
|
||||
preferable for an SF proxy server to service object PUT requests
|
||||
by talking to SF object servers, as the client will receive lower
|
||||
latency and higher throughput. However, if this setting is used, note
|
||||
that a NY proxy server handling a GET request for an object that was
|
||||
PUT using write affinity may have to fetch it across the WAN link, as
|
||||
the object won't immediately have any replicas in NY. However,
|
||||
replication will move the object's replicas to their proper homes in
|
||||
both SF and NY.
|
||||
|
||||
Note that only object PUT requests are affected by the write_affinity
|
||||
setting; POST, GET, HEAD, DELETE, OPTIONS, and account/container PUT
|
||||
requests are not affected.
|
||||
|
||||
This setting lets you trade data distribution for throughput. If
|
||||
write_affinity is enabled, then object replicas will initially be
|
||||
stored all within a particular region or zone, thereby decreasing the
|
||||
quality of the data distribution, but the replicas will be distributed
|
||||
over fast WAN links, giving higher throughput to clients. Note that
|
||||
the replicators will eventually move objects to their proper,
|
||||
well-distributed homes.
|
||||
|
||||
The write_affinity setting is useful only when you don't typically
|
||||
read objects immediately after writing them. For example, consider a
|
||||
workload of mainly backups: if you have a bunch of machines in NY that
|
||||
periodically write backups to Swift, then odds are that you don't then
|
||||
immediately read those backups in SF. If your workload doesn't look
|
||||
like that, then you probably shouldn't use write_affinity.
|
||||
|
||||
The write_affinity_node_count setting is only useful in conjunction
|
||||
with write_affinity; it governs how many local object servers will be
|
||||
tried before falling back to non-local ones.
|
||||
|
||||
Example::
|
||||
|
||||
[app:proxy-server]
|
||||
write_affinity = r1
|
||||
write_affinity_node_count = 2 * replicas
|
||||
|
||||
Assuming 3 replicas, this configuration will make object PUTs try
|
||||
storing the object's replicas on up to 6 disks ("2 * replicas") in
|
||||
region 1 ("r1").
|
||||
|
||||
You should be aware that, if you have data coming into SF faster than
|
||||
your link to NY can transfer it, then your cluster's data distribution
|
||||
will get worse and worse over time as objects pile up in SF. If this
|
||||
happens, it is recommended to disable write_affinity and simply let
|
||||
object PUTs traverse the WAN link, as that will naturally limit the
|
||||
object growth rate to what your WAN link can handle.
|
||||
|
||||
|
||||
--------------------------------
|
||||
Cluster Telemetry and Monitoring
|
||||
--------------------------------
|
||||
|
@ -147,6 +147,26 @@ use = egg:swift#proxy
|
||||
# read_affinity = r1z1=100, r1z2=200, r2=300
|
||||
# Default is empty, meaning no preference.
|
||||
# read_affinity =
|
||||
#
|
||||
# Which backend servers to prefer on writes. Format is r<N> for region
|
||||
# N or r<N>z<M> for region N, zone M. If this is set, then when
|
||||
# handling an object PUT request, some number (see setting
|
||||
# write_affinity_node_count) of local backend servers will be tried
|
||||
# before any nonlocal ones.
|
||||
#
|
||||
# Example: try to write to regions 1 and 2 before writing to any other
|
||||
# nodes:
|
||||
# write_affinity = r1, r2
|
||||
# Default is empty, meaning no preference.
|
||||
# write_affinity =
|
||||
#
|
||||
# The number of local (as governed by the write_affinity setting)
|
||||
# nodes to attempt to contact first, before any non-local ones. You
|
||||
# can use '* replicas' at the end to have it use the number given
|
||||
# times the number of replicas for the ring being used for the
|
||||
# request.
|
||||
# write_affinity_node_count = 2 * replicas
|
||||
|
||||
|
||||
[filter:tempauth]
|
||||
use = egg:swift#tempauth
|
||||
|
@ -1663,8 +1663,8 @@ def affinity_key_function(affinity_str):
|
||||
|
||||
:param affinity_str: affinity config value, e.g. "r1z2=3"
|
||||
or "r1=1, r2z1=2, r2z2=2"
|
||||
:returns: single-argument function, or None if argument invalid
|
||||
|
||||
:returns: single-argument function
|
||||
:raises: ValueError if argument invalid
|
||||
"""
|
||||
affinity_str = affinity_str.strip()
|
||||
|
||||
@ -1701,6 +1701,56 @@ def affinity_key_function(affinity_str):
|
||||
return keyfn
|
||||
|
||||
|
||||
def affinity_locality_predicate(write_affinity_str):
|
||||
"""
|
||||
Turns a write-affinity config value into a predicate function for nodes.
|
||||
The returned value will be a 1-arg function that takes a node dictionary
|
||||
and returns a true value if it is "local" and a false value otherwise. The
|
||||
definition of "local" comes from the affinity_str argument passed in here.
|
||||
|
||||
For example, if affinity_str is "r1, r2z2", then only nodes where region=1
|
||||
or where (region=2 and zone=2) are considered local.
|
||||
|
||||
If affinity_str is empty or all whitespace, then the resulting function
|
||||
will consider everything local
|
||||
|
||||
:param affinity_str: affinity config value, e.g. "r1z2"
|
||||
or "r1, r2z1, r2z2"
|
||||
:returns: single-argument function, or None if affinity_str is empty
|
||||
:raises: ValueError if argument invalid
|
||||
"""
|
||||
affinity_str = write_affinity_str.strip()
|
||||
|
||||
if not affinity_str:
|
||||
return None
|
||||
|
||||
matchers = []
|
||||
pieces = [s.strip() for s in affinity_str.split(',')]
|
||||
for piece in pieces:
|
||||
# matches r<number> or r<number>z<number>
|
||||
match = re.match("r(\d+)(?:z(\d+))?$", piece)
|
||||
if match:
|
||||
region, zone = match.groups()
|
||||
region = int(region)
|
||||
zone = int(zone) if zone else None
|
||||
|
||||
matcher = {'region': region}
|
||||
if zone is not None:
|
||||
matcher['zone'] = zone
|
||||
matchers.append(matcher)
|
||||
else:
|
||||
raise ValueError("Invalid write-affinity value: %r" % affinity_str)
|
||||
|
||||
def is_local(ring_node):
|
||||
for matcher in matchers:
|
||||
if (matcher['region'] == ring_node['region']
|
||||
and ('zone' not in matcher
|
||||
or matcher['zone'] == ring_node['zone'])):
|
||||
return True
|
||||
return False
|
||||
return is_local
|
||||
|
||||
|
||||
def get_remote_client(req):
|
||||
# remote host for zeus
|
||||
client = req.headers.get('x-cluster-client-ip')
|
||||
|
@ -28,6 +28,7 @@ import os
|
||||
import time
|
||||
import functools
|
||||
import inspect
|
||||
import itertools
|
||||
from urllib import quote
|
||||
|
||||
from eventlet import spawn_n, GreenPile
|
||||
@ -599,7 +600,7 @@ class Controller(object):
|
||||
info['nodes'] = nodes
|
||||
return info
|
||||
|
||||
def iter_nodes(self, ring, partition):
|
||||
def iter_nodes(self, ring, partition, node_iter=None):
|
||||
"""
|
||||
Yields nodes for a ring partition, skipping over error
|
||||
limited nodes and stopping at the configurable number of
|
||||
@ -615,9 +616,22 @@ class Controller(object):
|
||||
|
||||
:param ring: ring to get yield nodes from
|
||||
:param partition: ring partition to yield nodes for
|
||||
:param node_iter: optional iterable of nodes to try. Useful if you
|
||||
want to filter or reorder the nodes.
|
||||
"""
|
||||
primary_nodes = self.app.sort_nodes(ring.get_part_nodes(partition))
|
||||
part_nodes = ring.get_part_nodes(partition)
|
||||
if node_iter is None:
|
||||
node_iter = itertools.chain(part_nodes,
|
||||
ring.get_more_nodes(partition))
|
||||
num_primary_nodes = len(part_nodes)
|
||||
|
||||
# Use of list() here forcibly yanks the first N nodes (the primary
|
||||
# nodes) from node_iter, so the rest of its values are handoffs.
|
||||
primary_nodes = self.app.sort_nodes(
|
||||
list(itertools.islice(node_iter, num_primary_nodes)))
|
||||
handoff_nodes = node_iter
|
||||
nodes_left = self.app.request_node_count(ring)
|
||||
|
||||
for node in primary_nodes:
|
||||
if not self.error_limited(node):
|
||||
yield node
|
||||
@ -625,8 +639,9 @@ class Controller(object):
|
||||
nodes_left -= 1
|
||||
if nodes_left <= 0:
|
||||
return
|
||||
|
||||
handoffs = 0
|
||||
for node in ring.get_more_nodes(partition):
|
||||
for node in handoff_nodes:
|
||||
if not self.error_limited(node):
|
||||
handoffs += 1
|
||||
if self.app.log_handoffs:
|
||||
|
@ -381,6 +381,44 @@ class ObjectController(Controller):
|
||||
except ListingIterNotAuthorized:
|
||||
pass
|
||||
|
||||
def iter_nodes_local_first(self, ring, partition):
|
||||
"""
|
||||
Yields nodes for a ring partition.
|
||||
|
||||
If the 'write_affinity' setting is non-empty, then this will yield N
|
||||
local nodes (as defined by the write_affinity setting) first, then the
|
||||
rest of the nodes as normal. It is a re-ordering of the nodes such
|
||||
that the local ones come first; no node is omitted. The effect is
|
||||
that the request will be serviced by local object servers first, but
|
||||
nonlocal ones will be employed if not enough local ones are available.
|
||||
|
||||
:param ring: ring to get nodes from
|
||||
:param partition: ring partition to yield nodes for
|
||||
"""
|
||||
|
||||
primary_nodes = ring.get_part_nodes(partition)
|
||||
num_locals = self.app.write_affinity_node_count(ring)
|
||||
is_local = self.app.write_affinity_is_local_fn
|
||||
|
||||
if is_local is None:
|
||||
return self.iter_nodes(ring, partition)
|
||||
|
||||
all_nodes = itertools.chain(primary_nodes,
|
||||
ring.get_more_nodes(partition))
|
||||
first_n_local_nodes = list(itertools.islice(
|
||||
itertools.ifilter(is_local, all_nodes), num_locals))
|
||||
|
||||
# refresh it; it moved when we computed first_n_local_nodes
|
||||
all_nodes = itertools.chain(primary_nodes,
|
||||
ring.get_more_nodes(partition))
|
||||
local_first_node_iter = itertools.chain(
|
||||
first_n_local_nodes,
|
||||
itertools.ifilter(lambda node: node not in first_n_local_nodes,
|
||||
all_nodes))
|
||||
|
||||
return self.iter_nodes(
|
||||
ring, partition, node_iter=local_first_node_iter)
|
||||
|
||||
def is_good_source(self, src):
|
||||
"""
|
||||
Indicates whether or not the request made to the backend found
|
||||
@ -898,7 +936,7 @@ class ObjectController(Controller):
|
||||
delete_at_container = delete_at_part = delete_at_nodes = None
|
||||
|
||||
node_iter = GreenthreadSafeIterator(
|
||||
self.iter_nodes(self.app.object_ring, partition))
|
||||
self.iter_nodes_local_first(self.app.object_ring, partition))
|
||||
pile = GreenPile(len(nodes))
|
||||
te = req.headers.get('transfer-encoding', '')
|
||||
chunked = ('chunked' in te)
|
||||
|
@ -35,7 +35,7 @@ from eventlet import Timeout
|
||||
from swift.common.ring import Ring
|
||||
from swift.common.utils import cache_from_env, get_logger, \
|
||||
get_remote_client, split_path, config_true_value, generate_trans_id, \
|
||||
affinity_key_function
|
||||
affinity_key_function, affinity_locality_predicate
|
||||
from swift.common.constraints import check_utf8
|
||||
from swift.proxy.controllers import AccountController, ObjectController, \
|
||||
ContainerController
|
||||
@ -133,6 +133,25 @@ class Application(object):
|
||||
# make the message a little more useful
|
||||
raise ValueError("Invalid read_affinity value: %r (%s)" %
|
||||
(read_affinity, err.message))
|
||||
try:
|
||||
write_affinity = conf.get('write_affinity', '')
|
||||
self.write_affinity_is_local_fn \
|
||||
= affinity_locality_predicate(write_affinity)
|
||||
except ValueError as err:
|
||||
# make the message a little more useful
|
||||
raise ValueError("Invalid write_affinity value: %r (%s)" %
|
||||
(write_affinity, err.message))
|
||||
value = conf.get('write_affinity_node_count',
|
||||
'2 * replicas').lower().split()
|
||||
if len(value) == 1:
|
||||
value = int(value[0])
|
||||
self.write_affinity_node_count = lambda r: value
|
||||
elif len(value) == 3 and value[1] == '*' and value[2] == 'replicas':
|
||||
value = int(value[0])
|
||||
self.write_affinity_node_count = lambda r: value * r.replica_count
|
||||
else:
|
||||
raise ValueError(
|
||||
'Invalid write_affinity_node_count value: %r' % ''.join(value))
|
||||
|
||||
def get_controller(self, path):
|
||||
"""
|
||||
|
@ -19,11 +19,11 @@ from httplib import HTTPException
|
||||
|
||||
class FakeRing(object):
|
||||
|
||||
def __init__(self, replicas=3):
|
||||
def __init__(self, replicas=3, max_more_nodes=0):
|
||||
# 9 total nodes (6 more past the initial 3) is the cap, no matter if
|
||||
# this is set higher, or R^2 for R replicas
|
||||
self.replicas = replicas
|
||||
self.max_more_nodes = 0
|
||||
self.max_more_nodes = max_more_nodes
|
||||
self.devs = {}
|
||||
|
||||
def set_replicas(self, replicas):
|
||||
@ -46,17 +46,24 @@ class FakeRing(object):
|
||||
{'ip': '10.0.0.%s' % x,
|
||||
'port': 1000 + x,
|
||||
'device': 'sd' + (chr(ord('a') + x)),
|
||||
'zone': x % 3,
|
||||
'region': x % 2,
|
||||
'id': x}
|
||||
return 1, devs
|
||||
|
||||
def get_part_nodes(self, part):
|
||||
return self.get_nodes('blah')[1]
|
||||
|
||||
def get_more_nodes(self, nodes):
|
||||
def get_more_nodes(self, part):
|
||||
# replicas^2 is the true cap
|
||||
for x in xrange(self.replicas, min(self.replicas + self.max_more_nodes,
|
||||
self.replicas * self.replicas)):
|
||||
yield {'ip': '10.0.0.%s' % x, 'port': 1000 + x, 'device': 'sda'}
|
||||
yield {'ip': '10.0.0.%s' % x,
|
||||
'port': 1000 + x,
|
||||
'device': 'sda',
|
||||
'zone': x % 3,
|
||||
'region': x % 2,
|
||||
'id': x}
|
||||
|
||||
|
||||
class FakeMemcache(object):
|
||||
|
@ -1663,6 +1663,50 @@ class TestAffinityKeyFunction(unittest.TestCase):
|
||||
ids = [n['id'] for n in sorted(self.nodes, key=keyfn)]
|
||||
self.assertEqual([3, 2, 0, 1, 4, 5, 6, 7], ids)
|
||||
|
||||
|
||||
class TestAffinityLocalityPredicate(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.nodes = [dict(id=0, region=1, zone=1),
|
||||
dict(id=1, region=1, zone=2),
|
||||
dict(id=2, region=2, zone=1),
|
||||
dict(id=3, region=2, zone=2),
|
||||
dict(id=4, region=3, zone=1),
|
||||
dict(id=5, region=3, zone=2),
|
||||
dict(id=6, region=4, zone=0),
|
||||
dict(id=7, region=4, zone=1)]
|
||||
|
||||
def test_empty(self):
|
||||
pred = utils.affinity_locality_predicate('')
|
||||
self.assert_(pred is None)
|
||||
|
||||
def test_region(self):
|
||||
pred = utils.affinity_locality_predicate('r1')
|
||||
self.assert_(callable(pred))
|
||||
ids = [n['id'] for n in self.nodes if pred(n)]
|
||||
self.assertEqual([0, 1], ids)
|
||||
|
||||
def test_zone(self):
|
||||
pred = utils.affinity_locality_predicate('r1z1')
|
||||
self.assert_(callable(pred))
|
||||
ids = [n['id'] for n in self.nodes if pred(n)]
|
||||
self.assertEqual([0], ids)
|
||||
|
||||
def test_multiple(self):
|
||||
pred = utils.affinity_locality_predicate('r1, r3, r4z0')
|
||||
self.assert_(callable(pred))
|
||||
ids = [n['id'] for n in self.nodes if pred(n)]
|
||||
self.assertEqual([0, 1, 4, 5, 6], ids)
|
||||
|
||||
def test_invalid(self):
|
||||
self.assertRaises(ValueError,
|
||||
utils.affinity_locality_predicate, 'falafel')
|
||||
self.assertRaises(ValueError,
|
||||
utils.affinity_locality_predicate, 'r8zQ')
|
||||
self.assertRaises(ValueError,
|
||||
utils.affinity_locality_predicate, 'r2d2')
|
||||
self.assertRaises(ValueError,
|
||||
utils.affinity_locality_predicate, 'r1z1=1')
|
||||
|
||||
class TestGreenthreadSafeIterator(unittest.TestCase):
|
||||
def increment(self, iterable):
|
||||
plus_ones = []
|
||||
|
@ -29,7 +29,7 @@ class TestAccountController(unittest.TestCase):
|
||||
self.app = proxy_server.Application(None, FakeMemcache(),
|
||||
account_ring=FakeRing(),
|
||||
container_ring=FakeRing(),
|
||||
object_ring=FakeRing)
|
||||
object_ring=FakeRing())
|
||||
|
||||
def test_account_info_in_response_env(self):
|
||||
controller = proxy_server.AccountController(self.app, 'AUTH_bob')
|
||||
|
@ -29,7 +29,7 @@ class TestContainerController(unittest.TestCase):
|
||||
self.app = proxy_server.Application(None, FakeMemcache(),
|
||||
account_ring=FakeRing(),
|
||||
container_ring=FakeRing(),
|
||||
object_ring=FakeRing)
|
||||
object_ring=FakeRing())
|
||||
|
||||
def test_container_info_in_response_env(self):
|
||||
controller = proxy_server.ContainerController(self.app, 'a', 'c')
|
||||
|
67
test/unit/proxy/controllers/test_obj.py
Executable file
67
test/unit/proxy/controllers/test_obj.py
Executable file
@ -0,0 +1,67 @@
|
||||
#!/usr/bin/env python
|
||||
# Copyright (c) 2010-2012 OpenStack, LLC.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
import unittest
|
||||
|
||||
from swift.proxy import server as proxy_server
|
||||
from test.unit import fake_http_connect, FakeRing, FakeMemcache
|
||||
|
||||
|
||||
class TestObjControllerWriteAffinity(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.app = proxy_server.Application(
|
||||
None, FakeMemcache(), account_ring=FakeRing(),
|
||||
container_ring=FakeRing(), object_ring=FakeRing(max_more_nodes=9))
|
||||
self.app.request_node_count = lambda ring: 10000000
|
||||
self.app.sort_nodes = lambda l: l # stop shuffling the primary nodes
|
||||
|
||||
def test_iter_nodes_local_first_noops_when_no_affinity(self):
|
||||
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
|
||||
self.app.write_affinity_is_local_fn = None
|
||||
|
||||
all_nodes = self.app.object_ring.get_part_nodes(1)
|
||||
all_nodes.extend(self.app.object_ring.get_more_nodes(1))
|
||||
|
||||
local_first_nodes = list(controller.iter_nodes_local_first(
|
||||
self.app.object_ring, 1))
|
||||
|
||||
fr = FakeRing()
|
||||
|
||||
self.maxDiff = None
|
||||
|
||||
self.assertEqual(all_nodes, local_first_nodes)
|
||||
|
||||
def test_iter_nodes_local_first_moves_locals_first(self):
|
||||
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
|
||||
self.app.write_affinity_is_local_fn = (lambda node: node['region'] == 1)
|
||||
self.app.write_affinity_node_count = lambda ring: 4
|
||||
|
||||
all_nodes = self.app.object_ring.get_part_nodes(1)
|
||||
all_nodes.extend(self.app.object_ring.get_more_nodes(1))
|
||||
|
||||
local_first_nodes = list(controller.iter_nodes_local_first(
|
||||
self.app.object_ring, 1))
|
||||
|
||||
# the local nodes move up in the ordering
|
||||
self.assertEqual([1, 1, 1, 1],
|
||||
[node['region'] for node in local_first_nodes[:4]])
|
||||
# we don't skip any nodes
|
||||
self.assertEqual(sorted(all_nodes), sorted(local_first_nodes))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
@ -766,6 +766,78 @@ class TestObjectController(unittest.TestCase):
|
||||
res = controller.PUT(req)
|
||||
self.assertTrue(res.status.startswith('201 '))
|
||||
|
||||
def test_PUT_respects_write_affinity(self):
|
||||
written_to = []
|
||||
|
||||
def test_connect(ipaddr, port, device, partition, method, path,
|
||||
headers=None, query_string=None):
|
||||
if path == '/a/c/o.jpg':
|
||||
written_to.append((ipaddr, port, device))
|
||||
|
||||
with save_globals():
|
||||
def is_r0(node):
|
||||
return node['region'] == 0
|
||||
|
||||
self.app.object_ring.max_more_nodes = 100
|
||||
self.app.write_affinity_is_local_fn = is_r0
|
||||
self.app.write_affinity_node_count = lambda r: 3
|
||||
|
||||
controller = \
|
||||
proxy_server.ObjectController(self.app, 'a', 'c', 'o.jpg')
|
||||
set_http_connect(200, 200, 201, 201, 201,
|
||||
give_connect=test_connect)
|
||||
req = Request.blank('/a/c/o.jpg', {})
|
||||
req.content_length = 1
|
||||
req.body = 'a'
|
||||
self.app.memcache.store = {}
|
||||
res = controller.PUT(req)
|
||||
self.assertTrue(res.status.startswith('201 '))
|
||||
|
||||
self.assertEqual(3, len(written_to))
|
||||
for ip, port, device in written_to:
|
||||
# this is kind of a hokey test, but in FakeRing, the port is even
|
||||
# when the region is 0, and odd when the region is 1, so this test
|
||||
# asserts that we only wrote to nodes in region 0.
|
||||
self.assertEqual(0, port % 2)
|
||||
|
||||
def test_PUT_respects_write_affinity_with_507s(self):
|
||||
written_to = []
|
||||
|
||||
def test_connect(ipaddr, port, device, partition, method, path,
|
||||
headers=None, query_string=None):
|
||||
if path == '/a/c/o.jpg':
|
||||
written_to.append((ipaddr, port, device))
|
||||
|
||||
with save_globals():
|
||||
def is_r0(node):
|
||||
return node['region'] == 0
|
||||
|
||||
self.app.object_ring.max_more_nodes = 100
|
||||
self.app.write_affinity_is_local_fn = is_r0
|
||||
self.app.write_affinity_node_count = lambda r: 3
|
||||
|
||||
controller = \
|
||||
proxy_server.ObjectController(self.app, 'a', 'c', 'o.jpg')
|
||||
controller.error_limit(
|
||||
self.app.object_ring.get_part_nodes(1)[0], 'test')
|
||||
set_http_connect(200, 200, # account, container
|
||||
201, 201, 201, # 3 working backends
|
||||
give_connect=test_connect)
|
||||
req = Request.blank('/a/c/o.jpg', {})
|
||||
req.content_length = 1
|
||||
req.body = 'a'
|
||||
self.app.memcache.store = {}
|
||||
res = controller.PUT(req)
|
||||
self.assertTrue(res.status.startswith('201 '))
|
||||
|
||||
self.assertEqual(3, len(written_to))
|
||||
# this is kind of a hokey test, but in FakeRing, the port is even when
|
||||
# the region is 0, and odd when the region is 1, so this test asserts
|
||||
# that we wrote to 2 nodes in region 0, then went to 1 non-r0 node.
|
||||
self.assertEqual(0, written_to[0][1] % 2) # it's (ip, port, device)
|
||||
self.assertEqual(0, written_to[1][1] % 2)
|
||||
self.assertNotEqual(0, written_to[2][1] % 2)
|
||||
|
||||
def test_PUT_message_length_using_content_length(self):
|
||||
prolis = _test_sockets[0]
|
||||
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
|
||||
@ -2188,6 +2260,25 @@ class TestObjectController(unittest.TestCase):
|
||||
self.assertEquals(len(first_nodes), 6)
|
||||
self.assertEquals(len(second_nodes), 7)
|
||||
|
||||
def test_iter_nodes_with_custom_node_iter(self):
|
||||
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
|
||||
node_list = [dict(id=n) for n in xrange(10)]
|
||||
with nested(
|
||||
mock.patch.object(self.app, 'sort_nodes', lambda n: n),
|
||||
mock.patch.object(self.app, 'request_node_count',
|
||||
lambda r: 3)):
|
||||
got_nodes = list(controller.iter_nodes(self.app.object_ring, 0,
|
||||
node_iter=iter(node_list)))
|
||||
self.assertEqual(node_list[:3], got_nodes)
|
||||
|
||||
with nested(
|
||||
mock.patch.object(self.app, 'sort_nodes', lambda n: n),
|
||||
mock.patch.object(self.app, 'request_node_count',
|
||||
lambda r: 1000000)):
|
||||
got_nodes = list(controller.iter_nodes(self.app.object_ring, 0,
|
||||
node_iter=iter(node_list)))
|
||||
self.assertEqual(node_list, got_nodes)
|
||||
|
||||
def test_best_response_sets_etag(self):
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
'container', 'object')
|
||||
|
Loading…
x
Reference in New Issue
Block a user