Allow 1+ object-servers-per-disk deployment

Enabled by a new > 0 integer config value, "servers_per_port" in the
[DEFAULT] config section for object-server and/or replication server
configs.  The setting's integer value determines how many different
object-server workers handle requests for any single unique local port
in the ring.  In this mode, the parent swift-object-server process
continues to run as the original user (i.e. root if low-port binding
is required), binds to all ports as defined in the ring, and forks off
the specified number of workers per listen socket.  The child, per-port
servers drop privileges and behave pretty much how object-server workers
always have, except that because the ring has unique ports per disk, the
object-servers will only be handling requests for a single disk.  The
parent process detects dead servers and restarts them (with the correct
listen socket), starts missing servers when an updated ring file is
found with a device on the server with a new port, and kills extraneous
servers when their port is found to no longer be in the ring.  The ring
files are stat'ed at most every "ring_check_interval" seconds, as
configured in the object-server config (same default of 15s).

Immediately stopping all swift-object-worker processes still works by
sending the parent a SIGTERM.  Likewise, a SIGHUP to the parent process
still causes the parent process to close all listen sockets and exit,
allowing existing children to finish serving their existing requests.
The drop_privileges helper function now has an optional param to
suppress the setsid() call, which otherwise screws up the child workers'
process management.

The class method RingData.load() can be told to only load the ring
metadata (i.e. everything except replica2part2dev_id) with the optional
kwarg, header_only=True.  This is used to keep the parent and all
forked off workers from unnecessarily having full copies of all storage
policy rings in memory.

A new helper class, swift.common.storage_policy.BindPortsCache,
provides a method to return a set of all device ports in all rings for
the server on which it is instantiated (identified by its set of IP
addresses).  The BindPortsCache instance will track mtimes of ring
files, so they are not opened more frequently than necessary.

This patch includes enhancements to the probe tests and
object-replicator/object-reconstructor config plumbing to allow the
probe tests to work correctly both in the "normal" config (same IP but
unique ports for each SAIO "server") and a server-per-port setup where
each SAIO "server" must have a unique IP address and unique port per
disk within each "server".  The main probe tests only work with 4
servers and 4 disks, but you can see the difference in the rings for the
EC probe tests where there are 2 disks per server for a total of 8
disks.  Specifically, swift.common.ring.utils.is_local_device() will
ignore the ports when the "my_port" argument is None.  Then,
object-replicator and object-reconstructor both set self.bind_port to
None if server_per_port is enabled.  Bonus improvement for IPv6
addresses in is_local_device().

This PR for vagrant-swift-all-in-one will aid in testing this patch:
https://github.com/swiftstack/vagrant-swift-all-in-one/pull/16/

Also allow SAIO to answer is_local_device() better; common SAIO setups
have multiple "servers" all on the same host with different ports for
the different "servers" (which happen to match the IPs specified in the
rings for the devices on each of those "servers").

However, you can configure the SAIO to have different localhost IP
addresses (e.g. 127.0.0.1, 127.0.0.2, etc.) in the ring and in the
servers' config files' bind_ip setting.

This new whataremyips() implementation combined with a little plumbing
allows is_local_device() to accurately answer, even on an SAIO.

In the default case (an unspecified bind_ip defaults to '0.0.0.0') as
well as an explict "bind to everything" like '0.0.0.0' or '::',
whataremyips() behaves as it always has, returning all IP addresses for
the server.

Also updated probe tests to handle each "server" in the SAIO having a
unique IP address.

For some (noisy) benchmarks that show servers_per_port=X is at least as
good as the same number of "normal" workers:
https://gist.github.com/dbishop/c214f89ca708a6b1624a#file-summary-md

Benchmarks showing the benefits of I/O isolation with a small number of
slow disks:
https://gist.github.com/dbishop/fd0ab067babdecfb07ca#file-results-md

If you were wondering what the overhead of threads_per_disk looks like:
https://gist.github.com/dbishop/1d14755fedc86a161718#file-tabular_results-md

DocImpact

Change-Id: I2239a4000b41a7e7cc53465ce794af49d44796c6
This commit is contained in:
Darrell Bishop 2015-05-14 22:14:15 -07:00
parent 65fce49b3b
commit df134df901
29 changed files with 1979 additions and 244 deletions

View File

@ -139,6 +139,72 @@ swift-ring-builder with no options will display help text with available
commands and options. More information on how the ring works internally commands and options. More information on how the ring works internally
can be found in the :doc:`Ring Overview <overview_ring>`. can be found in the :doc:`Ring Overview <overview_ring>`.
.. _server-per-port-configuration:
-------------------------------
Running object-servers Per Disk
-------------------------------
The lack of true asynchronous file I/O on Linux leaves the object-server
workers vulnerable to misbehaving disks. Because any object-server worker can
service a request for any disk, and a slow I/O request blocks the eventlet hub,
a single slow disk can impair an entire storage node. This also prevents
object servers from fully utilizing all their disks during heavy load.
The :ref:`threads_per_disk <object-server-options>` option was one way to
address this, but came with severe performance overhead which was worse
than the benefit of I/O isolation. Any clusters using threads_per_disk should
switch to using `servers_per_port`.
Another way to get full I/O isolation is to give each disk on a storage node a
different port in the storage policy rings. Then set the
:ref:`servers_per_port <object-server-default-options>`
option in the object-server config. NOTE: while the purpose of this config
setting is to run one or more object-server worker processes per *disk*, the
implementation just runs object-servers per unique port of local devices in the
rings. The deployer must combine this option with appropriately-configured
rings to benefit from this feature.
Here's an example (abbreviated) old-style ring (2 node cluster with 2 disks
each)::
Devices: id region zone ip address port replication ip replication port name
0 1 1 1.1.0.1 6000 1.1.0.1 6000 d1
1 1 1 1.1.0.1 6000 1.1.0.1 6000 d2
2 1 2 1.1.0.2 6000 1.1.0.2 6000 d3
3 1 2 1.1.0.2 6000 1.1.0.2 6000 d4
And here's the same ring set up for `servers_per_port`::
Devices: id region zone ip address port replication ip replication port name
0 1 1 1.1.0.1 6000 1.1.0.1 6000 d1
1 1 1 1.1.0.1 6001 1.1.0.1 6001 d2
2 1 2 1.1.0.2 6000 1.1.0.2 6000 d3
3 1 2 1.1.0.2 6001 1.1.0.2 6001 d4
When migrating from normal to `servers_per_port`, perform these steps in order:
#. Upgrade Swift code to a version capable of doing `servers_per_port`.
#. Enable `servers_per_port` with a > 0 value
#. Restart `swift-object-server` processes with a SIGHUP. At this point, you
will have the `servers_per_port` number of `swift-object-server` processes
serving all requests for all disks on each node. This preserves
availability, but you should perform the next step as quickly as possible.
#. Push out new rings that actually have different ports per disk on each
server. One of the ports in the new ring should be the same as the port
used in the old ring ("6000" in the example above). This will cover
existing proxy-server processes who haven't loaded the new ring yet. They
can still talk to any storage node regardless of whether or not that
storage node has loaded the ring and started object-server processes on the
new ports.
If you do not run a separate object-server for replication, then this setting
must be available to the object-replicator and object-reconstructor (i.e.
appear in the [DEFAULT] config section).
.. _general-service-configuration: .. _general-service-configuration:
----------------------------- -----------------------------
@ -149,14 +215,14 @@ Most Swift services fall into two categories. Swift's wsgi servers and
background daemons. background daemons.
For more information specific to the configuration of Swift's wsgi servers For more information specific to the configuration of Swift's wsgi servers
with paste deploy see :ref:`general-server-configuration` with paste deploy see :ref:`general-server-configuration`.
Configuration for servers and daemons can be expressed together in the same Configuration for servers and daemons can be expressed together in the same
file for each type of server, or separately. If a required section for the file for each type of server, or separately. If a required section for the
service trying to start is missing there will be an error. The sections not service trying to start is missing there will be an error. The sections not
used by the service are ignored. used by the service are ignored.
Consider the example of an object storage node. By convention configuration Consider the example of an object storage node. By convention, configuration
for the object-server, object-updater, object-replicator, and object-auditor for the object-server, object-updater, object-replicator, and object-auditor
exist in a single file ``/etc/swift/object-server.conf``:: exist in a single file ``/etc/swift/object-server.conf``::
@ -323,7 +389,7 @@ max_header_size 8192 max_header_size is the max number of bytes in
tokens including more than 7 catalog entries. tokens including more than 7 catalog entries.
See also include_service_catalog in See also include_service_catalog in
proxy-server.conf-sample (documented in proxy-server.conf-sample (documented in
overview_auth.rst) overview_auth.rst).
=================== ========== ============================================= =================== ========== =============================================
--------------------------- ---------------------------
@ -335,6 +401,8 @@ etc/object-server.conf-sample in the source code repository.
The following configuration options are available: The following configuration options are available:
.. _object-server-default-options:
[DEFAULT] [DEFAULT]
=================== ========== ============================================= =================== ========== =============================================
@ -353,12 +421,30 @@ workers auto Override the number of pre-forked workers
should be an integer, zero means no fork. If should be an integer, zero means no fork. If
unset, it will try to default to the number unset, it will try to default to the number
of effective cpu cores and fallback to one. of effective cpu cores and fallback to one.
Increasing the number of workers may reduce Increasing the number of workers helps slow
the possibility of slow file system filesystem operations in one request from
operations in one request from negatively negatively impacting other requests, but only
impacting other requests, but may not be as the :ref:`servers_per_port
efficient as tuning :ref:`threads_per_disk <server-per-port-configuration>`
<object-server-options>` option provides complete I/O isolation with
no measurable overhead.
servers_per_port 0 If each disk in each storage policy ring has
unique port numbers for its "ip" value, you
can use this setting to have each
object-server worker only service requests
for the single disk matching the port in the
ring. The value of this setting determines
how many worker processes run for each port
(disk) in the ring. If you have 24 disks
per server, and this setting is 4, then
each storage node will have 1 + (24 * 4) =
97 total object-server processes running.
This gives complete I/O isolation, drastically
reducing the impact of slow disks on storage
node performance. The object-replicator and
object-reconstructor need to see this setting
too, so it must be in the [DEFAULT] section.
See :ref:`server-per-port-configuration`.
max_clients 1024 Maximum number of clients one worker can max_clients 1024 Maximum number of clients one worker can
process simultaneously (it will actually process simultaneously (it will actually
accept(2) N + 1). Setting this to one (1) accept(2) N + 1). Setting this to one (1)
@ -421,13 +507,12 @@ keep_cache_private false Allow non-public objects to stay
threads_per_disk 0 Size of the per-disk thread pool threads_per_disk 0 Size of the per-disk thread pool
used for performing disk I/O. The used for performing disk I/O. The
default of 0 means to not use a default of 0 means to not use a
per-disk thread pool. It is per-disk thread pool.
recommended to keep this value This option is no longer
small, as large values can result recommended and the
in high read latencies due to :ref:`servers_per_port
large queue depths. A good <server-per-port-configuration>`
starting point is 4 threads per should be used instead.
disk.
replication_concurrency 4 Set to restrict the number of replication_concurrency 4 Set to restrict the number of
concurrent incoming REPLICATION concurrent incoming REPLICATION
requests; set to 0 for unlimited requests; set to 0 for unlimited
@ -562,7 +647,7 @@ workers auto Override the number of pre-forked workers
the possibility of slow file system the possibility of slow file system
operations in one request from negatively operations in one request from negatively
impacting other requests. See impacting other requests. See
:ref:`general-service-tuning` :ref:`general-service-tuning`.
max_clients 1024 Maximum number of clients one worker can max_clients 1024 Maximum number of clients one worker can
process simultaneously (it will actually process simultaneously (it will actually
accept(2) N + 1). Setting this to one (1) accept(2) N + 1). Setting this to one (1)
@ -690,7 +775,7 @@ workers auto Override the number of pre-forked workers
the possibility of slow file system the possibility of slow file system
operations in one request from negatively operations in one request from negatively
impacting other requests. See impacting other requests. See
:ref:`general-service-tuning` :ref:`general-service-tuning`.
max_clients 1024 Maximum number of clients one worker can max_clients 1024 Maximum number of clients one worker can
process simultaneously (it will actually process simultaneously (it will actually
accept(2) N + 1). Setting this to one (1) accept(2) N + 1). Setting this to one (1)
@ -813,7 +898,7 @@ workers auto Override the number of
will try to default to the will try to default to the
number of effective cpu cores number of effective cpu cores
and fallback to one. See and fallback to one. See
:ref:`general-service-tuning` :ref:`general-service-tuning`.
max_clients 1024 Maximum number of clients one max_clients 1024 Maximum number of clients one
worker can process worker can process
simultaneously (it will simultaneously (it will

View File

@ -12,9 +12,16 @@ bind_port = 6000
# expiring_objects_account_name = expiring_objects # expiring_objects_account_name = expiring_objects
# #
# Use an integer to override the number of pre-forked processes that will # Use an integer to override the number of pre-forked processes that will
# accept connections. # accept connections. NOTE: if servers_per_port is set, this setting is
# ignored.
# workers = auto # workers = auto
# #
# Make object-server run this many worker processes per unique port of
# "local" ring devices across all storage policies. This can help provide
# the isolation of threads_per_disk without the severe overhead. The default
# value of 0 disables this feature.
# servers_per_port = 0
#
# Maximum concurrent requests per worker # Maximum concurrent requests per worker
# max_clients = 1024 # max_clients = 1024
# #

View File

@ -69,7 +69,7 @@ class AccountReaper(Daemon):
self.object_ring = None self.object_ring = None
self.node_timeout = int(conf.get('node_timeout', 10)) self.node_timeout = int(conf.get('node_timeout', 10))
self.conn_timeout = float(conf.get('conn_timeout', 0.5)) self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.myips = whataremyips() self.myips = whataremyips(conf.get('bind_ip', '0.0.0.0'))
self.concurrency = int(conf.get('concurrency', 25)) self.concurrency = int(conf.get('concurrency', 25))
self.container_concurrency = self.object_concurrency = \ self.container_concurrency = self.object_concurrency = \
sqrt(self.concurrency) sqrt(self.concurrency)

View File

@ -154,6 +154,7 @@ class Replicator(Daemon):
self.logger = logger or get_logger(conf, log_route='replicator') self.logger = logger or get_logger(conf, log_route='replicator')
self.root = conf.get('devices', '/srv/node') self.root = conf.get('devices', '/srv/node')
self.mount_check = config_true_value(conf.get('mount_check', 'true')) self.mount_check = config_true_value(conf.get('mount_check', 'true'))
self.bind_ip = conf.get('bind_ip', '0.0.0.0')
self.port = int(conf.get('bind_port', self.default_port)) self.port = int(conf.get('bind_port', self.default_port))
concurrency = int(conf.get('concurrency', 8)) concurrency = int(conf.get('concurrency', 8))
self.cpool = GreenPool(size=concurrency) self.cpool = GreenPool(size=concurrency)
@ -580,7 +581,7 @@ class Replicator(Daemon):
"""Run a replication pass once.""" """Run a replication pass once."""
self._zero_stats() self._zero_stats()
dirs = [] dirs = []
ips = whataremyips() ips = whataremyips(self.bind_ip)
if not ips: if not ips:
self.logger.error(_('ERROR Failed to get my own IPs?')) self.logger.error(_('ERROR Failed to get my own IPs?'))
return return

View File

@ -44,10 +44,29 @@ class RingData(object):
dev.setdefault("region", 1) dev.setdefault("region", 1)
@classmethod @classmethod
def deserialize_v1(cls, gz_file): def deserialize_v1(cls, gz_file, metadata_only=False):
"""
Deserialize a v1 ring file into a dictionary with `devs`, `part_shift`,
and `replica2part2dev_id` keys.
If the optional kwarg `metadata_only` is True, then the
`replica2part2dev_id` is not loaded and that key in the returned
dictionary just has the value `[]`.
:param file gz_file: An opened file-like object which has already
consumed the 6 bytes of magic and version.
:param bool metadata_only: If True, only load `devs` and `part_shift`
:returns: A dict containing `devs`, `part_shift`, and
`replica2part2dev_id`
"""
json_len, = struct.unpack('!I', gz_file.read(4)) json_len, = struct.unpack('!I', gz_file.read(4))
ring_dict = json.loads(gz_file.read(json_len)) ring_dict = json.loads(gz_file.read(json_len))
ring_dict['replica2part2dev_id'] = [] ring_dict['replica2part2dev_id'] = []
if metadata_only:
return ring_dict
partition_count = 1 << (32 - ring_dict['part_shift']) partition_count = 1 << (32 - ring_dict['part_shift'])
for x in xrange(ring_dict['replica_count']): for x in xrange(ring_dict['replica_count']):
ring_dict['replica2part2dev_id'].append( ring_dict['replica2part2dev_id'].append(
@ -55,11 +74,12 @@ class RingData(object):
return ring_dict return ring_dict
@classmethod @classmethod
def load(cls, filename): def load(cls, filename, metadata_only=False):
""" """
Load ring data from a file. Load ring data from a file.
:param filename: Path to a file serialized by the save() method. :param filename: Path to a file serialized by the save() method.
:param bool metadata_only: If True, only load `devs` and `part_shift`.
:returns: A RingData instance containing the loaded data. :returns: A RingData instance containing the loaded data.
""" """
gz_file = GzipFile(filename, 'rb') gz_file = GzipFile(filename, 'rb')
@ -70,15 +90,18 @@ class RingData(object):
# See if the file is in the new format # See if the file is in the new format
magic = gz_file.read(4) magic = gz_file.read(4)
if magic == 'R1NG': if magic == 'R1NG':
version, = struct.unpack('!H', gz_file.read(2)) format_version, = struct.unpack('!H', gz_file.read(2))
if version == 1: if format_version == 1:
ring_data = cls.deserialize_v1(gz_file) ring_data = cls.deserialize_v1(
gz_file, metadata_only=metadata_only)
else: else:
raise Exception('Unknown ring format version %d' % version) raise Exception('Unknown ring format version %d' %
format_version)
else: else:
# Assume old-style pickled ring # Assume old-style pickled ring
gz_file.seek(0) gz_file.seek(0)
ring_data = pickle.load(gz_file) ring_data = pickle.load(gz_file)
if not hasattr(ring_data, 'devs'): if not hasattr(ring_data, 'devs'):
ring_data = RingData(ring_data['replica2part2dev_id'], ring_data = RingData(ring_data['replica2part2dev_id'],
ring_data['devs'], ring_data['part_shift']) ring_data['devs'], ring_data['part_shift'])

View File

@ -235,9 +235,14 @@ def is_local_device(my_ips, my_port, dev_ip, dev_port):
Return True if the provided dev_ip and dev_port are among the IP Return True if the provided dev_ip and dev_port are among the IP
addresses specified in my_ips and my_port respectively. addresses specified in my_ips and my_port respectively.
To support accurate locality determination in the server-per-port
deployment, when my_port is None, only IP addresses are used for
determining locality (dev_port is ignored).
If dev_ip is a hostname then it is first translated to an IP If dev_ip is a hostname then it is first translated to an IP
address before checking it against my_ips. address before checking it against my_ips.
""" """
candidate_ips = []
if not is_valid_ip(dev_ip) and is_valid_hostname(dev_ip): if not is_valid_ip(dev_ip) and is_valid_hostname(dev_ip):
try: try:
# get the ip for this host; use getaddrinfo so that # get the ip for this host; use getaddrinfo so that
@ -248,12 +253,19 @@ def is_local_device(my_ips, my_port, dev_ip, dev_port):
dev_ip = addr[4][0] # get the ip-address dev_ip = addr[4][0] # get the ip-address
if family == socket.AF_INET6: if family == socket.AF_INET6:
dev_ip = expand_ipv6(dev_ip) dev_ip = expand_ipv6(dev_ip)
if dev_ip in my_ips and dev_port == my_port: candidate_ips.append(dev_ip)
return True
return False
except socket.gaierror: except socket.gaierror:
return False return False
return dev_ip in my_ips and dev_port == my_port else:
if is_valid_ipv6(dev_ip):
dev_ip = expand_ipv6(dev_ip)
candidate_ips = [dev_ip]
for dev_ip in candidate_ips:
if dev_ip in my_ips and (my_port is None or dev_port == my_port):
return True
return False
def parse_search_value(search_value): def parse_search_value(search_value):

View File

@ -12,11 +12,13 @@
# limitations under the License. # limitations under the License.
from ConfigParser import ConfigParser from ConfigParser import ConfigParser
import textwrap import os
import string import string
import textwrap
from swift.common.utils import config_true_value, SWIFT_CONF_FILE from swift.common.utils import (
from swift.common.ring import Ring config_true_value, SWIFT_CONF_FILE, whataremyips)
from swift.common.ring import Ring, RingData
from swift.common.utils import quorum_size from swift.common.utils import quorum_size
from swift.common.exceptions import RingValidationError from swift.common.exceptions import RingValidationError
from pyeclib.ec_iface import ECDriver, ECDriverError, VALID_EC_TYPES from pyeclib.ec_iface import ECDriver, ECDriverError, VALID_EC_TYPES
@ -30,6 +32,53 @@ EC_POLICY = 'erasure_coding'
DEFAULT_EC_OBJECT_SEGMENT_SIZE = 1048576 DEFAULT_EC_OBJECT_SEGMENT_SIZE = 1048576
class BindPortsCache(object):
def __init__(self, swift_dir, bind_ip):
self.swift_dir = swift_dir
self.mtimes_by_ring_path = {}
self.portsets_by_ring_path = {}
self.my_ips = set(whataremyips(bind_ip))
def all_bind_ports_for_node(self):
"""
Given an iterable of IP addresses identifying a storage backend server,
return a set of all bind ports defined in all rings for this storage
backend server.
The caller is responsible for not calling this method (which performs
at least a stat on all ring files) too frequently.
"""
# NOTE: we don't worry about disappearing rings here because you can't
# ever delete a storage policy.
for policy in POLICIES:
# NOTE: we must NOT use policy.load_ring to load the ring. Users
# of this utility function will not need the actual ring data, just
# the bind ports.
#
# This is duplicated with Ring.__init__ just a bit...
serialized_path = os.path.join(self.swift_dir,
policy.ring_name + '.ring.gz')
try:
new_mtime = os.path.getmtime(serialized_path)
except OSError:
continue
old_mtime = self.mtimes_by_ring_path.get(serialized_path)
if not old_mtime or old_mtime != new_mtime:
self.portsets_by_ring_path[serialized_path] = set(
dev['port']
for dev in RingData.load(serialized_path,
metadata_only=True).devs
if dev and dev['ip'] in self.my_ips)
self.mtimes_by_ring_path[serialized_path] = new_mtime
# No "break" here so that the above line will update the
# mtimes_by_ring_path entry for any ring that changes, not just
# the first one we notice.
# Return the requested set of ports from our (now-freshened) cache
return reduce(set.union, self.portsets_by_ring_path.values(), set())
class PolicyError(ValueError): class PolicyError(ValueError):
def __init__(self, msg, index=None): def __init__(self, msg, index=None):
@ -291,7 +340,7 @@ class ECStoragePolicy(BaseStoragePolicy):
if ec_type not in VALID_EC_TYPES: if ec_type not in VALID_EC_TYPES:
raise PolicyError('Wrong ec_type %s for policy %s, should be one' raise PolicyError('Wrong ec_type %s for policy %s, should be one'
' of "%s"' % (ec_type, self.name, ' of "%s"' % (ec_type, self.name,
', '.join(VALID_EC_TYPES))) ', '.join(VALID_EC_TYPES)))
self._ec_type = ec_type self._ec_type = ec_type
# Define _ec_ndata as the number of EC data fragments # Define _ec_ndata as the number of EC data fragments
@ -427,8 +476,9 @@ class ECStoragePolicy(BaseStoragePolicy):
if nodes_configured != (self.ec_ndata + self.ec_nparity): if nodes_configured != (self.ec_ndata + self.ec_nparity):
raise RingValidationError( raise RingValidationError(
'EC ring for policy %s needs to be configured with ' 'EC ring for policy %s needs to be configured with '
'exactly %d nodes. Got %d.' % (self.name, 'exactly %d nodes. Got %d.' % (
self.ec_ndata + self.ec_nparity, nodes_configured)) self.name, self.ec_ndata + self.ec_nparity,
nodes_configured))
@property @property
def quorum(self): def quorum(self):

View File

@ -1589,7 +1589,7 @@ def get_hub():
return None return None
def drop_privileges(user): def drop_privileges(user, call_setsid=True):
""" """
Sets the userid/groupid of the current process, get session leader, etc. Sets the userid/groupid of the current process, get session leader, etc.
@ -1602,10 +1602,11 @@ def drop_privileges(user):
os.setgid(user[3]) os.setgid(user[3])
os.setuid(user[2]) os.setuid(user[2])
os.environ['HOME'] = user[5] os.environ['HOME'] = user[5]
try: if call_setsid:
os.setsid() try:
except OSError: os.setsid()
pass except OSError:
pass
os.chdir('/') # in case you need to rmdir on where you started the daemon os.chdir('/') # in case you need to rmdir on where you started the daemon
os.umask(0o22) # ensure files are created with the correct privileges os.umask(0o22) # ensure files are created with the correct privileges
@ -1706,12 +1707,28 @@ def expand_ipv6(address):
return socket.inet_ntop(socket.AF_INET6, packed_ip) return socket.inet_ntop(socket.AF_INET6, packed_ip)
def whataremyips(): def whataremyips(bind_ip=None):
""" """
Get the machine's ip addresses Get "our" IP addresses ("us" being the set of services configured by
one *.conf file). If our REST listens on a specific address, return it.
Otherwise, if listen on '0.0.0.0' or '::' return all addresses, including
the loopback.
:param str bind_ip: Optional bind_ip from a config file; may be IP address
or hostname.
:returns: list of Strings of ip addresses :returns: list of Strings of ip addresses
""" """
if bind_ip:
# See if bind_ip is '0.0.0.0'/'::'
try:
_, _, _, _, sockaddr = socket.getaddrinfo(
bind_ip, None, 0, socket.SOCK_STREAM, 0,
socket.AI_NUMERICHOST)[0]
if sockaddr[0] not in ('0.0.0.0', '::'):
return [bind_ip]
except socket.gaierror:
pass
addresses = [] addresses = []
for interface in netifaces.interfaces(): for interface in netifaces.interfaces():
try: try:

View File

@ -29,12 +29,13 @@ from textwrap import dedent
import eventlet import eventlet
import eventlet.debug import eventlet.debug
from eventlet import greenio, GreenPool, sleep, wsgi, listen from eventlet import greenio, GreenPool, sleep, wsgi, listen, Timeout
from paste.deploy import loadwsgi from paste.deploy import loadwsgi
from eventlet.green import socket, ssl from eventlet.green import socket, ssl, os as green_os
from urllib import unquote from urllib import unquote
from swift.common import utils, constraints from swift.common import utils, constraints
from swift.common.storage_policy import BindPortsCache
from swift.common.swob import Request from swift.common.swob import Request
from swift.common.utils import capture_stdio, disable_fallocate, \ from swift.common.utils import capture_stdio, disable_fallocate, \
drop_privileges, get_logger, NullLogger, config_true_value, \ drop_privileges, get_logger, NullLogger, config_true_value, \
@ -437,10 +438,414 @@ def run_server(conf, logger, sock, global_conf=None):
pool.waitall() pool.waitall()
#TODO(clayg): pull more pieces of this to test more class WorkersStrategy(object):
"""
WSGI server management strategy object for a single bind port and listen
socket shared by a configured number of forked-off workers.
Used in :py:func:`run_wsgi`.
:param dict conf: Server configuration dictionary.
:param logger: The server's :py:class:`~swift.common.utils.LogAdaptor`
object.
"""
def __init__(self, conf, logger):
self.conf = conf
self.logger = logger
self.sock = None
self.children = []
self.worker_count = config_auto_int_value(conf.get('workers'),
CPU_COUNT)
def loop_timeout(self):
"""
:returns: None; to block in :py:func:`green.os.wait`
"""
return None
def bind_ports(self):
"""
Bind the one listen socket for this strategy and drop privileges
(since the parent process will never need to bind again).
"""
try:
self.sock = get_socket(self.conf)
except ConfigFilePortError:
msg = 'bind_port wasn\'t properly set in the config file. ' \
'It must be explicitly set to a valid port number.'
return msg
drop_privileges(self.conf.get('user', 'swift'))
def no_fork_sock(self):
"""
Return a server listen socket if the server should run in the
foreground (no fork).
"""
# Useful for profiling [no forks].
if self.worker_count == 0:
return self.sock
def new_worker_socks(self):
"""
Yield a sequence of (socket, opqaue_data) tuples for each server which
should be forked-off and started.
The opaque_data item for each socket will passed into the
:py:meth:`log_sock_exit` and :py:meth:`register_worker_start` methods
where it will be ignored.
"""
while len(self.children) < self.worker_count:
yield self.sock, None
def post_fork_hook(self):
"""
Perform any initialization in a forked-off child process prior to
starting the wsgi server.
"""
pass
def log_sock_exit(self, sock, _unused):
"""
Log a server's exit.
:param socket sock: The listen socket for the worker just started.
:param _unused: The socket's opaque_data yielded by
:py:meth:`new_worker_socks`.
"""
self.logger.notice('Child %d exiting normally' % os.getpid())
def register_worker_start(self, sock, _unused, pid):
"""
Called when a new worker is started.
:param socket sock: The listen socket for the worker just started.
:param _unused: The socket's opaque_data yielded by new_worker_socks().
:param int pid: The new worker process' PID
"""
self.logger.notice('Started child %s' % pid)
self.children.append(pid)
def register_worker_exit(self, pid):
"""
Called when a worker has exited.
:param int pid: The PID of the worker that exited.
"""
self.logger.error('Removing dead child %s' % pid)
self.children.remove(pid)
def shutdown_sockets(self):
"""
Shutdown any listen sockets.
"""
greenio.shutdown_safe(self.sock)
self.sock.close()
class PortPidState(object):
"""
A helper class for :py:class:`ServersPerPortStrategy` to track listen
sockets and PIDs for each port.
:param int servers_per_port: The configured number of servers per port.
:param logger: The server's :py:class:`~swift.common.utils.LogAdaptor`
"""
def __init__(self, servers_per_port, logger):
self.servers_per_port = servers_per_port
self.logger = logger
self.sock_data_by_port = {}
def sock_for_port(self, port):
"""
:param int port: The port whose socket is desired.
:returns: The bound listen socket for the given port.
"""
return self.sock_data_by_port[port]['sock']
def port_for_sock(self, sock):
"""
:param socket sock: A tracked bound listen socket
:returns: The port the socket is bound to.
"""
for port, sock_data in self.sock_data_by_port.iteritems():
if sock_data['sock'] == sock:
return port
def _pid_to_port_and_index(self, pid):
for port, sock_data in self.sock_data_by_port.iteritems():
for server_idx, a_pid in enumerate(sock_data['pids']):
if pid == a_pid:
return port, server_idx
def port_index_pairs(self):
"""
:returns: A set of (port, server_idx) tuples for currently-tracked
ports, sockets, and PIDs.
"""
current_port_index_pairs = set()
for port, pid_state in self.sock_data_by_port.iteritems():
current_port_index_pairs |= set(
(port, i)
for i, pid in enumerate(pid_state['pids'])
if pid is not None)
return current_port_index_pairs
def track_port(self, port, sock):
"""
Start tracking servers for the given port and listen socket.
:param int port: The port to start tracking
:param socket sock: The bound listen socket for the port.
"""
self.sock_data_by_port[port] = {
'sock': sock,
'pids': [None] * self.servers_per_port,
}
def not_tracking(self, port):
"""
Return True if the specified port is not being tracked.
:param int port: A port to check.
"""
return port not in self.sock_data_by_port
def all_socks(self):
"""
Yield all current listen sockets.
"""
for orphan_data in self.sock_data_by_port.itervalues():
yield orphan_data['sock']
def forget_port(self, port):
"""
Idempotently forget a port, closing the listen socket at most once.
"""
orphan_data = self.sock_data_by_port.pop(port, None)
if orphan_data:
greenio.shutdown_safe(orphan_data['sock'])
orphan_data['sock'].close()
self.logger.notice('Closing unnecessary sock for port %d', port)
def add_pid(self, port, index, pid):
self.sock_data_by_port[port]['pids'][index] = pid
def forget_pid(self, pid):
"""
Idempotently forget a PID. It's okay if the PID is no longer in our
data structure (it could have been removed by the "orphan port" removal
in :py:meth:`new_worker_socks`).
:param int pid: The PID which exited.
"""
port_server_idx = self._pid_to_port_and_index(pid)
if port_server_idx is None:
# This method can lose a race with the "orphan port" removal, when
# a ring reload no longer contains a port. So it's okay if we were
# unable to find a (port, server_idx) pair.
return
dead_port, server_idx = port_server_idx
self.logger.error('Removing dead child %d (PID: %s) for port %s',
server_idx, pid, dead_port)
self.sock_data_by_port[dead_port]['pids'][server_idx] = None
class ServersPerPortStrategy(object):
"""
WSGI server management strategy object for an object-server with one listen
port per unique local port in the storage policy rings. The
`servers_per_port` integer config setting determines how many workers are
run per port.
Used in :py:func:`run_wsgi`.
:param dict conf: Server configuration dictionary.
:param logger: The server's :py:class:`~swift.common.utils.LogAdaptor`
object.
:param int servers_per_port: The number of workers to run per port.
"""
def __init__(self, conf, logger, servers_per_port):
self.conf = conf
self.logger = logger
self.servers_per_port = servers_per_port
self.swift_dir = conf.get('swift_dir', '/etc/swift')
self.ring_check_interval = int(conf.get('ring_check_interval', 15))
self.port_pid_state = PortPidState(servers_per_port, logger)
bind_ip = conf.get('bind_ip', '0.0.0.0')
self.cache = BindPortsCache(self.swift_dir, bind_ip)
def _reload_bind_ports(self):
self.bind_ports = self.cache.all_bind_ports_for_node()
def _bind_port(self, port):
new_conf = self.conf.copy()
new_conf['bind_port'] = port
sock = get_socket(new_conf)
self.port_pid_state.track_port(port, sock)
def loop_timeout(self):
"""
:returns: The time to wait for a child to exit before checking for
reloaded rings (new ports).
"""
return self.ring_check_interval
def bind_ports(self):
"""
Bind one listen socket per unique local storage policy ring port. Then
do all the work of drop_privileges except the actual dropping of
privileges (each forked-off worker will do that post-fork in
:py:meth:`post_fork_hook`).
"""
self._reload_bind_ports()
for port in self.bind_ports:
self._bind_port(port)
# The workers strategy drops privileges here, which we obviously cannot
# do if we want to support binding to low ports. But we do want some
# of the actions that drop_privileges did.
try:
os.setsid()
except OSError:
pass
# In case you need to rmdir where you started the daemon:
os.chdir('/')
# Ensure files are created with the correct privileges:
os.umask(0o22)
def no_fork_sock(self):
"""
This strategy does not support running in the foreground.
"""
pass
def new_worker_socks(self):
"""
Yield a sequence of (socket, server_idx) tuples for each server which
should be forked-off and started.
Any sockets for "orphaned" ports no longer in any ring will be closed
(causing their associated workers to gracefully exit) after all new
sockets have been yielded.
The server_idx item for each socket will passed into the
:py:meth:`log_sock_exit` and :py:meth:`register_worker_start` methods.
"""
self._reload_bind_ports()
desired_port_index_pairs = set(
(p, i) for p in self.bind_ports
for i in range(self.servers_per_port))
current_port_index_pairs = self.port_pid_state.port_index_pairs()
if desired_port_index_pairs != current_port_index_pairs:
# Orphan ports are ports which had object-server processes running,
# but which no longer appear in the ring. We'll kill them after we
# start missing workers.
orphan_port_index_pairs = current_port_index_pairs - \
desired_port_index_pairs
# Fork off worker(s) for every port who's supposed to have
# worker(s) but doesn't
missing_port_index_pairs = desired_port_index_pairs - \
current_port_index_pairs
for port, server_idx in sorted(missing_port_index_pairs):
if self.port_pid_state.not_tracking(port):
try:
self._bind_port(port)
except Exception as e:
self.logger.critical('Unable to bind to port %d: %s',
port, e)
continue
yield self.port_pid_state.sock_for_port(port), server_idx
for orphan_pair in orphan_port_index_pairs:
# For any port in orphan_port_index_pairs, it is guaranteed
# that there should be no listen socket for that port, so we
# can close and forget them.
self.port_pid_state.forget_port(orphan_pair[0])
def post_fork_hook(self):
"""
Called in each child process, prior to starting the actual wsgi server,
to drop privileges.
"""
drop_privileges(self.conf.get('user', 'swift'), call_setsid=False)
def log_sock_exit(self, sock, server_idx):
"""
Log a server's exit.
"""
port = self.port_pid_state.port_for_sock(sock)
self.logger.notice('Child %d (PID %d, port %d) exiting normally',
server_idx, os.getpid(), port)
def register_worker_start(self, sock, server_idx, pid):
"""
Called when a new worker is started.
:param socket sock: The listen socket for the worker just started.
:param server_idx: The socket's server_idx as yielded by
:py:meth:`new_worker_socks`.
:param int pid: The new worker process' PID
"""
port = self.port_pid_state.port_for_sock(sock)
self.logger.notice('Started child %d (PID %d) for port %d',
server_idx, pid, port)
self.port_pid_state.add_pid(port, server_idx, pid)
def register_worker_exit(self, pid):
"""
Called when a worker has exited.
:param int pid: The PID of the worker that exited.
"""
self.port_pid_state.forget_pid(pid)
def shutdown_sockets(self):
"""
Shutdown any listen sockets.
"""
for sock in self.port_pid_state.all_socks():
greenio.shutdown_safe(sock)
sock.close()
def run_wsgi(conf_path, app_section, *args, **kwargs): def run_wsgi(conf_path, app_section, *args, **kwargs):
""" """
Runs the server using the specified number of workers. Runs the server according to some strategy. The default strategy runs a
specified number of workers in pre-fork model. The object-server (only)
may use a servers-per-port strategy if its config has a servers_per_port
setting with a value greater than zero.
:param conf_path: Path to paste.deploy style configuration file/directory :param conf_path: Path to paste.deploy style configuration file/directory
:param app_section: App name from conf file to load config from :param app_section: App name from conf file to load config from
@ -454,17 +859,22 @@ def run_wsgi(conf_path, app_section, *args, **kwargs):
print(e) print(e)
return 1 return 1
# bind to address and port servers_per_port = int(conf.get('servers_per_port', '0') or 0)
try:
sock = get_socket(conf) # NOTE: for now servers_per_port is object-server-only; future work could
except ConfigFilePortError: # be done to test and allow it to be used for account and container
msg = 'bind_port wasn\'t properly set in the config file. ' \ # servers, but that has not been done yet.
'It must be explicitly set to a valid port number.' if servers_per_port and app_section == 'object-server':
logger.error(msg) strategy = ServersPerPortStrategy(
print(msg) conf, logger, servers_per_port=servers_per_port)
else:
strategy = WorkersStrategy(conf, logger)
error_msg = strategy.bind_ports()
if error_msg:
logger.error(error_msg)
print(error_msg)
return 1 return 1
# remaining tasks should not require elevated privileges
drop_privileges(conf.get('user', 'swift'))
# Ensure the configuration and application can be loaded before proceeding. # Ensure the configuration and application can be loaded before proceeding.
global_conf = {'log_name': log_name} global_conf = {'log_name': log_name}
@ -479,11 +889,9 @@ def run_wsgi(conf_path, app_section, *args, **kwargs):
# redirect errors to logger and close stdio # redirect errors to logger and close stdio
capture_stdio(logger) capture_stdio(logger)
worker_count = config_auto_int_value(conf.get('workers'), CPU_COUNT) no_fork_sock = strategy.no_fork_sock()
if no_fork_sock:
# Useful for profiling [no forks]. run_server(conf, logger, no_fork_sock, global_conf=global_conf)
if worker_count == 0:
run_server(conf, logger, sock, global_conf=global_conf)
return 0 return 0
def kill_children(*args): def kill_children(*args):
@ -502,32 +910,42 @@ def run_wsgi(conf_path, app_section, *args, **kwargs):
running = [True] running = [True]
signal.signal(signal.SIGTERM, kill_children) signal.signal(signal.SIGTERM, kill_children)
signal.signal(signal.SIGHUP, hup) signal.signal(signal.SIGHUP, hup)
children = []
while running[0]: while running[0]:
while len(children) < worker_count: for sock, sock_info in strategy.new_worker_socks():
pid = os.fork() pid = os.fork()
if pid == 0: if pid == 0:
signal.signal(signal.SIGHUP, signal.SIG_DFL) signal.signal(signal.SIGHUP, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL) signal.signal(signal.SIGTERM, signal.SIG_DFL)
strategy.post_fork_hook()
run_server(conf, logger, sock) run_server(conf, logger, sock)
logger.notice('Child %d exiting normally' % os.getpid()) strategy.log_sock_exit(sock, sock_info)
return 0 return 0
else: else:
logger.notice('Started child %s' % pid) strategy.register_worker_start(sock, sock_info, pid)
children.append(pid)
try: # The strategy may need to pay attention to something in addition to
pid, status = os.wait() # child process exits (like new ports showing up in a ring).
if os.WIFEXITED(status) or os.WIFSIGNALED(status): #
logger.error('Removing dead child %s' % pid) # NOTE: a timeout value of None will just instantiate the Timeout
children.remove(pid) # object and not actually schedule it, which is equivalent to no
except OSError as err: # timeout for the green_os.wait().
if err.errno not in (errno.EINTR, errno.ECHILD): loop_timeout = strategy.loop_timeout()
raise
except KeyboardInterrupt: with Timeout(loop_timeout, exception=False):
logger.notice('User quit') try:
break pid, status = green_os.wait()
greenio.shutdown_safe(sock) if os.WIFEXITED(status) or os.WIFSIGNALED(status):
sock.close() strategy.register_worker_exit(pid)
except OSError as err:
if err.errno not in (errno.EINTR, errno.ECHILD):
raise
except KeyboardInterrupt:
logger.notice('User quit')
running[0] = False
break
strategy.shutdown_sockets()
logger.notice('Exited') logger.notice('Exited')
return 0 return 0

View File

@ -204,7 +204,8 @@ class ContainerSync(Daemon):
#: swift.common.ring.Ring for locating containers. #: swift.common.ring.Ring for locating containers.
self.container_ring = container_ring or Ring(self.swift_dir, self.container_ring = container_ring or Ring(self.swift_dir,
ring_name='container') ring_name='container')
self._myips = whataremyips() bind_ip = conf.get('bind_ip', '0.0.0.0')
self._myips = whataremyips(bind_ip)
self._myport = int(conf.get('bind_port', 6001)) self._myport = int(conf.get('bind_port', 6001))
swift.common.db.DB_PREALLOCATION = \ swift.common.db.DB_PREALLOCATION = \
config_true_value(conf.get('db_preallocation', 'f')) config_true_value(conf.get('db_preallocation', 'f'))

View File

@ -119,7 +119,10 @@ class ObjectReconstructor(Daemon):
self.devices_dir = conf.get('devices', '/srv/node') self.devices_dir = conf.get('devices', '/srv/node')
self.mount_check = config_true_value(conf.get('mount_check', 'true')) self.mount_check = config_true_value(conf.get('mount_check', 'true'))
self.swift_dir = conf.get('swift_dir', '/etc/swift') self.swift_dir = conf.get('swift_dir', '/etc/swift')
self.port = int(conf.get('bind_port', 6000)) self.bind_ip = conf.get('bind_ip', '0.0.0.0')
self.servers_per_port = int(conf.get('servers_per_port', '0') or 0)
self.port = None if self.servers_per_port else \
int(conf.get('bind_port', 6000))
self.concurrency = int(conf.get('concurrency', 1)) self.concurrency = int(conf.get('concurrency', 1))
self.stats_interval = int(conf.get('stats_interval', '300')) self.stats_interval = int(conf.get('stats_interval', '300'))
self.ring_check_interval = int(conf.get('ring_check_interval', 15)) self.ring_check_interval = int(conf.get('ring_check_interval', 15))
@ -764,7 +767,7 @@ class ObjectReconstructor(Daemon):
""" """
override_devices = override_devices or [] override_devices = override_devices or []
override_partitions = override_partitions or [] override_partitions = override_partitions or []
ips = whataremyips() ips = whataremyips(self.bind_ip)
for policy in POLICIES: for policy in POLICIES:
if policy.policy_type != EC_POLICY: if policy.policy_type != EC_POLICY:
continue continue
@ -776,6 +779,7 @@ class ObjectReconstructor(Daemon):
ips, self.port, ips, self.port,
dev['replication_ip'], dev['replication_port']), dev['replication_ip'], dev['replication_port']),
policy.object_ring.devs) policy.object_ring.devs)
for local_dev in local_devices: for local_dev in local_devices:
if override_devices and (local_dev['device'] not in if override_devices and (local_dev['device'] not in
override_devices): override_devices):

View File

@ -65,7 +65,10 @@ class ObjectReplicator(Daemon):
self.mount_check = config_true_value(conf.get('mount_check', 'true')) self.mount_check = config_true_value(conf.get('mount_check', 'true'))
self.vm_test_mode = config_true_value(conf.get('vm_test_mode', 'no')) self.vm_test_mode = config_true_value(conf.get('vm_test_mode', 'no'))
self.swift_dir = conf.get('swift_dir', '/etc/swift') self.swift_dir = conf.get('swift_dir', '/etc/swift')
self.port = int(conf.get('bind_port', 6000)) self.bind_ip = conf.get('bind_ip', '0.0.0.0')
self.servers_per_port = int(conf.get('servers_per_port', '0') or 0)
self.port = None if self.servers_per_port else \
int(conf.get('bind_port', 6000))
self.concurrency = int(conf.get('concurrency', 1)) self.concurrency = int(conf.get('concurrency', 1))
self.stats_interval = int(conf.get('stats_interval', '300')) self.stats_interval = int(conf.get('stats_interval', '300'))
self.ring_check_interval = int(conf.get('ring_check_interval', 15)) self.ring_check_interval = int(conf.get('ring_check_interval', 15))
@ -539,7 +542,7 @@ class ObjectReplicator(Daemon):
policies will be returned policies will be returned
""" """
jobs = [] jobs = []
ips = whataremyips() ips = whataremyips(self.bind_ip)
for policy in POLICIES: for policy in POLICIES:
if policy.policy_type == REPL_POLICY: if policy.policy_type == REPL_POLICY:
if (override_policies is not None and if (override_policies is not None and

View File

@ -39,8 +39,8 @@ for p in POLICIES:
POLICIES_BY_TYPE[p.policy_type].append(p) POLICIES_BY_TYPE[p.policy_type].append(p)
def get_server_number(port, port2server): def get_server_number(ipport, ipport2server):
server_number = port2server[port] server_number = ipport2server[ipport]
server, number = server_number[:-1], server_number[-1:] server, number = server_number[:-1], server_number[-1:]
try: try:
number = int(number) number = int(number)
@ -50,19 +50,19 @@ def get_server_number(port, port2server):
return server, number return server, number
def start_server(port, port2server, pids, check=True): def start_server(ipport, ipport2server, pids, check=True):
server, number = get_server_number(port, port2server) server, number = get_server_number(ipport, ipport2server)
err = Manager([server]).start(number=number, wait=False) err = Manager([server]).start(number=number, wait=False)
if err: if err:
raise Exception('unable to start %s' % ( raise Exception('unable to start %s' % (
server if not number else '%s%s' % (server, number))) server if not number else '%s%s' % (server, number)))
if check: if check:
return check_server(port, port2server, pids) return check_server(ipport, ipport2server, pids)
return None return None
def check_server(port, port2server, pids, timeout=CHECK_SERVER_TIMEOUT): def check_server(ipport, ipport2server, pids, timeout=CHECK_SERVER_TIMEOUT):
server = port2server[port] server = ipport2server[ipport]
if server[:-1] in ('account', 'container', 'object'): if server[:-1] in ('account', 'container', 'object'):
if int(server[-1]) > 4: if int(server[-1]) > 4:
return None return None
@ -74,7 +74,7 @@ def check_server(port, port2server, pids, timeout=CHECK_SERVER_TIMEOUT):
try_until = time() + timeout try_until = time() + timeout
while True: while True:
try: try:
conn = HTTPConnection('127.0.0.1', port) conn = HTTPConnection(*ipport)
conn.request('GET', path) conn.request('GET', path)
resp = conn.getresponse() resp = conn.getresponse()
# 404 because it's a nonsense path (and mount_check is false) # 404 because it's a nonsense path (and mount_check is false)
@ -87,14 +87,14 @@ def check_server(port, port2server, pids, timeout=CHECK_SERVER_TIMEOUT):
if time() > try_until: if time() > try_until:
print err print err
print 'Giving up on %s:%s after %s seconds.' % ( print 'Giving up on %s:%s after %s seconds.' % (
server, port, timeout) server, ipport, timeout)
raise err raise err
sleep(0.1) sleep(0.1)
else: else:
try_until = time() + timeout try_until = time() + timeout
while True: while True:
try: try:
url, token = get_auth('http://127.0.0.1:8080/auth/v1.0', url, token = get_auth('http://%s:%d/auth/v1.0' % ipport,
'test:tester', 'testing') 'test:tester', 'testing')
account = url.split('/')[-1] account = url.split('/')[-1]
head_account(url, token) head_account(url, token)
@ -108,8 +108,8 @@ def check_server(port, port2server, pids, timeout=CHECK_SERVER_TIMEOUT):
return None return None
def kill_server(port, port2server, pids): def kill_server(ipport, ipport2server, pids):
server, number = get_server_number(port, port2server) server, number = get_server_number(ipport, ipport2server)
err = Manager([server]).kill(number=number) err = Manager([server]).kill(number=number)
if err: if err:
raise Exception('unable to kill %s' % (server if not number else raise Exception('unable to kill %s' % (server if not number else
@ -117,47 +117,77 @@ def kill_server(port, port2server, pids):
try_until = time() + 30 try_until = time() + 30
while True: while True:
try: try:
conn = HTTPConnection('127.0.0.1', port) conn = HTTPConnection(*ipport)
conn.request('GET', '/') conn.request('GET', '/')
conn.getresponse() conn.getresponse()
except Exception as err: except Exception as err:
break break
if time() > try_until: if time() > try_until:
raise Exception( raise Exception(
'Still answering on port %s after 30 seconds' % port) 'Still answering on %s:%s after 30 seconds' % ipport)
sleep(0.1) sleep(0.1)
def kill_nonprimary_server(primary_nodes, port2server, pids): def kill_nonprimary_server(primary_nodes, ipport2server, pids):
primary_ports = [n['port'] for n in primary_nodes] primary_ipports = [(n['ip'], n['port']) for n in primary_nodes]
for port, server in port2server.iteritems(): for ipport, server in ipport2server.iteritems():
if port in primary_ports: if ipport in primary_ipports:
server_type = server[:-1] server_type = server[:-1]
break break
else: else:
raise Exception('Cannot figure out server type for %r' % primary_nodes) raise Exception('Cannot figure out server type for %r' % primary_nodes)
for port, server in list(port2server.iteritems()): for ipport, server in list(ipport2server.iteritems()):
if server[:-1] == server_type and port not in primary_ports: if server[:-1] == server_type and ipport not in primary_ipports:
kill_server(port, port2server, pids) kill_server(ipport, ipport2server, pids)
return port return ipport
def build_port_to_conf(server): def add_ring_devs_to_ipport2server(ring, server_type, ipport2server,
# map server to config by port servers_per_port=0):
port_to_config = {} # We'll number the servers by order of unique occurrence of:
for server_ in Manager([server]): # IP, if servers_per_port > 0 OR there > 1 IP in ring
for config_path in server_.conf_files(): # ipport, otherwise
conf = readconf(config_path, unique_ip_count = len(set(dev['ip'] for dev in ring.devs if dev))
section_name='%s-replicator' % server_.type) things_to_number = {}
port_to_config[int(conf['bind_port'])] = conf number = 0
return port_to_config for dev in filter(None, ring.devs):
ip = dev['ip']
ipport = (ip, dev['port'])
unique_by = ip if servers_per_port or unique_ip_count > 1 else ipport
if unique_by not in things_to_number:
number += 1
things_to_number[unique_by] = number
ipport2server[ipport] = '%s%d' % (server_type,
things_to_number[unique_by])
def store_config_paths(name, configs):
for server_name in (name, '%s-replicator' % name):
for server in Manager([server_name]):
for i, conf in enumerate(server.conf_files(), 1):
configs[server.server][i] = conf
def get_ring(ring_name, required_replicas, required_devices, def get_ring(ring_name, required_replicas, required_devices,
server=None, force_validate=None): server=None, force_validate=None, ipport2server=None,
config_paths=None):
if not server: if not server:
server = ring_name server = ring_name
ring = Ring('/etc/swift', ring_name=ring_name) ring = Ring('/etc/swift', ring_name=ring_name)
if ipport2server is None:
ipport2server = {} # used internally, even if not passed in
if config_paths is None:
config_paths = defaultdict(dict)
store_config_paths(server, config_paths)
repl_name = '%s-replicator' % server
repl_configs = {i: readconf(c, section_name=repl_name)
for i, c in config_paths[repl_name].iteritems()}
servers_per_port = any(int(c.get('servers_per_port', '0'))
for c in repl_configs.values())
add_ring_devs_to_ipport2server(ring, server, ipport2server,
servers_per_port=servers_per_port)
if not VALIDATE_RSYNC and not force_validate: if not VALIDATE_RSYNC and not force_validate:
return ring return ring
# easy sanity checks # easy sanity checks
@ -167,10 +197,11 @@ def get_ring(ring_name, required_replicas, required_devices,
if len(ring.devs) != required_devices: if len(ring.devs) != required_devices:
raise SkipTest('%s has %s devices instead of %s' % ( raise SkipTest('%s has %s devices instead of %s' % (
ring.serialized_path, len(ring.devs), required_devices)) ring.serialized_path, len(ring.devs), required_devices))
port_to_config = build_port_to_conf(server)
for dev in ring.devs: for dev in ring.devs:
# verify server is exposing mounted device # verify server is exposing mounted device
conf = port_to_config[dev['port']] ipport = (dev['ip'], dev['port'])
_, server_number = get_server_number(ipport, ipport2server)
conf = repl_configs[server_number]
for device in os.listdir(conf['devices']): for device in os.listdir(conf['devices']):
if device == dev['device']: if device == dev['device']:
dev_path = os.path.join(conf['devices'], device) dev_path = os.path.join(conf['devices'], device)
@ -185,7 +216,7 @@ def get_ring(ring_name, required_replicas, required_devices,
"unable to find ring device %s under %s's devices (%s)" % ( "unable to find ring device %s under %s's devices (%s)" % (
dev['device'], server, conf['devices'])) dev['device'], server, conf['devices']))
# verify server is exposing rsync device # verify server is exposing rsync device
if port_to_config[dev['port']].get('vm_test_mode', False): if conf.get('vm_test_mode', False):
rsync_export = '%s%s' % (server, dev['replication_port']) rsync_export = '%s%s' % (server, dev['replication_port'])
else: else:
rsync_export = server rsync_export = server
@ -235,46 +266,45 @@ class ProbeTest(unittest.TestCase):
Manager(['all']).stop() Manager(['all']).stop()
self.pids = {} self.pids = {}
try: try:
self.ipport2server = {}
self.configs = defaultdict(dict)
self.account_ring = get_ring( self.account_ring = get_ring(
'account', 'account',
self.acct_cont_required_replicas, self.acct_cont_required_replicas,
self.acct_cont_required_devices) self.acct_cont_required_devices,
ipport2server=self.ipport2server,
config_paths=self.configs)
self.container_ring = get_ring( self.container_ring = get_ring(
'container', 'container',
self.acct_cont_required_replicas, self.acct_cont_required_replicas,
self.acct_cont_required_devices) self.acct_cont_required_devices,
ipport2server=self.ipport2server,
config_paths=self.configs)
self.policy = get_policy(**self.policy_requirements) self.policy = get_policy(**self.policy_requirements)
self.object_ring = get_ring( self.object_ring = get_ring(
self.policy.ring_name, self.policy.ring_name,
self.obj_required_replicas, self.obj_required_replicas,
self.obj_required_devices, self.obj_required_devices,
server='object') server='object',
ipport2server=self.ipport2server,
config_paths=self.configs)
self.servers_per_port = any(
int(readconf(c, section_name='object-replicator').get(
'servers_per_port', '0'))
for c in self.configs['object-replicator'].values())
Manager(['main']).start(wait=False) Manager(['main']).start(wait=False)
self.port2server = {} for ipport in self.ipport2server:
for server, port in [('account', 6002), ('container', 6001), check_server(ipport, self.ipport2server, self.pids)
('object', 6000)]: proxy_ipport = ('127.0.0.1', 8080)
for number in xrange(1, 9): self.ipport2server[proxy_ipport] = 'proxy'
self.port2server[port + (number * 10)] = \ self.url, self.token, self.account = check_server(
'%s%d' % (server, number) proxy_ipport, self.ipport2server, self.pids)
for port in self.port2server:
check_server(port, self.port2server, self.pids)
self.port2server[8080] = 'proxy'
self.url, self.token, self.account = \
check_server(8080, self.port2server, self.pids)
self.configs = defaultdict(dict)
for name in ('account', 'container', 'object'):
for server_name in (name, '%s-replicator' % name):
for server in Manager([server_name]):
for i, conf in enumerate(server.conf_files(), 1):
self.configs[server.server][i] = conf
self.replicators = Manager( self.replicators = Manager(
['account-replicator', 'container-replicator', ['account-replicator', 'container-replicator',
'object-replicator']) 'object-replicator'])
self.updaters = Manager(['container-updater', 'object-updater']) self.updaters = Manager(['container-updater', 'object-updater'])
self.server_port_to_conf = {}
# get some configs backend daemon configs loaded up
for server in ('account', 'container', 'object'):
self.server_port_to_conf[server] = build_port_to_conf(server)
except BaseException: except BaseException:
try: try:
raise raise
@ -288,7 +318,11 @@ class ProbeTest(unittest.TestCase):
Manager(['all']).kill() Manager(['all']).kill()
def device_dir(self, server, node): def device_dir(self, server, node):
conf = self.server_port_to_conf[server][node['port']] server_type, config_number = get_server_number(
(node['ip'], node['port']), self.ipport2server)
repl_server = '%s-replicator' % server_type
conf = readconf(self.configs[repl_server][config_number],
section_name=repl_server)
return os.path.join(conf['devices'], node['device']) return os.path.join(conf['devices'], node['device'])
def storage_dir(self, server, node, part=None, policy=None): def storage_dir(self, server, node, part=None, policy=None):
@ -301,9 +335,24 @@ class ProbeTest(unittest.TestCase):
def config_number(self, node): def config_number(self, node):
_server_type, config_number = get_server_number( _server_type, config_number = get_server_number(
node['port'], self.port2server) (node['ip'], node['port']), self.ipport2server)
return config_number return config_number
def is_local_to(self, node1, node2):
"""
Return True if both ring devices are "local" to each other (on the same
"server".
"""
if self.servers_per_port:
return node1['ip'] == node2['ip']
# Without a disambiguating IP, for SAIOs, we have to assume ports
# uniquely identify "servers". SAIOs should be configured to *either*
# have unique IPs per node (e.g. 127.0.0.1, 127.0.0.2, etc.) OR unique
# ports per server (i.e. sdb1 & sdb5 would have same port numbers in
# the 8-disk EC ring).
return node1['port'] == node2['port']
def get_to_final_state(self): def get_to_final_state(self):
# these .stop()s are probably not strictly necessary, # these .stop()s are probably not strictly necessary,
# but may prevent race conditions # but may prevent race conditions

View File

@ -97,8 +97,9 @@ class TestAccountFailures(ReplProbeTest):
self.assert_(found2) self.assert_(found2)
apart, anodes = self.account_ring.get_nodes(self.account) apart, anodes = self.account_ring.get_nodes(self.account)
kill_nonprimary_server(anodes, self.port2server, self.pids) kill_nonprimary_server(anodes, self.ipport2server, self.pids)
kill_server(anodes[0]['port'], self.port2server, self.pids) kill_server((anodes[0]['ip'], anodes[0]['port']),
self.ipport2server, self.pids)
# Kill account servers excepting two of the primaries # Kill account servers excepting two of the primaries
# Delete container1 # Delete container1
@ -146,7 +147,8 @@ class TestAccountFailures(ReplProbeTest):
self.assert_(found2) self.assert_(found2)
# Restart other primary account server # Restart other primary account server
start_server(anodes[0]['port'], self.port2server, self.pids) start_server((anodes[0]['ip'], anodes[0]['port']),
self.ipport2server, self.pids)
# Assert that server doesn't know about container1's deletion or the # Assert that server doesn't know about container1's deletion or the
# new container2/object2 yet # new container2/object2 yet

View File

@ -49,14 +49,16 @@ class TestContainerFailures(ReplProbeTest):
client.put_container(self.url, self.token, container1) client.put_container(self.url, self.token, container1)
# Kill container1 servers excepting two of the primaries # Kill container1 servers excepting two of the primaries
kill_nonprimary_server(cnodes, self.port2server, self.pids) kill_nonprimary_server(cnodes, self.ipport2server, self.pids)
kill_server(cnodes[0]['port'], self.port2server, self.pids) kill_server((cnodes[0]['ip'], cnodes[0]['port']),
self.ipport2server, self.pids)
# Delete container1 # Delete container1
client.delete_container(self.url, self.token, container1) client.delete_container(self.url, self.token, container1)
# Restart other container1 primary server # Restart other container1 primary server
start_server(cnodes[0]['port'], self.port2server, self.pids) start_server((cnodes[0]['ip'], cnodes[0]['port']),
self.ipport2server, self.pids)
# Create container1/object1 (allowed because at least server thinks the # Create container1/object1 (allowed because at least server thinks the
# container exists) # container exists)
@ -87,18 +89,23 @@ class TestContainerFailures(ReplProbeTest):
client.put_container(self.url, self.token, container1) client.put_container(self.url, self.token, container1)
# Kill container1 servers excepting one of the primaries # Kill container1 servers excepting one of the primaries
cnp_port = kill_nonprimary_server(cnodes, self.port2server, self.pids) cnp_ipport = kill_nonprimary_server(cnodes, self.ipport2server,
kill_server(cnodes[0]['port'], self.port2server, self.pids) self.pids)
kill_server(cnodes[1]['port'], self.port2server, self.pids) kill_server((cnodes[0]['ip'], cnodes[0]['port']),
self.ipport2server, self.pids)
kill_server((cnodes[1]['ip'], cnodes[1]['port']),
self.ipport2server, self.pids)
# Delete container1 directly to the one primary still up # Delete container1 directly to the one primary still up
direct_client.direct_delete_container(cnodes[2], cpart, self.account, direct_client.direct_delete_container(cnodes[2], cpart, self.account,
container1) container1)
# Restart other container1 servers # Restart other container1 servers
start_server(cnodes[0]['port'], self.port2server, self.pids) start_server((cnodes[0]['ip'], cnodes[0]['port']),
start_server(cnodes[1]['port'], self.port2server, self.pids) self.ipport2server, self.pids)
start_server(cnp_port, self.port2server, self.pids) start_server((cnodes[1]['ip'], cnodes[1]['port']),
self.ipport2server, self.pids)
start_server(cnp_ipport, self.ipport2server, self.pids)
# Get to a final state # Get to a final state
self.get_to_final_state() self.get_to_final_state()

View File

@ -26,7 +26,8 @@ from swiftclient import client
from swift.common import direct_client from swift.common import direct_client
from swift.obj.diskfile import get_data_dir from swift.obj.diskfile import get_data_dir
from swift.common.exceptions import ClientException from swift.common.exceptions import ClientException
from test.probe.common import kill_server, ReplProbeTest, start_server from test.probe.common import (
kill_server, ReplProbeTest, start_server, get_server_number)
from swift.common.utils import readconf from swift.common.utils import readconf
from swift.common.manager import Manager from swift.common.manager import Manager
@ -35,7 +36,8 @@ class TestEmptyDevice(ReplProbeTest):
def _get_objects_dir(self, onode): def _get_objects_dir(self, onode):
device = onode['device'] device = onode['device']
node_id = (onode['port'] - 6000) / 10 _, node_id = get_server_number((onode['ip'], onode['port']),
self.ipport2server)
obj_server_conf = readconf(self.configs['object-server'][node_id]) obj_server_conf = readconf(self.configs['object-server'][node_id])
devices = obj_server_conf['app:object-server']['devices'] devices = obj_server_conf['app:object-server']['devices']
obj_dir = '%s/%s' % (devices, device) obj_dir = '%s/%s' % (devices, device)
@ -56,7 +58,8 @@ class TestEmptyDevice(ReplProbeTest):
onode = onodes[0] onode = onodes[0]
# Kill one container/obj primary server # Kill one container/obj primary server
kill_server(onode['port'], self.port2server, self.pids) kill_server((onode['ip'], onode['port']),
self.ipport2server, self.pids)
# Delete the default data directory for objects on the primary server # Delete the default data directory for objects on the primary server
obj_dir = '%s/%s' % (self._get_objects_dir(onode), obj_dir = '%s/%s' % (self._get_objects_dir(onode),
@ -74,7 +77,8 @@ class TestEmptyDevice(ReplProbeTest):
# Kill other two container/obj primary servers # Kill other two container/obj primary servers
# to ensure GET handoff works # to ensure GET handoff works
for node in onodes[1:]: for node in onodes[1:]:
kill_server(node['port'], self.port2server, self.pids) kill_server((node['ip'], node['port']),
self.ipport2server, self.pids)
# Indirectly through proxy assert we can get container/obj # Indirectly through proxy assert we can get container/obj
odata = client.get_object(self.url, self.token, container, obj)[-1] odata = client.get_object(self.url, self.token, container, obj)[-1]
@ -83,7 +87,8 @@ class TestEmptyDevice(ReplProbeTest):
'returned: %s' % repr(odata)) 'returned: %s' % repr(odata))
# Restart those other two container/obj primary servers # Restart those other two container/obj primary servers
for node in onodes[1:]: for node in onodes[1:]:
start_server(node['port'], self.port2server, self.pids) start_server((node['ip'], node['port']),
self.ipport2server, self.pids)
self.assertFalse(os.path.exists(obj_dir)) self.assertFalse(os.path.exists(obj_dir))
# We've indirectly verified the handoff node has the object, but # We've indirectly verified the handoff node has the object, but
# let's directly verify it. # let's directly verify it.
@ -122,7 +127,8 @@ class TestEmptyDevice(ReplProbeTest):
missing) missing)
# Bring the first container/obj primary server back up # Bring the first container/obj primary server back up
start_server(onode['port'], self.port2server, self.pids) start_server((onode['ip'], onode['port']),
self.ipport2server, self.pids)
# Assert that it doesn't have container/obj yet # Assert that it doesn't have container/obj yet
self.assertFalse(os.path.exists(obj_dir)) self.assertFalse(os.path.exists(obj_dir))
@ -136,21 +142,17 @@ class TestEmptyDevice(ReplProbeTest):
else: else:
self.fail("Expected ClientException but didn't get it") self.fail("Expected ClientException but didn't get it")
try:
port_num = onode['replication_port']
except KeyError:
port_num = onode['port']
try:
another_port_num = another_onode['replication_port']
except KeyError:
another_port_num = another_onode['port']
# Run object replication for first container/obj primary server # Run object replication for first container/obj primary server
num = (port_num - 6000) / 10 _, num = get_server_number(
(onode['ip'], onode.get('replication_port', onode['port'])),
self.ipport2server)
Manager(['object-replicator']).once(number=num) Manager(['object-replicator']).once(number=num)
# Run object replication for handoff node # Run object replication for handoff node
another_num = (another_port_num - 6000) / 10 _, another_num = get_server_number(
(another_onode['ip'],
another_onode.get('replication_port', another_onode['port'])),
self.ipport2server)
Manager(['object-replicator']).once(number=another_num) Manager(['object-replicator']).once(number=another_num)
# Assert the first container/obj primary server now has container/obj # Assert the first container/obj primary server now has container/obj

View File

@ -41,15 +41,17 @@ class TestObjectAsyncUpdate(ReplProbeTest):
# Kill container servers excepting two of the primaries # Kill container servers excepting two of the primaries
cpart, cnodes = self.container_ring.get_nodes(self.account, container) cpart, cnodes = self.container_ring.get_nodes(self.account, container)
cnode = cnodes[0] cnode = cnodes[0]
kill_nonprimary_server(cnodes, self.port2server, self.pids) kill_nonprimary_server(cnodes, self.ipport2server, self.pids)
kill_server(cnode['port'], self.port2server, self.pids) kill_server((cnode['ip'], cnode['port']),
self.ipport2server, self.pids)
# Create container/obj # Create container/obj
obj = 'object-%s' % uuid4() obj = 'object-%s' % uuid4()
client.put_object(self.url, self.token, container, obj, '') client.put_object(self.url, self.token, container, obj, '')
# Restart other primary server # Restart other primary server
start_server(cnode['port'], self.port2server, self.pids) start_server((cnode['ip'], cnode['port']),
self.ipport2server, self.pids)
# Assert it does not know about container/obj # Assert it does not know about container/obj
self.assert_(not direct_client.direct_get_container( self.assert_(not direct_client.direct_get_container(

View File

@ -41,7 +41,8 @@ class TestObjectHandoff(ReplProbeTest):
opart, onodes = self.object_ring.get_nodes( opart, onodes = self.object_ring.get_nodes(
self.account, container, obj) self.account, container, obj)
onode = onodes[0] onode = onodes[0]
kill_server(onode['port'], self.port2server, self.pids) kill_server((onode['ip'], onode['port']),
self.ipport2server, self.pids)
# Create container/obj (goes to two primary servers and one handoff) # Create container/obj (goes to two primary servers and one handoff)
client.put_object(self.url, self.token, container, obj, 'VERIFY') client.put_object(self.url, self.token, container, obj, 'VERIFY')
@ -53,7 +54,8 @@ class TestObjectHandoff(ReplProbeTest):
# Kill other two container/obj primary servers # Kill other two container/obj primary servers
# to ensure GET handoff works # to ensure GET handoff works
for node in onodes[1:]: for node in onodes[1:]:
kill_server(node['port'], self.port2server, self.pids) kill_server((node['ip'], node['port']),
self.ipport2server, self.pids)
# Indirectly through proxy assert we can get container/obj # Indirectly through proxy assert we can get container/obj
odata = client.get_object(self.url, self.token, container, obj)[-1] odata = client.get_object(self.url, self.token, container, obj)[-1]
@ -63,7 +65,8 @@ class TestObjectHandoff(ReplProbeTest):
# Restart those other two container/obj primary servers # Restart those other two container/obj primary servers
for node in onodes[1:]: for node in onodes[1:]:
start_server(node['port'], self.port2server, self.pids) start_server((node['ip'], node['port']),
self.ipport2server, self.pids)
# We've indirectly verified the handoff node has the container/object, # We've indirectly verified the handoff node has the container/object,
# but let's directly verify it. # but let's directly verify it.
@ -90,7 +93,8 @@ class TestObjectHandoff(ReplProbeTest):
(cnode['ip'], cnode['port'])) (cnode['ip'], cnode['port']))
# Bring the first container/obj primary server back up # Bring the first container/obj primary server back up
start_server(onode['port'], self.port2server, self.pids) start_server((onode['ip'], onode['port']),
self.ipport2server, self.pids)
# Assert that it doesn't have container/obj yet # Assert that it doesn't have container/obj yet
try: try:
@ -138,7 +142,8 @@ class TestObjectHandoff(ReplProbeTest):
# Kill the first container/obj primary server again (we have two # Kill the first container/obj primary server again (we have two
# primaries and the handoff up now) # primaries and the handoff up now)
kill_server(onode['port'], self.port2server, self.pids) kill_server((onode['ip'], onode['port']),
self.ipport2server, self.pids)
# Delete container/obj # Delete container/obj
try: try:
@ -175,7 +180,8 @@ class TestObjectHandoff(ReplProbeTest):
(cnode['ip'], cnode['port'])) (cnode['ip'], cnode['port']))
# Restart the first container/obj primary server again # Restart the first container/obj primary server again
start_server(onode['port'], self.port2server, self.pids) start_server((onode['ip'], onode['port']),
self.ipport2server, self.pids)
# Assert it still has container/obj # Assert it still has container/obj
direct_client.direct_get_object( direct_client.direct_get_object(

View File

@ -294,7 +294,7 @@ class TestReconstructorRevert(ECProbeTest):
# the same server # the same server
handoff_fragment_etag = None handoff_fragment_etag = None
for node in onodes: for node in onodes:
if node['port'] == hnode['port']: if self.is_local_to(node, hnode):
# we'll keep track of the etag of this fragment we're removing # we'll keep track of the etag of this fragment we're removing
# in case we need it later (queue forshadowing music)... # in case we need it later (queue forshadowing music)...
try: try:
@ -327,7 +327,7 @@ class TestReconstructorRevert(ECProbeTest):
raise raise
# partner already had it's fragment removed # partner already had it's fragment removed
if (handoff_fragment_etag is not None and if (handoff_fragment_etag is not None and
hnode['port'] == partner['port']): self.is_local_to(hnode, partner)):
# oh, well that makes sense then... # oh, well that makes sense then...
rebuilt_fragment_etag = handoff_fragment_etag rebuilt_fragment_etag = handoff_fragment_etag
else: else:

View File

@ -30,7 +30,7 @@ import eventlet
from eventlet.green import socket from eventlet.green import socket
from tempfile import mkdtemp from tempfile import mkdtemp
from shutil import rmtree from shutil import rmtree
from swift.common.utils import Timestamp from swift.common.utils import Timestamp, NOTICE
from test import get_config from test import get_config
from swift.common import swob, utils from swift.common import swob, utils
from swift.common.ring import Ring, RingData from swift.common.ring import Ring, RingData
@ -478,8 +478,18 @@ class FakeLogger(logging.Logger, object):
logging.INFO: 'info', logging.INFO: 'info',
logging.DEBUG: 'debug', logging.DEBUG: 'debug',
logging.CRITICAL: 'critical', logging.CRITICAL: 'critical',
NOTICE: 'notice',
} }
def notice(self, msg, *args, **kwargs):
"""
Convenience function for syslog priority LOG_NOTICE. The python
logging lvl is set to 25, just above info. SysLogHandler is
monkey patched to map this log lvl to the LOG_NOTICE syslog
priority.
"""
self.log(NOTICE, msg, *args, **kwargs)
def _log(self, level, msg, *args, **kwargs): def _log(self, level, msg, *args, **kwargs):
store_name = self.store_in[level] store_name = self.store_in[level]
cargs = [msg] cargs = [msg]
@ -495,7 +505,7 @@ class FakeLogger(logging.Logger, object):
def _clear(self): def _clear(self):
self.log_dict = defaultdict(list) self.log_dict = defaultdict(list)
self.lines_dict = {'critical': [], 'error': [], 'info': [], self.lines_dict = {'critical': [], 'error': [], 'info': [],
'warning': [], 'debug': []} 'warning': [], 'debug': [], 'notice': []}
def get_lines_for_level(self, level): def get_lines_for_level(self, level):
if level not in self.lines_dict: if level not in self.lines_dict:

View File

@ -77,6 +77,15 @@ class TestRingData(unittest.TestCase):
for p in xrange(pickle.HIGHEST_PROTOCOL): for p in xrange(pickle.HIGHEST_PROTOCOL):
with closing(GzipFile(ring_fname, 'wb')) as f: with closing(GzipFile(ring_fname, 'wb')) as f:
pickle.dump(rd, f, protocol=p) pickle.dump(rd, f, protocol=p)
meta_only = ring.RingData.load(ring_fname, metadata_only=True)
self.assertEqual([
{'id': 0, 'zone': 0, 'region': 1, 'ip': '10.1.1.0',
'port': 7000},
{'id': 1, 'zone': 1, 'region': 1, 'ip': '10.1.1.1',
'port': 7000},
], meta_only.devs)
# Pickled rings can't load only metadata, so you get it all
self.assert_ring_data_equal(rd, meta_only)
ring_data = ring.RingData.load(ring_fname) ring_data = ring.RingData.load(ring_fname)
self.assert_ring_data_equal(rd, ring_data) self.assert_ring_data_equal(rd, ring_data)
@ -86,6 +95,12 @@ class TestRingData(unittest.TestCase):
[array.array('H', [0, 1, 0, 1]), array.array('H', [0, 1, 0, 1])], [array.array('H', [0, 1, 0, 1]), array.array('H', [0, 1, 0, 1])],
[{'id': 0, 'zone': 0}, {'id': 1, 'zone': 1}], 30) [{'id': 0, 'zone': 0}, {'id': 1, 'zone': 1}], 30)
rd.save(ring_fname) rd.save(ring_fname)
meta_only = ring.RingData.load(ring_fname, metadata_only=True)
self.assertEqual([
{'id': 0, 'zone': 0, 'region': 1},
{'id': 1, 'zone': 1, 'region': 1},
], meta_only.devs)
self.assertEqual([], meta_only._replica2part2dev_id)
rd2 = ring.RingData.load(ring_fname) rd2 = ring.RingData.load(ring_fname)
self.assert_ring_data_equal(rd, rd2) self.assert_ring_data_equal(rd, rd2)

View File

@ -185,22 +185,41 @@ class TestUtils(unittest.TestCase):
self.assertFalse(is_valid_hostname("$blah#")) self.assertFalse(is_valid_hostname("$blah#"))
def test_is_local_device(self): def test_is_local_device(self):
my_ips = ["127.0.0.1", # localhost shows up in whataremyips() output as "::1" for IPv6
"0000:0000:0000:0000:0000:0000:0000:0001"] my_ips = ["127.0.0.1", "::1"]
my_port = 6000 my_port = 6000
self.assertTrue(is_local_device(my_ips, my_port, self.assertTrue(is_local_device(my_ips, my_port,
"localhost", "127.0.0.1", my_port))
my_port)) self.assertTrue(is_local_device(my_ips, my_port,
"::1", my_port))
self.assertTrue(is_local_device(
my_ips, my_port,
"0000:0000:0000:0000:0000:0000:0000:0001", my_port))
self.assertTrue(is_local_device(my_ips, my_port,
"localhost", my_port))
self.assertFalse(is_local_device(my_ips, my_port, self.assertFalse(is_local_device(my_ips, my_port,
"localhost", "localhost", my_port + 1))
my_port + 1))
self.assertFalse(is_local_device(my_ips, my_port, self.assertFalse(is_local_device(my_ips, my_port,
"127.0.0.2", "127.0.0.2", my_port))
my_port))
# for those that don't have a local port # for those that don't have a local port
self.assertTrue(is_local_device(my_ips, None, self.assertTrue(is_local_device(my_ips, None,
my_ips[0], None)) my_ips[0], None))
# When servers_per_port is active, the "my_port" passed in is None
# which means "don't include port in the determination of locality
# because it's not reliable in this deployment scenario"
self.assertTrue(is_local_device(my_ips, None,
"127.0.0.1", 6666))
self.assertTrue(is_local_device(my_ips, None,
"::1", 6666))
self.assertTrue(is_local_device(
my_ips, None,
"0000:0000:0000:0000:0000:0000:0000:0001", 6666))
self.assertTrue(is_local_device(my_ips, None,
"localhost", 6666))
self.assertFalse(is_local_device(my_ips, None,
"127.0.0.2", my_port))
def test_validate_and_normalize_ip(self): def test_validate_and_normalize_ip(self):
ipv4 = "10.0.0.1" ipv4 = "10.0.0.1"
self.assertEqual(ipv4, validate_and_normalize_ip(ipv4)) self.assertEqual(ipv4, validate_and_normalize_ip(ipv4))

View File

@ -477,7 +477,7 @@ class TestDBReplicator(unittest.TestCase):
def test_run_once_no_ips(self): def test_run_once_no_ips(self):
replicator = TestReplicator({}, logger=unit.FakeLogger()) replicator = TestReplicator({}, logger=unit.FakeLogger())
self._patch(patch.object, db_replicator, 'whataremyips', self._patch(patch.object, db_replicator, 'whataremyips',
lambda *args: []) lambda *a, **kw: [])
replicator.run_once() replicator.run_once()
@ -487,7 +487,9 @@ class TestDBReplicator(unittest.TestCase):
def test_run_once_node_is_not_mounted(self): def test_run_once_node_is_not_mounted(self):
db_replicator.ring = FakeRingWithSingleNode() db_replicator.ring = FakeRingWithSingleNode()
conf = {'mount_check': 'true', 'bind_port': 6000} # If a bind_ip is specified, it's plumbed into whataremyips() and
# returned by itself.
conf = {'mount_check': 'true', 'bind_ip': '1.1.1.1', 'bind_port': 6000}
replicator = TestReplicator(conf, logger=unit.FakeLogger()) replicator = TestReplicator(conf, logger=unit.FakeLogger())
self.assertEqual(replicator.mount_check, True) self.assertEqual(replicator.mount_check, True)
self.assertEqual(replicator.port, 6000) self.assertEqual(replicator.port, 6000)
@ -498,8 +500,6 @@ class TestDBReplicator(unittest.TestCase):
replicator.ring.devs[0]['device'])) replicator.ring.devs[0]['device']))
return False return False
self._patch(patch.object, db_replicator, 'whataremyips',
lambda *args: ['1.1.1.1'])
self._patch(patch.object, db_replicator, 'ismount', mock_ismount) self._patch(patch.object, db_replicator, 'ismount', mock_ismount)
replicator.run_once() replicator.run_once()
@ -528,7 +528,7 @@ class TestDBReplicator(unittest.TestCase):
self.assertEquals(1, node_id) self.assertEquals(1, node_id)
self._patch(patch.object, db_replicator, 'whataremyips', self._patch(patch.object, db_replicator, 'whataremyips',
lambda *args: ['1.1.1.1']) lambda *a, **kw: ['1.1.1.1'])
self._patch(patch.object, db_replicator, 'ismount', lambda *args: True) self._patch(patch.object, db_replicator, 'ismount', lambda *args: True)
self._patch(patch.object, db_replicator, 'unlink_older_than', self._patch(patch.object, db_replicator, 'unlink_older_than',
mock_unlink_older_than) mock_unlink_older_than)
@ -1390,7 +1390,7 @@ class TestReplicatorSync(unittest.TestCase):
return True return True
daemon._rsync_file = _rsync_file daemon._rsync_file = _rsync_file
with mock.patch('swift.common.db_replicator.whataremyips', with mock.patch('swift.common.db_replicator.whataremyips',
new=lambda: [node['replication_ip']]): new=lambda *a, **kw: [node['replication_ip']]):
daemon.run_once() daemon.run_once()
return daemon return daemon

View File

@ -15,14 +15,17 @@
import unittest import unittest
import StringIO import StringIO
from ConfigParser import ConfigParser from ConfigParser import ConfigParser
import os
import mock import mock
from functools import partial
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from test.unit import patch_policies, FakeRing from test.unit import patch_policies, FakeRing, temptree
from swift.common.storage_policy import ( from swift.common.storage_policy import (
StoragePolicyCollection, POLICIES, PolicyError, parse_storage_policies, StoragePolicyCollection, POLICIES, PolicyError, parse_storage_policies,
reload_storage_policies, get_policy_string, split_policy_string, reload_storage_policies, get_policy_string, split_policy_string,
BaseStoragePolicy, StoragePolicy, ECStoragePolicy, REPL_POLICY, EC_POLICY, BaseStoragePolicy, StoragePolicy, ECStoragePolicy, REPL_POLICY, EC_POLICY,
VALID_EC_TYPES, DEFAULT_EC_OBJECT_SEGMENT_SIZE) VALID_EC_TYPES, DEFAULT_EC_OBJECT_SEGMENT_SIZE, BindPortsCache)
from swift.common.ring import RingData
from swift.common.exceptions import RingValidationError from swift.common.exceptions import RingValidationError
@ -740,6 +743,139 @@ class TestStoragePolicies(unittest.TestCase):
self.assertRaises(PolicyError, policies.get_object_ring, 99, self.assertRaises(PolicyError, policies.get_object_ring, 99,
'/path/not/used') '/path/not/used')
def test_bind_ports_cache(self):
test_policies = [StoragePolicy(0, 'aay', True),
StoragePolicy(1, 'bee', False),
StoragePolicy(2, 'cee', False)]
my_ips = ['1.2.3.4', '2.3.4.5']
other_ips = ['3.4.5.6', '4.5.6.7']
bind_ip = my_ips[1]
devs_by_ring_name1 = {
'object': [ # 'aay'
{'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[0],
'port': 6006},
{'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[0],
'port': 6007},
{'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[1],
'port': 6008},
None,
{'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[1],
'port': 6009}],
'object-1': [ # 'bee'
{'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[1],
'port': 6006}, # dupe
{'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[0],
'port': 6010},
{'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[1],
'port': 6011},
{'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[1],
'port': 6012}],
'object-2': [ # 'cee'
{'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[0],
'port': 6010}, # on our IP and a not-us IP
{'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[0],
'port': 6013},
None,
{'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[1],
'port': 6014},
{'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[1],
'port': 6015}],
}
devs_by_ring_name2 = {
'object': [ # 'aay'
{'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[0],
'port': 6016},
{'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[1],
'port': 6019}],
'object-1': [ # 'bee'
{'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[1],
'port': 6016}, # dupe
{'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[1],
'port': 6022}],
'object-2': [ # 'cee'
{'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[0],
'port': 6020},
{'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[1],
'port': 6025}],
}
ring_files = [ring_name + '.ring.gz'
for ring_name in sorted(devs_by_ring_name1)]
def _fake_load(gz_path, stub_objs, metadata_only=False):
return RingData(
devs=stub_objs[os.path.basename(gz_path)[:-8]],
replica2part2dev_id=[],
part_shift=24)
with mock.patch(
'swift.common.storage_policy.RingData.load'
) as mock_ld, \
patch_policies(test_policies), \
mock.patch('swift.common.storage_policy.whataremyips') \
as mock_whataremyips, \
temptree(ring_files) as tempdir:
mock_whataremyips.return_value = my_ips
cache = BindPortsCache(tempdir, bind_ip)
self.assertEqual([
mock.call(bind_ip),
], mock_whataremyips.mock_calls)
mock_whataremyips.reset_mock()
mock_ld.side_effect = partial(_fake_load,
stub_objs=devs_by_ring_name1)
self.assertEqual(set([
6006, 6008, 6011, 6010, 6014,
]), cache.all_bind_ports_for_node())
self.assertEqual([
mock.call(os.path.join(tempdir, ring_files[0]),
metadata_only=True),
mock.call(os.path.join(tempdir, ring_files[1]),
metadata_only=True),
mock.call(os.path.join(tempdir, ring_files[2]),
metadata_only=True),
], mock_ld.mock_calls)
mock_ld.reset_mock()
mock_ld.side_effect = partial(_fake_load,
stub_objs=devs_by_ring_name2)
self.assertEqual(set([
6006, 6008, 6011, 6010, 6014,
]), cache.all_bind_ports_for_node())
self.assertEqual([], mock_ld.mock_calls)
# but when all the file mtimes are made different, it'll
# reload
for gz_file in [os.path.join(tempdir, n)
for n in ring_files]:
os.utime(gz_file, (88, 88))
self.assertEqual(set([
6016, 6020,
]), cache.all_bind_ports_for_node())
self.assertEqual([
mock.call(os.path.join(tempdir, ring_files[0]),
metadata_only=True),
mock.call(os.path.join(tempdir, ring_files[1]),
metadata_only=True),
mock.call(os.path.join(tempdir, ring_files[2]),
metadata_only=True),
], mock_ld.mock_calls)
mock_ld.reset_mock()
# Don't do something stupid like crash if a ring file is missing.
os.unlink(os.path.join(tempdir, 'object-2.ring.gz'))
self.assertEqual(set([
6016, 6020,
]), cache.all_bind_ports_for_node())
self.assertEqual([], mock_ld.mock_calls)
# whataremyips() is only called in the constructor
self.assertEqual([], mock_whataremyips.mock_calls)
def test_singleton_passthrough(self): def test_singleton_passthrough(self):
test_policies = [StoragePolicy(0, 'aay', True), test_policies = [StoragePolicy(0, 'aay', True),
StoragePolicy(1, 'bee', False), StoragePolicy(1, 'bee', False),

View File

@ -1488,6 +1488,18 @@ class TestUtils(unittest.TestCase):
self.assert_(len(myips) > 1) self.assert_(len(myips) > 1)
self.assert_('127.0.0.1' in myips) self.assert_('127.0.0.1' in myips)
def test_whataremyips_bind_to_all(self):
for any_addr in ('0.0.0.0', '0000:0000:0000:0000:0000:0000:0000:0000',
'::0', '::0000', '::',
# Wacky parse-error input produces all IPs
'I am a bear'):
myips = utils.whataremyips(any_addr)
self.assert_(len(myips) > 1)
self.assert_('127.0.0.1' in myips)
def test_whataremyips_bind_ip_specific(self):
self.assertEqual(['1.2.3.4'], utils.whataremyips('1.2.3.4'))
def test_whataremyips_error(self): def test_whataremyips_error(self):
def my_interfaces(): def my_interfaces():
return ['eth0'] return ['eth0']
@ -1725,6 +1737,21 @@ log_name = %(yarr)s'''
for func in required_func_calls: for func in required_func_calls:
self.assert_(utils.os.called_funcs[func]) self.assert_(utils.os.called_funcs[func])
def test_drop_privileges_no_call_setsid(self):
user = getuser()
# over-ride os with mock
required_func_calls = ('setgroups', 'setgid', 'setuid', 'chdir',
'umask')
bad_func_calls = ('setsid',)
utils.os = MockOs(called_funcs=required_func_calls,
raise_funcs=bad_func_calls)
# exercise the code
utils.drop_privileges(user, call_setsid=False)
for func in required_func_calls:
self.assert_(utils.os.called_funcs[func])
for func in bad_func_calls:
self.assert_(func not in utils.os.called_funcs)
@reset_logger_state @reset_logger_state
def test_capture_stdio(self): def test_capture_stdio(self):
# stubs # stubs

View File

@ -42,7 +42,8 @@ from swift.common.swob import Request
from swift.common import wsgi, utils from swift.common import wsgi, utils
from swift.common.storage_policy import POLICIES from swift.common.storage_policy import POLICIES
from test.unit import temptree, with_tempdir, write_fake_ring, patch_policies from test.unit import (
temptree, with_tempdir, write_fake_ring, patch_policies, FakeLogger)
from paste.deploy import loadwsgi from paste.deploy import loadwsgi
@ -688,6 +689,65 @@ class TestWSGI(unittest.TestCase):
self.assertEqual(calls['_loadapp'], 1) self.assertEqual(calls['_loadapp'], 1)
self.assertEqual(rc, 0) self.assertEqual(rc, 0)
@mock.patch('swift.common.wsgi.run_server')
@mock.patch('swift.common.wsgi.WorkersStrategy')
@mock.patch('swift.common.wsgi.ServersPerPortStrategy')
def test_run_server_strategy_plumbing(self, mock_per_port, mock_workers,
mock_run_server):
# Make sure the right strategy gets used in a number of different
# config cases.
mock_per_port().bind_ports.return_value = 'stop early'
mock_workers().bind_ports.return_value = 'stop early'
logger = FakeLogger()
stub__initrp = [
{'__file__': 'test', 'workers': 2}, # conf
logger,
'log_name',
]
with mock.patch.object(wsgi, '_initrp', return_value=stub__initrp):
for server_type in ('account-server', 'container-server',
'object-server'):
mock_per_port.reset_mock()
mock_workers.reset_mock()
logger._clear()
self.assertEqual(1, wsgi.run_wsgi('conf_file', server_type))
self.assertEqual([
'stop early',
], logger.get_lines_for_level('error'))
self.assertEqual([], mock_per_port.mock_calls)
self.assertEqual([
mock.call(stub__initrp[0], logger),
mock.call().bind_ports(),
], mock_workers.mock_calls)
stub__initrp[0]['servers_per_port'] = 3
for server_type in ('account-server', 'container-server'):
mock_per_port.reset_mock()
mock_workers.reset_mock()
logger._clear()
self.assertEqual(1, wsgi.run_wsgi('conf_file', server_type))
self.assertEqual([
'stop early',
], logger.get_lines_for_level('error'))
self.assertEqual([], mock_per_port.mock_calls)
self.assertEqual([
mock.call(stub__initrp[0], logger),
mock.call().bind_ports(),
], mock_workers.mock_calls)
mock_per_port.reset_mock()
mock_workers.reset_mock()
logger._clear()
self.assertEqual(1, wsgi.run_wsgi('conf_file', 'object-server'))
self.assertEqual([
'stop early',
], logger.get_lines_for_level('error'))
self.assertEqual([
mock.call(stub__initrp[0], logger, servers_per_port=3),
mock.call().bind_ports(),
], mock_per_port.mock_calls)
self.assertEqual([], mock_workers.mock_calls)
def test_run_server_failure1(self): def test_run_server_failure1(self):
calls = defaultdict(lambda: 0) calls = defaultdict(lambda: 0)
@ -751,6 +811,380 @@ class TestWSGI(unittest.TestCase):
self.assertEquals(r.environ['PATH_INFO'], '/override') self.assertEquals(r.environ['PATH_INFO'], '/override')
class TestServersPerPortStrategy(unittest.TestCase):
def setUp(self):
self.logger = FakeLogger()
self.conf = {
'workers': 100, # ignored
'user': 'bob',
'swift_dir': '/jim/cricket',
'ring_check_interval': '76',
'bind_ip': '2.3.4.5',
}
self.servers_per_port = 3
self.s1, self.s2 = mock.MagicMock(), mock.MagicMock()
patcher = mock.patch('swift.common.wsgi.get_socket',
side_effect=[self.s1, self.s2])
self.mock_get_socket = patcher.start()
self.addCleanup(patcher.stop)
patcher = mock.patch('swift.common.wsgi.drop_privileges')
self.mock_drop_privileges = patcher.start()
self.addCleanup(patcher.stop)
patcher = mock.patch('swift.common.wsgi.BindPortsCache')
self.mock_cache_class = patcher.start()
self.addCleanup(patcher.stop)
patcher = mock.patch('swift.common.wsgi.os.setsid')
self.mock_setsid = patcher.start()
self.addCleanup(patcher.stop)
patcher = mock.patch('swift.common.wsgi.os.chdir')
self.mock_chdir = patcher.start()
self.addCleanup(patcher.stop)
patcher = mock.patch('swift.common.wsgi.os.umask')
self.mock_umask = patcher.start()
self.addCleanup(patcher.stop)
self.all_bind_ports_for_node = \
self.mock_cache_class().all_bind_ports_for_node
self.ports = (6006, 6007)
self.all_bind_ports_for_node.return_value = set(self.ports)
self.strategy = wsgi.ServersPerPortStrategy(self.conf, self.logger,
self.servers_per_port)
def test_loop_timeout(self):
# This strategy should loop every ring_check_interval seconds, even if
# no workers exit.
self.assertEqual(76, self.strategy.loop_timeout())
# Check the default
del self.conf['ring_check_interval']
self.strategy = wsgi.ServersPerPortStrategy(self.conf, self.logger,
self.servers_per_port)
self.assertEqual(15, self.strategy.loop_timeout())
def test_bind_ports(self):
self.strategy.bind_ports()
self.assertEqual(set((6006, 6007)), self.strategy.bind_ports)
self.assertEqual([
mock.call({'workers': 100, # ignored
'user': 'bob',
'swift_dir': '/jim/cricket',
'ring_check_interval': '76',
'bind_ip': '2.3.4.5',
'bind_port': 6006}),
mock.call({'workers': 100, # ignored
'user': 'bob',
'swift_dir': '/jim/cricket',
'ring_check_interval': '76',
'bind_ip': '2.3.4.5',
'bind_port': 6007}),
], self.mock_get_socket.mock_calls)
self.assertEqual(
6006, self.strategy.port_pid_state.port_for_sock(self.s1))
self.assertEqual(
6007, self.strategy.port_pid_state.port_for_sock(self.s2))
self.assertEqual([mock.call()], self.mock_setsid.mock_calls)
self.assertEqual([mock.call('/')], self.mock_chdir.mock_calls)
self.assertEqual([mock.call(0o22)], self.mock_umask.mock_calls)
def test_bind_ports_ignores_setsid_errors(self):
self.mock_setsid.side_effect = OSError()
self.strategy.bind_ports()
self.assertEqual(set((6006, 6007)), self.strategy.bind_ports)
self.assertEqual([
mock.call({'workers': 100, # ignored
'user': 'bob',
'swift_dir': '/jim/cricket',
'ring_check_interval': '76',
'bind_ip': '2.3.4.5',
'bind_port': 6006}),
mock.call({'workers': 100, # ignored
'user': 'bob',
'swift_dir': '/jim/cricket',
'ring_check_interval': '76',
'bind_ip': '2.3.4.5',
'bind_port': 6007}),
], self.mock_get_socket.mock_calls)
self.assertEqual(
6006, self.strategy.port_pid_state.port_for_sock(self.s1))
self.assertEqual(
6007, self.strategy.port_pid_state.port_for_sock(self.s2))
self.assertEqual([mock.call()], self.mock_setsid.mock_calls)
self.assertEqual([mock.call('/')], self.mock_chdir.mock_calls)
self.assertEqual([mock.call(0o22)], self.mock_umask.mock_calls)
def test_no_fork_sock(self):
self.assertEqual(None, self.strategy.no_fork_sock())
def test_new_worker_socks(self):
self.strategy.bind_ports()
self.all_bind_ports_for_node.reset_mock()
pid = 88
got_si = []
for s, i in self.strategy.new_worker_socks():
got_si.append((s, i))
self.strategy.register_worker_start(s, i, pid)
pid += 1
self.assertEqual([
(self.s1, 0), (self.s1, 1), (self.s1, 2),
(self.s2, 0), (self.s2, 1), (self.s2, 2),
], got_si)
self.assertEqual([
'Started child %d (PID %d) for port %d' % (0, 88, 6006),
'Started child %d (PID %d) for port %d' % (1, 89, 6006),
'Started child %d (PID %d) for port %d' % (2, 90, 6006),
'Started child %d (PID %d) for port %d' % (0, 91, 6007),
'Started child %d (PID %d) for port %d' % (1, 92, 6007),
'Started child %d (PID %d) for port %d' % (2, 93, 6007),
], self.logger.get_lines_for_level('notice'))
self.logger._clear()
# Steady-state...
self.assertEqual([], list(self.strategy.new_worker_socks()))
self.all_bind_ports_for_node.reset_mock()
# Get rid of servers for ports which disappear from the ring
self.ports = (6007,)
self.all_bind_ports_for_node.return_value = set(self.ports)
self.s1.reset_mock()
self.s2.reset_mock()
with mock.patch('swift.common.wsgi.greenio') as mock_greenio:
self.assertEqual([], list(self.strategy.new_worker_socks()))
self.assertEqual([
mock.call(), # ring_check_interval has passed...
], self.all_bind_ports_for_node.mock_calls)
self.assertEqual([
mock.call.shutdown_safe(self.s1),
], mock_greenio.mock_calls)
self.assertEqual([
mock.call.close(),
], self.s1.mock_calls)
self.assertEqual([], self.s2.mock_calls) # not closed
self.assertEqual([
'Closing unnecessary sock for port %d' % 6006,
], self.logger.get_lines_for_level('notice'))
self.logger._clear()
# Create new socket & workers for new ports that appear in ring
self.ports = (6007, 6009)
self.all_bind_ports_for_node.return_value = set(self.ports)
self.s1.reset_mock()
self.s2.reset_mock()
s3 = mock.MagicMock()
self.mock_get_socket.side_effect = Exception('ack')
# But first make sure we handle failure to bind to the requested port!
got_si = []
for s, i in self.strategy.new_worker_socks():
got_si.append((s, i))
self.strategy.register_worker_start(s, i, pid)
pid += 1
self.assertEqual([], got_si)
self.assertEqual([
'Unable to bind to port %d: %s' % (6009, Exception('ack')),
'Unable to bind to port %d: %s' % (6009, Exception('ack')),
'Unable to bind to port %d: %s' % (6009, Exception('ack')),
], self.logger.get_lines_for_level('critical'))
self.logger._clear()
# Will keep trying, so let it succeed again
self.mock_get_socket.side_effect = [s3]
got_si = []
for s, i in self.strategy.new_worker_socks():
got_si.append((s, i))
self.strategy.register_worker_start(s, i, pid)
pid += 1
self.assertEqual([
(s3, 0), (s3, 1), (s3, 2),
], got_si)
self.assertEqual([
'Started child %d (PID %d) for port %d' % (0, 94, 6009),
'Started child %d (PID %d) for port %d' % (1, 95, 6009),
'Started child %d (PID %d) for port %d' % (2, 96, 6009),
], self.logger.get_lines_for_level('notice'))
self.logger._clear()
# Steady-state...
self.assertEqual([], list(self.strategy.new_worker_socks()))
self.all_bind_ports_for_node.reset_mock()
# Restart a guy who died on us
self.strategy.register_worker_exit(95) # server_idx == 1
got_si = []
for s, i in self.strategy.new_worker_socks():
got_si.append((s, i))
self.strategy.register_worker_start(s, i, pid)
pid += 1
self.assertEqual([
(s3, 1),
], got_si)
self.assertEqual([
'Started child %d (PID %d) for port %d' % (1, 97, 6009),
], self.logger.get_lines_for_level('notice'))
self.logger._clear()
# Check log_sock_exit
self.strategy.log_sock_exit(self.s2, 2)
self.assertEqual([
'Child %d (PID %d, port %d) exiting normally' % (
2, os.getpid(), 6007),
], self.logger.get_lines_for_level('notice'))
# It's ok to register_worker_exit for a PID that's already had its
# socket closed due to orphaning.
# This is one of the workers for port 6006 that already got reaped.
self.assertEqual(None, self.strategy.register_worker_exit(89))
def test_post_fork_hook(self):
self.strategy.post_fork_hook()
self.assertEqual([
mock.call('bob', call_setsid=False),
], self.mock_drop_privileges.mock_calls)
def test_shutdown_sockets(self):
self.strategy.bind_ports()
with mock.patch('swift.common.wsgi.greenio') as mock_greenio:
self.strategy.shutdown_sockets()
self.assertEqual([
mock.call.shutdown_safe(self.s1),
mock.call.shutdown_safe(self.s2),
], mock_greenio.mock_calls)
self.assertEqual([
mock.call.close(),
], self.s1.mock_calls)
self.assertEqual([
mock.call.close(),
], self.s2.mock_calls)
class TestWorkersStrategy(unittest.TestCase):
def setUp(self):
self.logger = FakeLogger()
self.conf = {
'workers': 2,
'user': 'bob',
}
self.strategy = wsgi.WorkersStrategy(self.conf, self.logger)
patcher = mock.patch('swift.common.wsgi.get_socket',
return_value='abc')
self.mock_get_socket = patcher.start()
self.addCleanup(patcher.stop)
patcher = mock.patch('swift.common.wsgi.drop_privileges')
self.mock_drop_privileges = patcher.start()
self.addCleanup(patcher.stop)
def test_loop_timeout(self):
# This strategy should block in the green.os.wait() until a worker
# process exits.
self.assertEqual(None, self.strategy.loop_timeout())
def test_binding(self):
self.assertEqual(None, self.strategy.bind_ports())
self.assertEqual('abc', self.strategy.sock)
self.assertEqual([
mock.call(self.conf),
], self.mock_get_socket.mock_calls)
self.assertEqual([
mock.call('bob'),
], self.mock_drop_privileges.mock_calls)
self.mock_get_socket.side_effect = wsgi.ConfigFilePortError()
self.assertEqual(
'bind_port wasn\'t properly set in the config file. '
'It must be explicitly set to a valid port number.',
self.strategy.bind_ports())
def test_no_fork_sock(self):
self.strategy.bind_ports()
self.assertEqual(None, self.strategy.no_fork_sock())
self.conf['workers'] = 0
self.strategy = wsgi.WorkersStrategy(self.conf, self.logger)
self.strategy.bind_ports()
self.assertEqual('abc', self.strategy.no_fork_sock())
def test_new_worker_socks(self):
self.strategy.bind_ports()
pid = 88
sock_count = 0
for s, i in self.strategy.new_worker_socks():
self.assertEqual('abc', s)
self.assertEqual(None, i) # unused for this strategy
self.strategy.register_worker_start(s, 'unused', pid)
pid += 1
sock_count += 1
self.assertEqual([
'Started child %s' % 88,
'Started child %s' % 89,
], self.logger.get_lines_for_level('notice'))
self.assertEqual(2, sock_count)
self.assertEqual([], list(self.strategy.new_worker_socks()))
sock_count = 0
self.strategy.register_worker_exit(88)
self.assertEqual([
'Removing dead child %s' % 88,
], self.logger.get_lines_for_level('error'))
for s, i in self.strategy.new_worker_socks():
self.assertEqual('abc', s)
self.assertEqual(None, i) # unused for this strategy
self.strategy.register_worker_start(s, 'unused', pid)
pid += 1
sock_count += 1
self.assertEqual(1, sock_count)
self.assertEqual([
'Started child %s' % 88,
'Started child %s' % 89,
'Started child %s' % 90,
], self.logger.get_lines_for_level('notice'))
def test_post_fork_hook(self):
# Just don't crash or do something stupid
self.assertEqual(None, self.strategy.post_fork_hook())
def test_shutdown_sockets(self):
self.mock_get_socket.return_value = mock.MagicMock()
self.strategy.bind_ports()
with mock.patch('swift.common.wsgi.greenio') as mock_greenio:
self.strategy.shutdown_sockets()
self.assertEqual([
mock.call.shutdown_safe(self.mock_get_socket.return_value),
], mock_greenio.mock_calls)
self.assertEqual([
mock.call.close(),
], self.mock_get_socket.return_value.mock_calls)
def test_log_sock_exit(self):
self.strategy.log_sock_exit('blahblah', 'blahblah')
my_pid = os.getpid()
self.assertEqual([
'Child %d exiting normally' % my_pid,
], self.logger.get_lines_for_level('notice'))
class TestWSGIContext(unittest.TestCase): class TestWSGIContext(unittest.TestCase):
def test_app_call(self): def test_app_call(self):

View File

@ -289,7 +289,11 @@ class TestContainerSync(unittest.TestCase):
# those. # those.
cring = FakeRing() cring = FakeRing()
with mock.patch('swift.container.sync.InternalClient'): with mock.patch('swift.container.sync.InternalClient'):
cs = sync.ContainerSync({}, container_ring=cring) cs = sync.ContainerSync({
'bind_ip': '10.0.0.0',
}, container_ring=cring)
# Plumbing test for bind_ip and whataremyips()
self.assertEqual(['10.0.0.0'], cs._myips)
orig_ContainerBroker = sync.ContainerBroker orig_ContainerBroker = sync.ContainerBroker
try: try:
sync.ContainerBroker = lambda p: FakeContainerBroker( sync.ContainerBroker = lambda p: FakeContainerBroker(

View File

@ -73,16 +73,10 @@ def make_ec_archive_bodies(policy, test_body):
fragment_payloads.append(fragments) fragment_payloads.append(fragments)
# join up the fragment payloads per node # join up the fragment payloads per node
ec_archive_bodies = [''.join(fragments) ec_archive_bodies = [''.join(frags) for frags in zip(*fragment_payloads)]
for fragments in zip(*fragment_payloads)]
return ec_archive_bodies return ec_archive_bodies
def _ips():
return ['127.0.0.1']
object_reconstructor.whataremyips = _ips
def _create_test_rings(path): def _create_test_rings(path):
testgz = os.path.join(path, 'object.ring.gz') testgz = os.path.join(path, 'object.ring.gz')
intended_replica2part2dev_id = [ intended_replica2part2dev_id = [
@ -582,7 +576,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
except AssertionError as e: except AssertionError as e:
extra_info = \ extra_info = \
'\n\n... for %r in part num %s job %r' % ( '\n\n... for %r in part num %s job %r' % (
k, part_num, job_key) k, part_num, job_key)
raise AssertionError(str(e) + extra_info) raise AssertionError(str(e) + extra_info)
else: else:
self.fail( self.fail(
@ -1001,6 +995,7 @@ class TestObjectReconstructor(unittest.TestCase):
def setUp(self): def setUp(self):
self.policy = POLICIES.default self.policy = POLICIES.default
self.policy.object_ring._rtime = time.time() + 3600
self.testdir = tempfile.mkdtemp() self.testdir = tempfile.mkdtemp()
self.devices = os.path.join(self.testdir, 'devices') self.devices = os.path.join(self.testdir, 'devices')
self.local_dev = self.policy.object_ring.devs[0] self.local_dev = self.policy.object_ring.devs[0]
@ -1009,6 +1004,7 @@ class TestObjectReconstructor(unittest.TestCase):
self.conf = { self.conf = {
'devices': self.devices, 'devices': self.devices,
'mount_check': False, 'mount_check': False,
'bind_ip': self.ip,
'bind_port': self.port, 'bind_port': self.port,
} }
self.logger = debug_logger('object-reconstructor') self.logger = debug_logger('object-reconstructor')
@ -1042,9 +1038,7 @@ class TestObjectReconstructor(unittest.TestCase):
utils.mkdirs(os.path.join( utils.mkdirs(os.path.join(
self.devices, self.local_dev['device'], self.devices, self.local_dev['device'],
datadir, str(part))) datadir, str(part)))
with mock.patch('swift.obj.reconstructor.whataremyips', part_infos = list(self.reconstructor.collect_parts())
return_value=[self.ip]):
part_infos = list(self.reconstructor.collect_parts())
found_parts = sorted(int(p['partition']) for p in part_infos) found_parts = sorted(int(p['partition']) for p in part_infos)
self.assertEqual(found_parts, sorted(stub_parts)) self.assertEqual(found_parts, sorted(stub_parts))
for part_info in part_infos: for part_info in part_infos:
@ -1056,10 +1050,112 @@ class TestObjectReconstructor(unittest.TestCase):
diskfile.get_data_dir(self.policy), diskfile.get_data_dir(self.policy),
str(part_info['partition']))) str(part_info['partition'])))
def test_collect_parts_skips_non_local_devs_servers_per_port(self):
self._configure_reconstructor(devices=self.devices, mount_check=False,
bind_ip=self.ip, bind_port=self.port,
servers_per_port=2)
device_parts = {
'sda': (374,),
'sdb': (179, 807), # w/one-serv-per-port, same IP alone is local
'sdc': (363, 468, 843),
'sdd': (912,), # "not local" via different IP
}
for policy in POLICIES:
datadir = diskfile.get_data_dir(policy)
for dev, parts in device_parts.items():
for part in parts:
utils.mkdirs(os.path.join(
self.devices, dev,
datadir, str(part)))
# we're only going to add sda and sdc into the ring
local_devs = ('sda', 'sdb', 'sdc')
stub_ring_devs = [{
'device': dev,
'replication_ip': self.ip,
'replication_port': self.port + 1 if dev == 'sdb' else self.port,
} for dev in local_devs]
stub_ring_devs.append({
'device': 'sdd',
'replication_ip': '127.0.0.88', # not local via IP
'replication_port': self.port,
})
self.reconstructor.bind_ip = '0.0.0.0' # use whataremyips
with nested(mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]),
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs)):
part_infos = list(self.reconstructor.collect_parts())
found_parts = sorted(int(p['partition']) for p in part_infos)
expected_parts = sorted(itertools.chain(
*(device_parts[d] for d in local_devs)))
self.assertEqual(found_parts, expected_parts)
for part_info in part_infos:
self.assertEqual(part_info['policy'], self.policy)
self.assertTrue(part_info['local_dev'] in stub_ring_devs)
dev = part_info['local_dev']
self.assertEqual(part_info['part_path'],
os.path.join(self.devices,
dev['device'],
diskfile.get_data_dir(self.policy),
str(part_info['partition'])))
def test_collect_parts_multi_device_skips_non_non_local_devs(self):
device_parts = {
'sda': (374,),
'sdb': (179, 807), # "not local" via different port
'sdc': (363, 468, 843),
'sdd': (912,), # "not local" via different IP
}
for policy in POLICIES:
datadir = diskfile.get_data_dir(policy)
for dev, parts in device_parts.items():
for part in parts:
utils.mkdirs(os.path.join(
self.devices, dev,
datadir, str(part)))
# we're only going to add sda and sdc into the ring
local_devs = ('sda', 'sdc')
stub_ring_devs = [{
'device': dev,
'replication_ip': self.ip,
'replication_port': self.port,
} for dev in local_devs]
stub_ring_devs.append({
'device': 'sdb',
'replication_ip': self.ip,
'replication_port': self.port + 1, # not local via port
})
stub_ring_devs.append({
'device': 'sdd',
'replication_ip': '127.0.0.88', # not local via IP
'replication_port': self.port,
})
self.reconstructor.bind_ip = '0.0.0.0' # use whataremyips
with nested(mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]),
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs)):
part_infos = list(self.reconstructor.collect_parts())
found_parts = sorted(int(p['partition']) for p in part_infos)
expected_parts = sorted(itertools.chain(
*(device_parts[d] for d in local_devs)))
self.assertEqual(found_parts, expected_parts)
for part_info in part_infos:
self.assertEqual(part_info['policy'], self.policy)
self.assertTrue(part_info['local_dev'] in stub_ring_devs)
dev = part_info['local_dev']
self.assertEqual(part_info['part_path'],
os.path.join(self.devices,
dev['device'],
diskfile.get_data_dir(self.policy),
str(part_info['partition'])))
def test_collect_parts_multi_device_skips_non_ring_devices(self): def test_collect_parts_multi_device_skips_non_ring_devices(self):
device_parts = { device_parts = {
'sda': (374,), 'sda': (374,),
'sdb': (179, 807),
'sdc': (363, 468, 843), 'sdc': (363, 468, 843),
} }
for policy in POLICIES: for policy in POLICIES:
@ -1075,8 +1171,9 @@ class TestObjectReconstructor(unittest.TestCase):
stub_ring_devs = [{ stub_ring_devs = [{
'device': dev, 'device': dev,
'replication_ip': self.ip, 'replication_ip': self.ip,
'replication_port': self.port 'replication_port': self.port,
} for dev in local_devs] } for dev in local_devs]
self.reconstructor.bind_ip = '0.0.0.0' # use whataremyips
with nested(mock.patch('swift.obj.reconstructor.whataremyips', with nested(mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), return_value=[self.ip]),
mock.patch.object(self.policy.object_ring, '_devs', mock.patch.object(self.policy.object_ring, '_devs',

View File

@ -36,9 +36,8 @@ from swift.obj import diskfile, replicator as object_replicator
from swift.common.storage_policy import StoragePolicy, POLICIES from swift.common.storage_policy import StoragePolicy, POLICIES
def _ips(): def _ips(*args, **kwargs):
return ['127.0.0.0'] return ['127.0.0.0']
object_replicator.whataremyips = _ips
def mock_http_connect(status): def mock_http_connect(status):
@ -171,34 +170,46 @@ class TestObjectReplicator(unittest.TestCase):
rmtree(self.testdir, ignore_errors=1) rmtree(self.testdir, ignore_errors=1)
os.mkdir(self.testdir) os.mkdir(self.testdir)
os.mkdir(self.devices) os.mkdir(self.devices)
os.mkdir(os.path.join(self.devices, 'sda'))
self.objects = os.path.join(self.devices, 'sda', self.objects, self.objects_1, self.parts, self.parts_1 = \
diskfile.get_data_dir(POLICIES[0])) self._write_disk_data('sda')
self.objects_1 = os.path.join(self.devices, 'sda',
diskfile.get_data_dir(POLICIES[1]))
os.mkdir(self.objects)
os.mkdir(self.objects_1)
self.parts = {}
self.parts_1 = {}
for part in ['0', '1', '2', '3']:
self.parts[part] = os.path.join(self.objects, part)
os.mkdir(self.parts[part])
self.parts_1[part] = os.path.join(self.objects_1, part)
os.mkdir(self.parts_1[part])
_create_test_rings(self.testdir) _create_test_rings(self.testdir)
self.logger = debug_logger('test-replicator')
self.conf = dict( self.conf = dict(
bind_ip=_ips()[0], bind_port=6000,
swift_dir=self.testdir, devices=self.devices, mount_check='false', swift_dir=self.testdir, devices=self.devices, mount_check='false',
timeout='300', stats_interval='1', sync_method='rsync') timeout='300', stats_interval='1', sync_method='rsync')
self.replicator = object_replicator.ObjectReplicator(self.conf) self._create_replicator()
self.logger = self.replicator.logger = debug_logger('test-replicator')
self.df_mgr = diskfile.DiskFileManager(self.conf,
self.replicator.logger)
def tearDown(self): def tearDown(self):
rmtree(self.testdir, ignore_errors=1) rmtree(self.testdir, ignore_errors=1)
def _write_disk_data(self, disk_name):
os.mkdir(os.path.join(self.devices, disk_name))
objects = os.path.join(self.devices, disk_name,
diskfile.get_data_dir(POLICIES[0]))
objects_1 = os.path.join(self.devices, disk_name,
diskfile.get_data_dir(POLICIES[1]))
os.mkdir(objects)
os.mkdir(objects_1)
parts = {}
parts_1 = {}
for part in ['0', '1', '2', '3']:
parts[part] = os.path.join(objects, part)
os.mkdir(parts[part])
parts_1[part] = os.path.join(objects_1, part)
os.mkdir(parts_1[part])
return objects, objects_1, parts, parts_1
def _create_replicator(self):
self.replicator = object_replicator.ObjectReplicator(self.conf)
self.replicator.logger = self.logger
self.df_mgr = diskfile.DiskFileManager(self.conf, self.logger)
def test_run_once(self): def test_run_once(self):
conf = dict(swift_dir=self.testdir, devices=self.devices, conf = dict(swift_dir=self.testdir, devices=self.devices,
bind_ip=_ips()[0],
mount_check='false', timeout='300', stats_interval='1') mount_check='false', timeout='300', stats_interval='1')
replicator = object_replicator.ObjectReplicator(conf) replicator = object_replicator.ObjectReplicator(conf)
was_connector = object_replicator.http_connect was_connector = object_replicator.http_connect
@ -260,7 +271,9 @@ class TestObjectReplicator(unittest.TestCase):
process_arg_checker.append( process_arg_checker.append(
(0, '', ['rsync', whole_path_from, rsync_mods])) (0, '', ['rsync', whole_path_from, rsync_mods]))
with _mock_process(process_arg_checker): with _mock_process(process_arg_checker):
replicator.run_once() with mock.patch('swift.obj.replicator.whataremyips',
side_effect=_ips):
replicator.run_once()
self.assertFalse(process_errors) self.assertFalse(process_errors)
object_replicator.http_connect = was_connector object_replicator.http_connect = was_connector
@ -321,17 +334,306 @@ class TestObjectReplicator(unittest.TestCase):
[node['id'] for node in jobs_by_pol_part['12']['nodes']], [2, 3]) [node['id'] for node in jobs_by_pol_part['12']['nodes']], [2, 3])
self.assertEquals( self.assertEquals(
[node['id'] for node in jobs_by_pol_part['13']['nodes']], [3, 1]) [node['id'] for node in jobs_by_pol_part['13']['nodes']], [3, 1])
for part in ['00', '01', '02', '03', ]: for part in ['00', '01', '02', '03']:
for node in jobs_by_pol_part[part]['nodes']: for node in jobs_by_pol_part[part]['nodes']:
self.assertEquals(node['device'], 'sda') self.assertEquals(node['device'], 'sda')
self.assertEquals(jobs_by_pol_part[part]['path'], self.assertEquals(jobs_by_pol_part[part]['path'],
os.path.join(self.objects, part[1:])) os.path.join(self.objects, part[1:]))
for part in ['10', '11', '12', '13', ]: for part in ['10', '11', '12', '13']:
for node in jobs_by_pol_part[part]['nodes']: for node in jobs_by_pol_part[part]['nodes']:
self.assertEquals(node['device'], 'sda') self.assertEquals(node['device'], 'sda')
self.assertEquals(jobs_by_pol_part[part]['path'], self.assertEquals(jobs_by_pol_part[part]['path'],
os.path.join(self.objects_1, part[1:])) os.path.join(self.objects_1, part[1:]))
@mock.patch('swift.obj.replicator.random.shuffle', side_effect=lambda l: l)
def test_collect_jobs_multi_disk(self, mock_shuffle):
devs = [
# Two disks on same IP/port
{'id': 0, 'device': 'sda', 'zone': 0,
'region': 1, 'ip': '1.1.1.1', 'port': 1111,
'replication_ip': '127.0.0.0', 'replication_port': 6000},
{'id': 1, 'device': 'sdb', 'zone': 1,
'region': 1, 'ip': '1.1.1.1', 'port': 1111,
'replication_ip': '127.0.0.0', 'replication_port': 6000},
# Two disks on same server, different ports
{'id': 2, 'device': 'sdc', 'zone': 2,
'region': 2, 'ip': '1.1.1.2', 'port': 1112,
'replication_ip': '127.0.0.1', 'replication_port': 6000},
{'id': 3, 'device': 'sdd', 'zone': 4,
'region': 2, 'ip': '1.1.1.2', 'port': 1112,
'replication_ip': '127.0.0.1', 'replication_port': 6001},
]
objects_sdb, objects_1_sdb, _, _ = self._write_disk_data('sdb')
objects_sdc, objects_1_sdc, _, _ = self._write_disk_data('sdc')
objects_sdd, objects_1_sdd, _, _ = self._write_disk_data('sdd')
_create_test_rings(self.testdir, devs)
jobs = self.replicator.collect_jobs()
self.assertEqual([mock.call(jobs)], mock_shuffle.mock_calls)
jobs_to_delete = [j for j in jobs if j['delete']]
self.assertEquals(len(jobs_to_delete), 4)
self.assertEqual([
'1', '2', # policy 0; 1 not on sda, 2 not on sdb
'1', '2', # policy 1; 1 not on sda, 2 not on sdb
], [j['partition'] for j in jobs_to_delete])
jobs_by_pol_part_dev = {}
for job in jobs:
# There should be no jobs with a device not in just sda & sdb
self.assertTrue(job['device'] in ('sda', 'sdb'))
jobs_by_pol_part_dev[
str(int(job['policy'])) + job['partition'] + job['device']
] = job
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['00sda']['nodes']],
[1, 2])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['00sdb']['nodes']],
[0, 2])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['01sda']['nodes']],
[1, 2, 3])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['01sdb']['nodes']],
[2, 3])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['02sda']['nodes']],
[2, 3])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['02sdb']['nodes']],
[2, 3, 0])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['03sda']['nodes']],
[3, 1])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['03sdb']['nodes']],
[3, 0])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['10sda']['nodes']],
[1, 2])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['10sdb']['nodes']],
[0, 2])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['11sda']['nodes']],
[1, 2, 3])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['11sdb']['nodes']],
[2, 3])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['12sda']['nodes']],
[2, 3])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['12sdb']['nodes']],
[2, 3, 0])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['13sda']['nodes']],
[3, 1])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['13sdb']['nodes']],
[3, 0])
for part in ['00', '01', '02', '03']:
self.assertEquals(jobs_by_pol_part_dev[part + 'sda']['path'],
os.path.join(self.objects, part[1:]))
self.assertEquals(jobs_by_pol_part_dev[part + 'sdb']['path'],
os.path.join(objects_sdb, part[1:]))
for part in ['10', '11', '12', '13']:
self.assertEquals(jobs_by_pol_part_dev[part + 'sda']['path'],
os.path.join(self.objects_1, part[1:]))
self.assertEquals(jobs_by_pol_part_dev[part + 'sdb']['path'],
os.path.join(objects_1_sdb, part[1:]))
@mock.patch('swift.obj.replicator.random.shuffle', side_effect=lambda l: l)
def test_collect_jobs_multi_disk_diff_ports_normal(self, mock_shuffle):
# Normally (servers_per_port=0), replication_ip AND replication_port
# are used to determine local ring device entries. Here we show that
# with bind_ip='127.0.0.1', bind_port=6000, only "sdc" is local.
devs = [
# Two disks on same IP/port
{'id': 0, 'device': 'sda', 'zone': 0,
'region': 1, 'ip': '1.1.1.1', 'port': 1111,
'replication_ip': '127.0.0.0', 'replication_port': 6000},
{'id': 1, 'device': 'sdb', 'zone': 1,
'region': 1, 'ip': '1.1.1.1', 'port': 1111,
'replication_ip': '127.0.0.0', 'replication_port': 6000},
# Two disks on same server, different ports
{'id': 2, 'device': 'sdc', 'zone': 2,
'region': 2, 'ip': '1.1.1.2', 'port': 1112,
'replication_ip': '127.0.0.1', 'replication_port': 6000},
{'id': 3, 'device': 'sdd', 'zone': 4,
'region': 2, 'ip': '1.1.1.2', 'port': 1112,
'replication_ip': '127.0.0.1', 'replication_port': 6001},
]
objects_sdb, objects_1_sdb, _, _ = self._write_disk_data('sdb')
objects_sdc, objects_1_sdc, _, _ = self._write_disk_data('sdc')
objects_sdd, objects_1_sdd, _, _ = self._write_disk_data('sdd')
_create_test_rings(self.testdir, devs)
self.conf['bind_ip'] = '127.0.0.1'
self._create_replicator()
jobs = self.replicator.collect_jobs()
self.assertEqual([mock.call(jobs)], mock_shuffle.mock_calls)
jobs_to_delete = [j for j in jobs if j['delete']]
self.assertEquals(len(jobs_to_delete), 2)
self.assertEqual([
'3', # policy 0; 3 not on sdc
'3', # policy 1; 3 not on sdc
], [j['partition'] for j in jobs_to_delete])
jobs_by_pol_part_dev = {}
for job in jobs:
# There should be no jobs with a device not sdc
self.assertEqual(job['device'], 'sdc')
jobs_by_pol_part_dev[
str(int(job['policy'])) + job['partition'] + job['device']
] = job
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['00sdc']['nodes']],
[0, 1])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['01sdc']['nodes']],
[1, 3])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['02sdc']['nodes']],
[3, 0])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['03sdc']['nodes']],
[3, 0, 1])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['10sdc']['nodes']],
[0, 1])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['11sdc']['nodes']],
[1, 3])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['12sdc']['nodes']],
[3, 0])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['13sdc']['nodes']],
[3, 0, 1])
for part in ['00', '01', '02', '03']:
self.assertEquals(jobs_by_pol_part_dev[part + 'sdc']['path'],
os.path.join(objects_sdc, part[1:]))
for part in ['10', '11', '12', '13']:
self.assertEquals(jobs_by_pol_part_dev[part + 'sdc']['path'],
os.path.join(objects_1_sdc, part[1:]))
@mock.patch('swift.obj.replicator.random.shuffle', side_effect=lambda l: l)
def test_collect_jobs_multi_disk_servers_per_port(self, mock_shuffle):
# Normally (servers_per_port=0), replication_ip AND replication_port
# are used to determine local ring device entries. Here we show that
# with servers_per_port > 0 and bind_ip='127.0.0.1', bind_port=6000,
# then both "sdc" and "sdd" are local.
devs = [
# Two disks on same IP/port
{'id': 0, 'device': 'sda', 'zone': 0,
'region': 1, 'ip': '1.1.1.1', 'port': 1111,
'replication_ip': '127.0.0.0', 'replication_port': 6000},
{'id': 1, 'device': 'sdb', 'zone': 1,
'region': 1, 'ip': '1.1.1.1', 'port': 1111,
'replication_ip': '127.0.0.0', 'replication_port': 6000},
# Two disks on same server, different ports
{'id': 2, 'device': 'sdc', 'zone': 2,
'region': 2, 'ip': '1.1.1.2', 'port': 1112,
'replication_ip': '127.0.0.1', 'replication_port': 6000},
{'id': 3, 'device': 'sdd', 'zone': 4,
'region': 2, 'ip': '1.1.1.2', 'port': 1112,
'replication_ip': '127.0.0.1', 'replication_port': 6001},
]
objects_sdb, objects_1_sdb, _, _ = self._write_disk_data('sdb')
objects_sdc, objects_1_sdc, _, _ = self._write_disk_data('sdc')
objects_sdd, objects_1_sdd, _, _ = self._write_disk_data('sdd')
_create_test_rings(self.testdir, devs)
self.conf['bind_ip'] = '127.0.0.1'
self.conf['servers_per_port'] = 1 # diff port ok
self._create_replicator()
jobs = self.replicator.collect_jobs()
self.assertEqual([mock.call(jobs)], mock_shuffle.mock_calls)
jobs_to_delete = [j for j in jobs if j['delete']]
self.assertEquals(len(jobs_to_delete), 4)
self.assertEqual([
'3', '0', # policy 0; 3 not on sdc, 0 not on sdd
'3', '0', # policy 1; 3 not on sdc, 0 not on sdd
], [j['partition'] for j in jobs_to_delete])
jobs_by_pol_part_dev = {}
for job in jobs:
# There should be no jobs with a device not in just sdc & sdd
self.assertTrue(job['device'] in ('sdc', 'sdd'))
jobs_by_pol_part_dev[
str(int(job['policy'])) + job['partition'] + job['device']
] = job
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['00sdc']['nodes']],
[0, 1])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['00sdd']['nodes']],
[0, 1, 2])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['01sdc']['nodes']],
[1, 3])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['01sdd']['nodes']],
[1, 2])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['02sdc']['nodes']],
[3, 0])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['02sdd']['nodes']],
[2, 0])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['03sdc']['nodes']],
[3, 0, 1])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['03sdd']['nodes']],
[0, 1])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['10sdc']['nodes']],
[0, 1])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['10sdd']['nodes']],
[0, 1, 2])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['11sdc']['nodes']],
[1, 3])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['11sdd']['nodes']],
[1, 2])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['12sdc']['nodes']],
[3, 0])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['12sdd']['nodes']],
[2, 0])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['13sdc']['nodes']],
[3, 0, 1])
self.assertEquals([node['id']
for node in jobs_by_pol_part_dev['13sdd']['nodes']],
[0, 1])
for part in ['00', '01', '02', '03']:
self.assertEquals(jobs_by_pol_part_dev[part + 'sdc']['path'],
os.path.join(objects_sdc, part[1:]))
self.assertEquals(jobs_by_pol_part_dev[part + 'sdd']['path'],
os.path.join(objects_sdd, part[1:]))
for part in ['10', '11', '12', '13']:
self.assertEquals(jobs_by_pol_part_dev[part + 'sdc']['path'],
os.path.join(objects_1_sdc, part[1:]))
self.assertEquals(jobs_by_pol_part_dev[part + 'sdd']['path'],
os.path.join(objects_1_sdd, part[1:]))
def test_collect_jobs_handoffs_first(self): def test_collect_jobs_handoffs_first(self):
self.replicator.handoffs_first = True self.replicator.handoffs_first = True
jobs = self.replicator.collect_jobs() jobs = self.replicator.collect_jobs()
@ -929,6 +1231,7 @@ class TestObjectReplicator(unittest.TestCase):
def test_run_once_recover_from_failure(self): def test_run_once_recover_from_failure(self):
conf = dict(swift_dir=self.testdir, devices=self.devices, conf = dict(swift_dir=self.testdir, devices=self.devices,
bind_ip=_ips()[0],
mount_check='false', timeout='300', stats_interval='1') mount_check='false', timeout='300', stats_interval='1')
replicator = object_replicator.ObjectReplicator(conf) replicator = object_replicator.ObjectReplicator(conf)
was_connector = object_replicator.http_connect was_connector = object_replicator.http_connect
@ -975,6 +1278,7 @@ class TestObjectReplicator(unittest.TestCase):
def test_run_once_recover_from_timeout(self): def test_run_once_recover_from_timeout(self):
conf = dict(swift_dir=self.testdir, devices=self.devices, conf = dict(swift_dir=self.testdir, devices=self.devices,
bind_ips=_ips()[0],
mount_check='false', timeout='300', stats_interval='1') mount_check='false', timeout='300', stats_interval='1')
replicator = object_replicator.ObjectReplicator(conf) replicator = object_replicator.ObjectReplicator(conf)
was_connector = object_replicator.http_connect was_connector = object_replicator.http_connect