diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst
index 552dfea314..a137d786b0 100644
--- a/doc/source/deployment_guide.rst
+++ b/doc/source/deployment_guide.rst
@@ -139,6 +139,72 @@ swift-ring-builder with no options will display help text with available
 commands and options. More information on how the ring works internally
 can be found in the :doc:`Ring Overview `.
 
+.. _server-per-port-configuration:
+
+-------------------------------
+Running object-servers Per Disk
+-------------------------------
+
+The lack of true asynchronous file I/O on Linux leaves the object-server
+workers vulnerable to misbehaving disks. Because any object-server worker can
+service a request for any disk, and a slow I/O request blocks the eventlet hub,
+a single slow disk can impair an entire storage node. This also prevents
+object servers from fully utilizing all their disks during heavy load.
+
+The :ref:`threads_per_disk ` option was one way to
+address this, but it came with a severe performance overhead that outweighed
+the benefit of I/O isolation. Any cluster using threads_per_disk should
+switch to `servers_per_port`.
+
+Another way to get full I/O isolation is to give each disk on a storage node a
+different port in the storage policy rings. Then set the
+:ref:`servers_per_port <object-server-default-options>`
+option in the object-server config. NOTE: while the purpose of this config
+setting is to run one or more object-server worker processes per *disk*, the
+implementation just runs object-servers per unique port of local devices in the
+rings. The deployer must combine this option with appropriately configured
+rings to benefit from this feature.
+
+Here's an example (abbreviated) old-style ring (a 2-node cluster with 2 disks
+each)::
+
+  Devices:    id  region  zone   ip address  port  replication ip  replication port  name
+               0       1     1      1.1.0.1  6000         1.1.0.1              6000    d1
+               1       1     1      1.1.0.1  6000         1.1.0.1              6000    d2
+               2       1     2      1.1.0.2  6000         1.1.0.2              6000    d3
+               3       1     2      1.1.0.2  6000         1.1.0.2              6000    d4
+
+And here's the same ring set up for `servers_per_port`::
+
+  Devices:    id  region  zone   ip address  port  replication ip  replication port  name
+               0       1     1      1.1.0.1  6000         1.1.0.1              6000    d1
+               1       1     1      1.1.0.1  6001         1.1.0.1              6001    d2
+               2       1     2      1.1.0.2  6000         1.1.0.2              6000    d3
+               3       1     2      1.1.0.2  6001         1.1.0.2              6001    d4
+
+When migrating from normal to `servers_per_port`, perform these steps in order:
+
+ #. Upgrade Swift code to a version capable of doing `servers_per_port`.
+
+ #. Enable `servers_per_port` with a value greater than zero.
+
+ #. Restart `swift-object-server` processes with a SIGHUP. At this point, you
+    will have the `servers_per_port` number of `swift-object-server` processes
+    serving all requests for all disks on each node. This preserves
+    availability, but you should perform the next step as quickly as possible.
+
+ #. Push out new rings that actually have different ports per disk on each
+    server. One of the ports in the new ring should be the same as the port
+    used in the old ring ("6000" in the example above). This will cover
+    existing proxy-server processes that haven't loaded the new ring yet. They
+    can still talk to any storage node regardless of whether or not that
+    storage node has loaded the ring and started object-server processes on
+    the new ports.
+
+If you do not run a separate object-server for replication, then this setting
+must be available to the object-replicator and object-reconstructor (i.e.,
+appear in the [DEFAULT] config section).
+
 .. 
_general-service-configuration: ----------------------------- @@ -149,14 +215,14 @@ Most Swift services fall into two categories. Swift's wsgi servers and background daemons. For more information specific to the configuration of Swift's wsgi servers -with paste deploy see :ref:`general-server-configuration` +with paste deploy see :ref:`general-server-configuration`. Configuration for servers and daemons can be expressed together in the same file for each type of server, or separately. If a required section for the service trying to start is missing there will be an error. The sections not used by the service are ignored. -Consider the example of an object storage node. By convention configuration +Consider the example of an object storage node. By convention, configuration for the object-server, object-updater, object-replicator, and object-auditor exist in a single file ``/etc/swift/object-server.conf``:: @@ -323,7 +389,7 @@ max_header_size 8192 max_header_size is the max number of bytes in tokens including more than 7 catalog entries. See also include_service_catalog in proxy-server.conf-sample (documented in - overview_auth.rst) + overview_auth.rst). =================== ========== ============================================= --------------------------- @@ -335,6 +401,8 @@ etc/object-server.conf-sample in the source code repository. The following configuration options are available: +.. _object-server-default-options: + [DEFAULT] =================== ========== ============================================= @@ -353,12 +421,30 @@ workers auto Override the number of pre-forked workers should be an integer, zero means no fork. If unset, it will try to default to the number of effective cpu cores and fallback to one. - Increasing the number of workers may reduce - the possibility of slow file system - operations in one request from negatively - impacting other requests, but may not be as - efficient as tuning :ref:`threads_per_disk - ` + Increasing the number of workers helps slow + filesystem operations in one request from + negatively impacting other requests, but only + the :ref:`servers_per_port + ` + option provides complete I/O isolation with + no measurable overhead. +servers_per_port 0 If each disk in each storage policy ring has + unique port numbers for its "ip" value, you + can use this setting to have each + object-server worker only service requests + for the single disk matching the port in the + ring. The value of this setting determines + how many worker processes run for each port + (disk) in the ring. If you have 24 disks + per server, and this setting is 4, then + each storage node will have 1 + (24 * 4) = + 97 total object-server processes running. + This gives complete I/O isolation, drastically + reducing the impact of slow disks on storage + node performance. The object-replicator and + object-reconstructor need to see this setting + too, so it must be in the [DEFAULT] section. + See :ref:`server-per-port-configuration`. max_clients 1024 Maximum number of clients one worker can process simultaneously (it will actually accept(2) N + 1). Setting this to one (1) @@ -421,13 +507,12 @@ keep_cache_private false Allow non-public objects to stay threads_per_disk 0 Size of the per-disk thread pool used for performing disk I/O. The default of 0 means to not use a - per-disk thread pool. It is - recommended to keep this value - small, as large values can result - in high read latencies due to - large queue depths. A good - starting point is 4 threads per - disk. 
+ per-disk thread pool. + This option is no longer + recommended and the + :ref:`servers_per_port + ` + should be used instead. replication_concurrency 4 Set to restrict the number of concurrent incoming REPLICATION requests; set to 0 for unlimited @@ -562,7 +647,7 @@ workers auto Override the number of pre-forked workers the possibility of slow file system operations in one request from negatively impacting other requests. See - :ref:`general-service-tuning` + :ref:`general-service-tuning`. max_clients 1024 Maximum number of clients one worker can process simultaneously (it will actually accept(2) N + 1). Setting this to one (1) @@ -690,7 +775,7 @@ workers auto Override the number of pre-forked workers the possibility of slow file system operations in one request from negatively impacting other requests. See - :ref:`general-service-tuning` + :ref:`general-service-tuning`. max_clients 1024 Maximum number of clients one worker can process simultaneously (it will actually accept(2) N + 1). Setting this to one (1) @@ -813,7 +898,7 @@ workers auto Override the number of will try to default to the number of effective cpu cores and fallback to one. See - :ref:`general-service-tuning` + :ref:`general-service-tuning`. max_clients 1024 Maximum number of clients one worker can process simultaneously (it will diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index 4fafa7c18b..b36ec29aa6 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -12,9 +12,16 @@ bind_port = 6000 # expiring_objects_account_name = expiring_objects # # Use an integer to override the number of pre-forked processes that will -# accept connections. +# accept connections. NOTE: if servers_per_port is set, this setting is +# ignored. # workers = auto # +# Make object-server run this many worker processes per unique port of +# "local" ring devices across all storage policies. This can help provide +# the isolation of threads_per_disk without the severe overhead. The default +# value of 0 disables this feature. 
+# servers_per_port = 0 +# # Maximum concurrent requests per worker # max_clients = 1024 # diff --git a/swift/account/reaper.py b/swift/account/reaper.py index 9eaee561ec..c121bf0ea5 100644 --- a/swift/account/reaper.py +++ b/swift/account/reaper.py @@ -69,7 +69,7 @@ class AccountReaper(Daemon): self.object_ring = None self.node_timeout = int(conf.get('node_timeout', 10)) self.conn_timeout = float(conf.get('conn_timeout', 0.5)) - self.myips = whataremyips() + self.myips = whataremyips(conf.get('bind_ip', '0.0.0.0')) self.concurrency = int(conf.get('concurrency', 25)) self.container_concurrency = self.object_concurrency = \ sqrt(self.concurrency) diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py index aa91faaf33..151a070c07 100644 --- a/swift/common/db_replicator.py +++ b/swift/common/db_replicator.py @@ -154,6 +154,7 @@ class Replicator(Daemon): self.logger = logger or get_logger(conf, log_route='replicator') self.root = conf.get('devices', '/srv/node') self.mount_check = config_true_value(conf.get('mount_check', 'true')) + self.bind_ip = conf.get('bind_ip', '0.0.0.0') self.port = int(conf.get('bind_port', self.default_port)) concurrency = int(conf.get('concurrency', 8)) self.cpool = GreenPool(size=concurrency) @@ -580,7 +581,7 @@ class Replicator(Daemon): """Run a replication pass once.""" self._zero_stats() dirs = [] - ips = whataremyips() + ips = whataremyips(self.bind_ip) if not ips: self.logger.error(_('ERROR Failed to get my own IPs?')) return diff --git a/swift/common/ring/ring.py b/swift/common/ring/ring.py index d4feaa8e23..461ccae640 100644 --- a/swift/common/ring/ring.py +++ b/swift/common/ring/ring.py @@ -44,10 +44,29 @@ class RingData(object): dev.setdefault("region", 1) @classmethod - def deserialize_v1(cls, gz_file): + def deserialize_v1(cls, gz_file, metadata_only=False): + """ + Deserialize a v1 ring file into a dictionary with `devs`, `part_shift`, + and `replica2part2dev_id` keys. + + If the optional kwarg `metadata_only` is True, then the + `replica2part2dev_id` is not loaded and that key in the returned + dictionary just has the value `[]`. + + :param file gz_file: An opened file-like object which has already + consumed the 6 bytes of magic and version. + :param bool metadata_only: If True, only load `devs` and `part_shift` + :returns: A dict containing `devs`, `part_shift`, and + `replica2part2dev_id` + """ + json_len, = struct.unpack('!I', gz_file.read(4)) ring_dict = json.loads(gz_file.read(json_len)) ring_dict['replica2part2dev_id'] = [] + + if metadata_only: + return ring_dict + partition_count = 1 << (32 - ring_dict['part_shift']) for x in xrange(ring_dict['replica_count']): ring_dict['replica2part2dev_id'].append( @@ -55,11 +74,12 @@ class RingData(object): return ring_dict @classmethod - def load(cls, filename): + def load(cls, filename, metadata_only=False): """ Load ring data from a file. :param filename: Path to a file serialized by the save() method. + :param bool metadata_only: If True, only load `devs` and `part_shift`. :returns: A RingData instance containing the loaded data. 
""" gz_file = GzipFile(filename, 'rb') @@ -70,15 +90,18 @@ class RingData(object): # See if the file is in the new format magic = gz_file.read(4) if magic == 'R1NG': - version, = struct.unpack('!H', gz_file.read(2)) - if version == 1: - ring_data = cls.deserialize_v1(gz_file) + format_version, = struct.unpack('!H', gz_file.read(2)) + if format_version == 1: + ring_data = cls.deserialize_v1( + gz_file, metadata_only=metadata_only) else: - raise Exception('Unknown ring format version %d' % version) + raise Exception('Unknown ring format version %d' % + format_version) else: # Assume old-style pickled ring gz_file.seek(0) ring_data = pickle.load(gz_file) + if not hasattr(ring_data, 'devs'): ring_data = RingData(ring_data['replica2part2dev_id'], ring_data['devs'], ring_data['part_shift']) diff --git a/swift/common/ring/utils.py b/swift/common/ring/utils.py index 4fcee2eb24..7d7856ebfc 100644 --- a/swift/common/ring/utils.py +++ b/swift/common/ring/utils.py @@ -235,9 +235,14 @@ def is_local_device(my_ips, my_port, dev_ip, dev_port): Return True if the provided dev_ip and dev_port are among the IP addresses specified in my_ips and my_port respectively. + To support accurate locality determination in the server-per-port + deployment, when my_port is None, only IP addresses are used for + determining locality (dev_port is ignored). + If dev_ip is a hostname then it is first translated to an IP address before checking it against my_ips. """ + candidate_ips = [] if not is_valid_ip(dev_ip) and is_valid_hostname(dev_ip): try: # get the ip for this host; use getaddrinfo so that @@ -248,12 +253,19 @@ def is_local_device(my_ips, my_port, dev_ip, dev_port): dev_ip = addr[4][0] # get the ip-address if family == socket.AF_INET6: dev_ip = expand_ipv6(dev_ip) - if dev_ip in my_ips and dev_port == my_port: - return True - return False + candidate_ips.append(dev_ip) except socket.gaierror: return False - return dev_ip in my_ips and dev_port == my_port + else: + if is_valid_ipv6(dev_ip): + dev_ip = expand_ipv6(dev_ip) + candidate_ips = [dev_ip] + + for dev_ip in candidate_ips: + if dev_ip in my_ips and (my_port is None or dev_port == my_port): + return True + + return False def parse_search_value(search_value): diff --git a/swift/common/storage_policy.py b/swift/common/storage_policy.py index e45ab018c5..fcda344b56 100644 --- a/swift/common/storage_policy.py +++ b/swift/common/storage_policy.py @@ -12,11 +12,13 @@ # limitations under the License. from ConfigParser import ConfigParser -import textwrap +import os import string +import textwrap -from swift.common.utils import config_true_value, SWIFT_CONF_FILE -from swift.common.ring import Ring +from swift.common.utils import ( + config_true_value, SWIFT_CONF_FILE, whataremyips) +from swift.common.ring import Ring, RingData from swift.common.utils import quorum_size from swift.common.exceptions import RingValidationError from pyeclib.ec_iface import ECDriver, ECDriverError, VALID_EC_TYPES @@ -30,6 +32,53 @@ EC_POLICY = 'erasure_coding' DEFAULT_EC_OBJECT_SEGMENT_SIZE = 1048576 +class BindPortsCache(object): + def __init__(self, swift_dir, bind_ip): + self.swift_dir = swift_dir + self.mtimes_by_ring_path = {} + self.portsets_by_ring_path = {} + self.my_ips = set(whataremyips(bind_ip)) + + def all_bind_ports_for_node(self): + """ + Given an iterable of IP addresses identifying a storage backend server, + return a set of all bind ports defined in all rings for this storage + backend server. 
+ + The caller is responsible for not calling this method (which performs + at least a stat on all ring files) too frequently. + """ + # NOTE: we don't worry about disappearing rings here because you can't + # ever delete a storage policy. + + for policy in POLICIES: + # NOTE: we must NOT use policy.load_ring to load the ring. Users + # of this utility function will not need the actual ring data, just + # the bind ports. + # + # This is duplicated with Ring.__init__ just a bit... + serialized_path = os.path.join(self.swift_dir, + policy.ring_name + '.ring.gz') + try: + new_mtime = os.path.getmtime(serialized_path) + except OSError: + continue + old_mtime = self.mtimes_by_ring_path.get(serialized_path) + if not old_mtime or old_mtime != new_mtime: + self.portsets_by_ring_path[serialized_path] = set( + dev['port'] + for dev in RingData.load(serialized_path, + metadata_only=True).devs + if dev and dev['ip'] in self.my_ips) + self.mtimes_by_ring_path[serialized_path] = new_mtime + # No "break" here so that the above line will update the + # mtimes_by_ring_path entry for any ring that changes, not just + # the first one we notice. + + # Return the requested set of ports from our (now-freshened) cache + return reduce(set.union, self.portsets_by_ring_path.values(), set()) + + class PolicyError(ValueError): def __init__(self, msg, index=None): @@ -291,7 +340,7 @@ class ECStoragePolicy(BaseStoragePolicy): if ec_type not in VALID_EC_TYPES: raise PolicyError('Wrong ec_type %s for policy %s, should be one' ' of "%s"' % (ec_type, self.name, - ', '.join(VALID_EC_TYPES))) + ', '.join(VALID_EC_TYPES))) self._ec_type = ec_type # Define _ec_ndata as the number of EC data fragments @@ -427,8 +476,9 @@ class ECStoragePolicy(BaseStoragePolicy): if nodes_configured != (self.ec_ndata + self.ec_nparity): raise RingValidationError( 'EC ring for policy %s needs to be configured with ' - 'exactly %d nodes. Got %d.' % (self.name, - self.ec_ndata + self.ec_nparity, nodes_configured)) + 'exactly %d nodes. Got %d.' % ( + self.name, self.ec_ndata + self.ec_nparity, + nodes_configured)) @property def quorum(self): diff --git a/swift/common/utils.py b/swift/common/utils.py index d470fb9970..63919af1ec 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -1589,7 +1589,7 @@ def get_hub(): return None -def drop_privileges(user): +def drop_privileges(user, call_setsid=True): """ Sets the userid/groupid of the current process, get session leader, etc. @@ -1602,10 +1602,11 @@ def drop_privileges(user): os.setgid(user[3]) os.setuid(user[2]) os.environ['HOME'] = user[5] - try: - os.setsid() - except OSError: - pass + if call_setsid: + try: + os.setsid() + except OSError: + pass os.chdir('/') # in case you need to rmdir on where you started the daemon os.umask(0o22) # ensure files are created with the correct privileges @@ -1706,12 +1707,28 @@ def expand_ipv6(address): return socket.inet_ntop(socket.AF_INET6, packed_ip) -def whataremyips(): +def whataremyips(bind_ip=None): """ - Get the machine's ip addresses + Get "our" IP addresses ("us" being the set of services configured by + one *.conf file). If our REST listens on a specific address, return it. + Otherwise, if listen on '0.0.0.0' or '::' return all addresses, including + the loopback. + :param str bind_ip: Optional bind_ip from a config file; may be IP address + or hostname. 
:returns: list of Strings of ip addresses """ + if bind_ip: + # See if bind_ip is '0.0.0.0'/'::' + try: + _, _, _, _, sockaddr = socket.getaddrinfo( + bind_ip, None, 0, socket.SOCK_STREAM, 0, + socket.AI_NUMERICHOST)[0] + if sockaddr[0] not in ('0.0.0.0', '::'): + return [bind_ip] + except socket.gaierror: + pass + addresses = [] for interface in netifaces.interfaces(): try: diff --git a/swift/common/wsgi.py b/swift/common/wsgi.py index b87fde4a02..d7a6102d62 100644 --- a/swift/common/wsgi.py +++ b/swift/common/wsgi.py @@ -29,12 +29,13 @@ from textwrap import dedent import eventlet import eventlet.debug -from eventlet import greenio, GreenPool, sleep, wsgi, listen +from eventlet import greenio, GreenPool, sleep, wsgi, listen, Timeout from paste.deploy import loadwsgi -from eventlet.green import socket, ssl +from eventlet.green import socket, ssl, os as green_os from urllib import unquote from swift.common import utils, constraints +from swift.common.storage_policy import BindPortsCache from swift.common.swob import Request from swift.common.utils import capture_stdio, disable_fallocate, \ drop_privileges, get_logger, NullLogger, config_true_value, \ @@ -437,10 +438,414 @@ def run_server(conf, logger, sock, global_conf=None): pool.waitall() -#TODO(clayg): pull more pieces of this to test more +class WorkersStrategy(object): + """ + WSGI server management strategy object for a single bind port and listen + socket shared by a configured number of forked-off workers. + + Used in :py:func:`run_wsgi`. + + :param dict conf: Server configuration dictionary. + :param logger: The server's :py:class:`~swift.common.utils.LogAdaptor` + object. + """ + + def __init__(self, conf, logger): + self.conf = conf + self.logger = logger + self.sock = None + self.children = [] + self.worker_count = config_auto_int_value(conf.get('workers'), + CPU_COUNT) + + def loop_timeout(self): + """ + :returns: None; to block in :py:func:`green.os.wait` + """ + + return None + + def bind_ports(self): + """ + Bind the one listen socket for this strategy and drop privileges + (since the parent process will never need to bind again). + """ + + try: + self.sock = get_socket(self.conf) + except ConfigFilePortError: + msg = 'bind_port wasn\'t properly set in the config file. ' \ + 'It must be explicitly set to a valid port number.' + return msg + drop_privileges(self.conf.get('user', 'swift')) + + def no_fork_sock(self): + """ + Return a server listen socket if the server should run in the + foreground (no fork). + """ + + # Useful for profiling [no forks]. + if self.worker_count == 0: + return self.sock + + def new_worker_socks(self): + """ + Yield a sequence of (socket, opqaue_data) tuples for each server which + should be forked-off and started. + + The opaque_data item for each socket will passed into the + :py:meth:`log_sock_exit` and :py:meth:`register_worker_start` methods + where it will be ignored. + """ + + while len(self.children) < self.worker_count: + yield self.sock, None + + def post_fork_hook(self): + """ + Perform any initialization in a forked-off child process prior to + starting the wsgi server. + """ + + pass + + def log_sock_exit(self, sock, _unused): + """ + Log a server's exit. + + :param socket sock: The listen socket for the worker just started. + :param _unused: The socket's opaque_data yielded by + :py:meth:`new_worker_socks`. + """ + + self.logger.notice('Child %d exiting normally' % os.getpid()) + + def register_worker_start(self, sock, _unused, pid): + """ + Called when a new worker is started. 
+ + :param socket sock: The listen socket for the worker just started. + :param _unused: The socket's opaque_data yielded by new_worker_socks(). + :param int pid: The new worker process' PID + """ + + self.logger.notice('Started child %s' % pid) + self.children.append(pid) + + def register_worker_exit(self, pid): + """ + Called when a worker has exited. + + :param int pid: The PID of the worker that exited. + """ + + self.logger.error('Removing dead child %s' % pid) + self.children.remove(pid) + + def shutdown_sockets(self): + """ + Shutdown any listen sockets. + """ + + greenio.shutdown_safe(self.sock) + self.sock.close() + + +class PortPidState(object): + """ + A helper class for :py:class:`ServersPerPortStrategy` to track listen + sockets and PIDs for each port. + + :param int servers_per_port: The configured number of servers per port. + :param logger: The server's :py:class:`~swift.common.utils.LogAdaptor` + """ + + def __init__(self, servers_per_port, logger): + self.servers_per_port = servers_per_port + self.logger = logger + self.sock_data_by_port = {} + + def sock_for_port(self, port): + """ + :param int port: The port whose socket is desired. + :returns: The bound listen socket for the given port. + """ + + return self.sock_data_by_port[port]['sock'] + + def port_for_sock(self, sock): + """ + :param socket sock: A tracked bound listen socket + :returns: The port the socket is bound to. + """ + + for port, sock_data in self.sock_data_by_port.iteritems(): + if sock_data['sock'] == sock: + return port + + def _pid_to_port_and_index(self, pid): + for port, sock_data in self.sock_data_by_port.iteritems(): + for server_idx, a_pid in enumerate(sock_data['pids']): + if pid == a_pid: + return port, server_idx + + def port_index_pairs(self): + """ + :returns: A set of (port, server_idx) tuples for currently-tracked + ports, sockets, and PIDs. + """ + + current_port_index_pairs = set() + for port, pid_state in self.sock_data_by_port.iteritems(): + current_port_index_pairs |= set( + (port, i) + for i, pid in enumerate(pid_state['pids']) + if pid is not None) + return current_port_index_pairs + + def track_port(self, port, sock): + """ + Start tracking servers for the given port and listen socket. + + :param int port: The port to start tracking + :param socket sock: The bound listen socket for the port. + """ + + self.sock_data_by_port[port] = { + 'sock': sock, + 'pids': [None] * self.servers_per_port, + } + + def not_tracking(self, port): + """ + Return True if the specified port is not being tracked. + + :param int port: A port to check. + """ + + return port not in self.sock_data_by_port + + def all_socks(self): + """ + Yield all current listen sockets. + """ + + for orphan_data in self.sock_data_by_port.itervalues(): + yield orphan_data['sock'] + + def forget_port(self, port): + """ + Idempotently forget a port, closing the listen socket at most once. + """ + + orphan_data = self.sock_data_by_port.pop(port, None) + if orphan_data: + greenio.shutdown_safe(orphan_data['sock']) + orphan_data['sock'].close() + self.logger.notice('Closing unnecessary sock for port %d', port) + + def add_pid(self, port, index, pid): + self.sock_data_by_port[port]['pids'][index] = pid + + def forget_pid(self, pid): + """ + Idempotently forget a PID. It's okay if the PID is no longer in our + data structure (it could have been removed by the "orphan port" removal + in :py:meth:`new_worker_socks`). + + :param int pid: The PID which exited. 
+ """ + + port_server_idx = self._pid_to_port_and_index(pid) + if port_server_idx is None: + # This method can lose a race with the "orphan port" removal, when + # a ring reload no longer contains a port. So it's okay if we were + # unable to find a (port, server_idx) pair. + return + dead_port, server_idx = port_server_idx + self.logger.error('Removing dead child %d (PID: %s) for port %s', + server_idx, pid, dead_port) + self.sock_data_by_port[dead_port]['pids'][server_idx] = None + + +class ServersPerPortStrategy(object): + """ + WSGI server management strategy object for an object-server with one listen + port per unique local port in the storage policy rings. The + `servers_per_port` integer config setting determines how many workers are + run per port. + + Used in :py:func:`run_wsgi`. + + :param dict conf: Server configuration dictionary. + :param logger: The server's :py:class:`~swift.common.utils.LogAdaptor` + object. + :param int servers_per_port: The number of workers to run per port. + """ + + def __init__(self, conf, logger, servers_per_port): + self.conf = conf + self.logger = logger + self.servers_per_port = servers_per_port + self.swift_dir = conf.get('swift_dir', '/etc/swift') + self.ring_check_interval = int(conf.get('ring_check_interval', 15)) + self.port_pid_state = PortPidState(servers_per_port, logger) + + bind_ip = conf.get('bind_ip', '0.0.0.0') + self.cache = BindPortsCache(self.swift_dir, bind_ip) + + def _reload_bind_ports(self): + self.bind_ports = self.cache.all_bind_ports_for_node() + + def _bind_port(self, port): + new_conf = self.conf.copy() + new_conf['bind_port'] = port + sock = get_socket(new_conf) + self.port_pid_state.track_port(port, sock) + + def loop_timeout(self): + """ + :returns: The time to wait for a child to exit before checking for + reloaded rings (new ports). + """ + + return self.ring_check_interval + + def bind_ports(self): + """ + Bind one listen socket per unique local storage policy ring port. Then + do all the work of drop_privileges except the actual dropping of + privileges (each forked-off worker will do that post-fork in + :py:meth:`post_fork_hook`). + """ + + self._reload_bind_ports() + for port in self.bind_ports: + self._bind_port(port) + + # The workers strategy drops privileges here, which we obviously cannot + # do if we want to support binding to low ports. But we do want some + # of the actions that drop_privileges did. + try: + os.setsid() + except OSError: + pass + # In case you need to rmdir where you started the daemon: + os.chdir('/') + # Ensure files are created with the correct privileges: + os.umask(0o22) + + def no_fork_sock(self): + """ + This strategy does not support running in the foreground. + """ + + pass + + def new_worker_socks(self): + """ + Yield a sequence of (socket, server_idx) tuples for each server which + should be forked-off and started. + + Any sockets for "orphaned" ports no longer in any ring will be closed + (causing their associated workers to gracefully exit) after all new + sockets have been yielded. + + The server_idx item for each socket will passed into the + :py:meth:`log_sock_exit` and :py:meth:`register_worker_start` methods. 
+ """ + + self._reload_bind_ports() + desired_port_index_pairs = set( + (p, i) for p in self.bind_ports + for i in range(self.servers_per_port)) + + current_port_index_pairs = self.port_pid_state.port_index_pairs() + + if desired_port_index_pairs != current_port_index_pairs: + # Orphan ports are ports which had object-server processes running, + # but which no longer appear in the ring. We'll kill them after we + # start missing workers. + orphan_port_index_pairs = current_port_index_pairs - \ + desired_port_index_pairs + + # Fork off worker(s) for every port who's supposed to have + # worker(s) but doesn't + missing_port_index_pairs = desired_port_index_pairs - \ + current_port_index_pairs + for port, server_idx in sorted(missing_port_index_pairs): + if self.port_pid_state.not_tracking(port): + try: + self._bind_port(port) + except Exception as e: + self.logger.critical('Unable to bind to port %d: %s', + port, e) + continue + yield self.port_pid_state.sock_for_port(port), server_idx + + for orphan_pair in orphan_port_index_pairs: + # For any port in orphan_port_index_pairs, it is guaranteed + # that there should be no listen socket for that port, so we + # can close and forget them. + self.port_pid_state.forget_port(orphan_pair[0]) + + def post_fork_hook(self): + """ + Called in each child process, prior to starting the actual wsgi server, + to drop privileges. + """ + + drop_privileges(self.conf.get('user', 'swift'), call_setsid=False) + + def log_sock_exit(self, sock, server_idx): + """ + Log a server's exit. + """ + + port = self.port_pid_state.port_for_sock(sock) + self.logger.notice('Child %d (PID %d, port %d) exiting normally', + server_idx, os.getpid(), port) + + def register_worker_start(self, sock, server_idx, pid): + """ + Called when a new worker is started. + + :param socket sock: The listen socket for the worker just started. + :param server_idx: The socket's server_idx as yielded by + :py:meth:`new_worker_socks`. + :param int pid: The new worker process' PID + """ + port = self.port_pid_state.port_for_sock(sock) + self.logger.notice('Started child %d (PID %d) for port %d', + server_idx, pid, port) + self.port_pid_state.add_pid(port, server_idx, pid) + + def register_worker_exit(self, pid): + """ + Called when a worker has exited. + + :param int pid: The PID of the worker that exited. + """ + + self.port_pid_state.forget_pid(pid) + + def shutdown_sockets(self): + """ + Shutdown any listen sockets. + """ + + for sock in self.port_pid_state.all_socks(): + greenio.shutdown_safe(sock) + sock.close() + + def run_wsgi(conf_path, app_section, *args, **kwargs): """ - Runs the server using the specified number of workers. + Runs the server according to some strategy. The default strategy runs a + specified number of workers in pre-fork model. The object-server (only) + may use a servers-per-port strategy if its config has a servers_per_port + setting with a value greater than zero. :param conf_path: Path to paste.deploy style configuration file/directory :param app_section: App name from conf file to load config from @@ -454,17 +859,22 @@ def run_wsgi(conf_path, app_section, *args, **kwargs): print(e) return 1 - # bind to address and port - try: - sock = get_socket(conf) - except ConfigFilePortError: - msg = 'bind_port wasn\'t properly set in the config file. ' \ - 'It must be explicitly set to a valid port number.' 
- logger.error(msg) - print(msg) + servers_per_port = int(conf.get('servers_per_port', '0') or 0) + + # NOTE: for now servers_per_port is object-server-only; future work could + # be done to test and allow it to be used for account and container + # servers, but that has not been done yet. + if servers_per_port and app_section == 'object-server': + strategy = ServersPerPortStrategy( + conf, logger, servers_per_port=servers_per_port) + else: + strategy = WorkersStrategy(conf, logger) + + error_msg = strategy.bind_ports() + if error_msg: + logger.error(error_msg) + print(error_msg) return 1 - # remaining tasks should not require elevated privileges - drop_privileges(conf.get('user', 'swift')) # Ensure the configuration and application can be loaded before proceeding. global_conf = {'log_name': log_name} @@ -479,11 +889,9 @@ def run_wsgi(conf_path, app_section, *args, **kwargs): # redirect errors to logger and close stdio capture_stdio(logger) - worker_count = config_auto_int_value(conf.get('workers'), CPU_COUNT) - - # Useful for profiling [no forks]. - if worker_count == 0: - run_server(conf, logger, sock, global_conf=global_conf) + no_fork_sock = strategy.no_fork_sock() + if no_fork_sock: + run_server(conf, logger, no_fork_sock, global_conf=global_conf) return 0 def kill_children(*args): @@ -502,32 +910,42 @@ def run_wsgi(conf_path, app_section, *args, **kwargs): running = [True] signal.signal(signal.SIGTERM, kill_children) signal.signal(signal.SIGHUP, hup) - children = [] + while running[0]: - while len(children) < worker_count: + for sock, sock_info in strategy.new_worker_socks(): pid = os.fork() if pid == 0: signal.signal(signal.SIGHUP, signal.SIG_DFL) signal.signal(signal.SIGTERM, signal.SIG_DFL) + strategy.post_fork_hook() run_server(conf, logger, sock) - logger.notice('Child %d exiting normally' % os.getpid()) + strategy.log_sock_exit(sock, sock_info) return 0 else: - logger.notice('Started child %s' % pid) - children.append(pid) - try: - pid, status = os.wait() - if os.WIFEXITED(status) or os.WIFSIGNALED(status): - logger.error('Removing dead child %s' % pid) - children.remove(pid) - except OSError as err: - if err.errno not in (errno.EINTR, errno.ECHILD): - raise - except KeyboardInterrupt: - logger.notice('User quit') - break - greenio.shutdown_safe(sock) - sock.close() + strategy.register_worker_start(sock, sock_info, pid) + + # The strategy may need to pay attention to something in addition to + # child process exits (like new ports showing up in a ring). + # + # NOTE: a timeout value of None will just instantiate the Timeout + # object and not actually schedule it, which is equivalent to no + # timeout for the green_os.wait(). + loop_timeout = strategy.loop_timeout() + + with Timeout(loop_timeout, exception=False): + try: + pid, status = green_os.wait() + if os.WIFEXITED(status) or os.WIFSIGNALED(status): + strategy.register_worker_exit(pid) + except OSError as err: + if err.errno not in (errno.EINTR, errno.ECHILD): + raise + except KeyboardInterrupt: + logger.notice('User quit') + running[0] = False + break + + strategy.shutdown_sockets() logger.notice('Exited') return 0 diff --git a/swift/container/sync.py b/swift/container/sync.py index a409de4ac7..c6161883c4 100644 --- a/swift/container/sync.py +++ b/swift/container/sync.py @@ -204,7 +204,8 @@ class ContainerSync(Daemon): #: swift.common.ring.Ring for locating containers. 
self.container_ring = container_ring or Ring(self.swift_dir, ring_name='container') - self._myips = whataremyips() + bind_ip = conf.get('bind_ip', '0.0.0.0') + self._myips = whataremyips(bind_ip) self._myport = int(conf.get('bind_port', 6001)) swift.common.db.DB_PREALLOCATION = \ config_true_value(conf.get('db_preallocation', 'f')) diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index f9aa5f15d8..8f84b06c7a 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -119,7 +119,10 @@ class ObjectReconstructor(Daemon): self.devices_dir = conf.get('devices', '/srv/node') self.mount_check = config_true_value(conf.get('mount_check', 'true')) self.swift_dir = conf.get('swift_dir', '/etc/swift') - self.port = int(conf.get('bind_port', 6000)) + self.bind_ip = conf.get('bind_ip', '0.0.0.0') + self.servers_per_port = int(conf.get('servers_per_port', '0') or 0) + self.port = None if self.servers_per_port else \ + int(conf.get('bind_port', 6000)) self.concurrency = int(conf.get('concurrency', 1)) self.stats_interval = int(conf.get('stats_interval', '300')) self.ring_check_interval = int(conf.get('ring_check_interval', 15)) @@ -764,7 +767,7 @@ class ObjectReconstructor(Daemon): """ override_devices = override_devices or [] override_partitions = override_partitions or [] - ips = whataremyips() + ips = whataremyips(self.bind_ip) for policy in POLICIES: if policy.policy_type != EC_POLICY: continue @@ -776,6 +779,7 @@ class ObjectReconstructor(Daemon): ips, self.port, dev['replication_ip'], dev['replication_port']), policy.object_ring.devs) + for local_dev in local_devices: if override_devices and (local_dev['device'] not in override_devices): diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index d23624b382..de2ec8d85f 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -65,7 +65,10 @@ class ObjectReplicator(Daemon): self.mount_check = config_true_value(conf.get('mount_check', 'true')) self.vm_test_mode = config_true_value(conf.get('vm_test_mode', 'no')) self.swift_dir = conf.get('swift_dir', '/etc/swift') - self.port = int(conf.get('bind_port', 6000)) + self.bind_ip = conf.get('bind_ip', '0.0.0.0') + self.servers_per_port = int(conf.get('servers_per_port', '0') or 0) + self.port = None if self.servers_per_port else \ + int(conf.get('bind_port', 6000)) self.concurrency = int(conf.get('concurrency', 1)) self.stats_interval = int(conf.get('stats_interval', '300')) self.ring_check_interval = int(conf.get('ring_check_interval', 15)) @@ -539,7 +542,7 @@ class ObjectReplicator(Daemon): policies will be returned """ jobs = [] - ips = whataremyips() + ips = whataremyips(self.bind_ip) for policy in POLICIES: if policy.policy_type == REPL_POLICY: if (override_policies is not None and diff --git a/test/probe/common.py b/test/probe/common.py index ca1225f9fb..467598caec 100644 --- a/test/probe/common.py +++ b/test/probe/common.py @@ -39,8 +39,8 @@ for p in POLICIES: POLICIES_BY_TYPE[p.policy_type].append(p) -def get_server_number(port, port2server): - server_number = port2server[port] +def get_server_number(ipport, ipport2server): + server_number = ipport2server[ipport] server, number = server_number[:-1], server_number[-1:] try: number = int(number) @@ -50,19 +50,19 @@ def get_server_number(port, port2server): return server, number -def start_server(port, port2server, pids, check=True): - server, number = get_server_number(port, port2server) +def start_server(ipport, ipport2server, pids, check=True): + server, number = get_server_number(ipport, 
ipport2server) err = Manager([server]).start(number=number, wait=False) if err: raise Exception('unable to start %s' % ( server if not number else '%s%s' % (server, number))) if check: - return check_server(port, port2server, pids) + return check_server(ipport, ipport2server, pids) return None -def check_server(port, port2server, pids, timeout=CHECK_SERVER_TIMEOUT): - server = port2server[port] +def check_server(ipport, ipport2server, pids, timeout=CHECK_SERVER_TIMEOUT): + server = ipport2server[ipport] if server[:-1] in ('account', 'container', 'object'): if int(server[-1]) > 4: return None @@ -74,7 +74,7 @@ def check_server(port, port2server, pids, timeout=CHECK_SERVER_TIMEOUT): try_until = time() + timeout while True: try: - conn = HTTPConnection('127.0.0.1', port) + conn = HTTPConnection(*ipport) conn.request('GET', path) resp = conn.getresponse() # 404 because it's a nonsense path (and mount_check is false) @@ -87,14 +87,14 @@ def check_server(port, port2server, pids, timeout=CHECK_SERVER_TIMEOUT): if time() > try_until: print err print 'Giving up on %s:%s after %s seconds.' % ( - server, port, timeout) + server, ipport, timeout) raise err sleep(0.1) else: try_until = time() + timeout while True: try: - url, token = get_auth('http://127.0.0.1:8080/auth/v1.0', + url, token = get_auth('http://%s:%d/auth/v1.0' % ipport, 'test:tester', 'testing') account = url.split('/')[-1] head_account(url, token) @@ -108,8 +108,8 @@ def check_server(port, port2server, pids, timeout=CHECK_SERVER_TIMEOUT): return None -def kill_server(port, port2server, pids): - server, number = get_server_number(port, port2server) +def kill_server(ipport, ipport2server, pids): + server, number = get_server_number(ipport, ipport2server) err = Manager([server]).kill(number=number) if err: raise Exception('unable to kill %s' % (server if not number else @@ -117,47 +117,77 @@ def kill_server(port, port2server, pids): try_until = time() + 30 while True: try: - conn = HTTPConnection('127.0.0.1', port) + conn = HTTPConnection(*ipport) conn.request('GET', '/') conn.getresponse() except Exception as err: break if time() > try_until: raise Exception( - 'Still answering on port %s after 30 seconds' % port) + 'Still answering on %s:%s after 30 seconds' % ipport) sleep(0.1) -def kill_nonprimary_server(primary_nodes, port2server, pids): - primary_ports = [n['port'] for n in primary_nodes] - for port, server in port2server.iteritems(): - if port in primary_ports: +def kill_nonprimary_server(primary_nodes, ipport2server, pids): + primary_ipports = [(n['ip'], n['port']) for n in primary_nodes] + for ipport, server in ipport2server.iteritems(): + if ipport in primary_ipports: server_type = server[:-1] break else: raise Exception('Cannot figure out server type for %r' % primary_nodes) - for port, server in list(port2server.iteritems()): - if server[:-1] == server_type and port not in primary_ports: - kill_server(port, port2server, pids) - return port + for ipport, server in list(ipport2server.iteritems()): + if server[:-1] == server_type and ipport not in primary_ipports: + kill_server(ipport, ipport2server, pids) + return ipport -def build_port_to_conf(server): - # map server to config by port - port_to_config = {} - for server_ in Manager([server]): - for config_path in server_.conf_files(): - conf = readconf(config_path, - section_name='%s-replicator' % server_.type) - port_to_config[int(conf['bind_port'])] = conf - return port_to_config +def add_ring_devs_to_ipport2server(ring, server_type, ipport2server, + servers_per_port=0): + # 
We'll number the servers by order of unique occurrence of: + # IP, if servers_per_port > 0 OR there > 1 IP in ring + # ipport, otherwise + unique_ip_count = len(set(dev['ip'] for dev in ring.devs if dev)) + things_to_number = {} + number = 0 + for dev in filter(None, ring.devs): + ip = dev['ip'] + ipport = (ip, dev['port']) + unique_by = ip if servers_per_port or unique_ip_count > 1 else ipport + if unique_by not in things_to_number: + number += 1 + things_to_number[unique_by] = number + ipport2server[ipport] = '%s%d' % (server_type, + things_to_number[unique_by]) + + +def store_config_paths(name, configs): + for server_name in (name, '%s-replicator' % name): + for server in Manager([server_name]): + for i, conf in enumerate(server.conf_files(), 1): + configs[server.server][i] = conf def get_ring(ring_name, required_replicas, required_devices, - server=None, force_validate=None): + server=None, force_validate=None, ipport2server=None, + config_paths=None): if not server: server = ring_name ring = Ring('/etc/swift', ring_name=ring_name) + if ipport2server is None: + ipport2server = {} # used internally, even if not passed in + if config_paths is None: + config_paths = defaultdict(dict) + store_config_paths(server, config_paths) + + repl_name = '%s-replicator' % server + repl_configs = {i: readconf(c, section_name=repl_name) + for i, c in config_paths[repl_name].iteritems()} + servers_per_port = any(int(c.get('servers_per_port', '0')) + for c in repl_configs.values()) + + add_ring_devs_to_ipport2server(ring, server, ipport2server, + servers_per_port=servers_per_port) if not VALIDATE_RSYNC and not force_validate: return ring # easy sanity checks @@ -167,10 +197,11 @@ def get_ring(ring_name, required_replicas, required_devices, if len(ring.devs) != required_devices: raise SkipTest('%s has %s devices instead of %s' % ( ring.serialized_path, len(ring.devs), required_devices)) - port_to_config = build_port_to_conf(server) for dev in ring.devs: # verify server is exposing mounted device - conf = port_to_config[dev['port']] + ipport = (dev['ip'], dev['port']) + _, server_number = get_server_number(ipport, ipport2server) + conf = repl_configs[server_number] for device in os.listdir(conf['devices']): if device == dev['device']: dev_path = os.path.join(conf['devices'], device) @@ -185,7 +216,7 @@ def get_ring(ring_name, required_replicas, required_devices, "unable to find ring device %s under %s's devices (%s)" % ( dev['device'], server, conf['devices'])) # verify server is exposing rsync device - if port_to_config[dev['port']].get('vm_test_mode', False): + if conf.get('vm_test_mode', False): rsync_export = '%s%s' % (server, dev['replication_port']) else: rsync_export = server @@ -235,46 +266,45 @@ class ProbeTest(unittest.TestCase): Manager(['all']).stop() self.pids = {} try: + self.ipport2server = {} + self.configs = defaultdict(dict) self.account_ring = get_ring( 'account', self.acct_cont_required_replicas, - self.acct_cont_required_devices) + self.acct_cont_required_devices, + ipport2server=self.ipport2server, + config_paths=self.configs) self.container_ring = get_ring( 'container', self.acct_cont_required_replicas, - self.acct_cont_required_devices) + self.acct_cont_required_devices, + ipport2server=self.ipport2server, + config_paths=self.configs) self.policy = get_policy(**self.policy_requirements) self.object_ring = get_ring( self.policy.ring_name, self.obj_required_replicas, self.obj_required_devices, - server='object') + server='object', + ipport2server=self.ipport2server, + 
config_paths=self.configs) + + self.servers_per_port = any( + int(readconf(c, section_name='object-replicator').get( + 'servers_per_port', '0')) + for c in self.configs['object-replicator'].values()) + Manager(['main']).start(wait=False) - self.port2server = {} - for server, port in [('account', 6002), ('container', 6001), - ('object', 6000)]: - for number in xrange(1, 9): - self.port2server[port + (number * 10)] = \ - '%s%d' % (server, number) - for port in self.port2server: - check_server(port, self.port2server, self.pids) - self.port2server[8080] = 'proxy' - self.url, self.token, self.account = \ - check_server(8080, self.port2server, self.pids) - self.configs = defaultdict(dict) - for name in ('account', 'container', 'object'): - for server_name in (name, '%s-replicator' % name): - for server in Manager([server_name]): - for i, conf in enumerate(server.conf_files(), 1): - self.configs[server.server][i] = conf + for ipport in self.ipport2server: + check_server(ipport, self.ipport2server, self.pids) + proxy_ipport = ('127.0.0.1', 8080) + self.ipport2server[proxy_ipport] = 'proxy' + self.url, self.token, self.account = check_server( + proxy_ipport, self.ipport2server, self.pids) self.replicators = Manager( ['account-replicator', 'container-replicator', 'object-replicator']) self.updaters = Manager(['container-updater', 'object-updater']) - self.server_port_to_conf = {} - # get some configs backend daemon configs loaded up - for server in ('account', 'container', 'object'): - self.server_port_to_conf[server] = build_port_to_conf(server) except BaseException: try: raise @@ -288,7 +318,11 @@ class ProbeTest(unittest.TestCase): Manager(['all']).kill() def device_dir(self, server, node): - conf = self.server_port_to_conf[server][node['port']] + server_type, config_number = get_server_number( + (node['ip'], node['port']), self.ipport2server) + repl_server = '%s-replicator' % server_type + conf = readconf(self.configs[repl_server][config_number], + section_name=repl_server) return os.path.join(conf['devices'], node['device']) def storage_dir(self, server, node, part=None, policy=None): @@ -301,9 +335,24 @@ class ProbeTest(unittest.TestCase): def config_number(self, node): _server_type, config_number = get_server_number( - node['port'], self.port2server) + (node['ip'], node['port']), self.ipport2server) return config_number + def is_local_to(self, node1, node2): + """ + Return True if both ring devices are "local" to each other (on the same + "server". + """ + if self.servers_per_port: + return node1['ip'] == node2['ip'] + + # Without a disambiguating IP, for SAIOs, we have to assume ports + # uniquely identify "servers". SAIOs should be configured to *either* + # have unique IPs per node (e.g. 127.0.0.1, 127.0.0.2, etc.) OR unique + # ports per server (i.e. sdb1 & sdb5 would have same port numbers in + # the 8-disk EC ring). 
+ return node1['port'] == node2['port'] + def get_to_final_state(self): # these .stop()s are probably not strictly necessary, # but may prevent race conditions diff --git a/test/probe/test_account_failures.py b/test/probe/test_account_failures.py index e1fd2cb93f..783d3da9b8 100755 --- a/test/probe/test_account_failures.py +++ b/test/probe/test_account_failures.py @@ -97,8 +97,9 @@ class TestAccountFailures(ReplProbeTest): self.assert_(found2) apart, anodes = self.account_ring.get_nodes(self.account) - kill_nonprimary_server(anodes, self.port2server, self.pids) - kill_server(anodes[0]['port'], self.port2server, self.pids) + kill_nonprimary_server(anodes, self.ipport2server, self.pids) + kill_server((anodes[0]['ip'], anodes[0]['port']), + self.ipport2server, self.pids) # Kill account servers excepting two of the primaries # Delete container1 @@ -146,7 +147,8 @@ class TestAccountFailures(ReplProbeTest): self.assert_(found2) # Restart other primary account server - start_server(anodes[0]['port'], self.port2server, self.pids) + start_server((anodes[0]['ip'], anodes[0]['port']), + self.ipport2server, self.pids) # Assert that server doesn't know about container1's deletion or the # new container2/object2 yet diff --git a/test/probe/test_container_failures.py b/test/probe/test_container_failures.py index fe6aa49dfa..5eddad1464 100755 --- a/test/probe/test_container_failures.py +++ b/test/probe/test_container_failures.py @@ -49,14 +49,16 @@ class TestContainerFailures(ReplProbeTest): client.put_container(self.url, self.token, container1) # Kill container1 servers excepting two of the primaries - kill_nonprimary_server(cnodes, self.port2server, self.pids) - kill_server(cnodes[0]['port'], self.port2server, self.pids) + kill_nonprimary_server(cnodes, self.ipport2server, self.pids) + kill_server((cnodes[0]['ip'], cnodes[0]['port']), + self.ipport2server, self.pids) # Delete container1 client.delete_container(self.url, self.token, container1) # Restart other container1 primary server - start_server(cnodes[0]['port'], self.port2server, self.pids) + start_server((cnodes[0]['ip'], cnodes[0]['port']), + self.ipport2server, self.pids) # Create container1/object1 (allowed because at least server thinks the # container exists) @@ -87,18 +89,23 @@ class TestContainerFailures(ReplProbeTest): client.put_container(self.url, self.token, container1) # Kill container1 servers excepting one of the primaries - cnp_port = kill_nonprimary_server(cnodes, self.port2server, self.pids) - kill_server(cnodes[0]['port'], self.port2server, self.pids) - kill_server(cnodes[1]['port'], self.port2server, self.pids) + cnp_ipport = kill_nonprimary_server(cnodes, self.ipport2server, + self.pids) + kill_server((cnodes[0]['ip'], cnodes[0]['port']), + self.ipport2server, self.pids) + kill_server((cnodes[1]['ip'], cnodes[1]['port']), + self.ipport2server, self.pids) # Delete container1 directly to the one primary still up direct_client.direct_delete_container(cnodes[2], cpart, self.account, container1) # Restart other container1 servers - start_server(cnodes[0]['port'], self.port2server, self.pids) - start_server(cnodes[1]['port'], self.port2server, self.pids) - start_server(cnp_port, self.port2server, self.pids) + start_server((cnodes[0]['ip'], cnodes[0]['port']), + self.ipport2server, self.pids) + start_server((cnodes[1]['ip'], cnodes[1]['port']), + self.ipport2server, self.pids) + start_server(cnp_ipport, self.ipport2server, self.pids) # Get to a final state self.get_to_final_state() diff --git a/test/probe/test_empty_device_handoff.py 
b/test/probe/test_empty_device_handoff.py index e4b2033e0f..f68ee6692b 100755 --- a/test/probe/test_empty_device_handoff.py +++ b/test/probe/test_empty_device_handoff.py @@ -26,7 +26,8 @@ from swiftclient import client from swift.common import direct_client from swift.obj.diskfile import get_data_dir from swift.common.exceptions import ClientException -from test.probe.common import kill_server, ReplProbeTest, start_server +from test.probe.common import ( + kill_server, ReplProbeTest, start_server, get_server_number) from swift.common.utils import readconf from swift.common.manager import Manager @@ -35,7 +36,8 @@ class TestEmptyDevice(ReplProbeTest): def _get_objects_dir(self, onode): device = onode['device'] - node_id = (onode['port'] - 6000) / 10 + _, node_id = get_server_number((onode['ip'], onode['port']), + self.ipport2server) obj_server_conf = readconf(self.configs['object-server'][node_id]) devices = obj_server_conf['app:object-server']['devices'] obj_dir = '%s/%s' % (devices, device) @@ -56,7 +58,8 @@ class TestEmptyDevice(ReplProbeTest): onode = onodes[0] # Kill one container/obj primary server - kill_server(onode['port'], self.port2server, self.pids) + kill_server((onode['ip'], onode['port']), + self.ipport2server, self.pids) # Delete the default data directory for objects on the primary server obj_dir = '%s/%s' % (self._get_objects_dir(onode), @@ -74,7 +77,8 @@ class TestEmptyDevice(ReplProbeTest): # Kill other two container/obj primary servers # to ensure GET handoff works for node in onodes[1:]: - kill_server(node['port'], self.port2server, self.pids) + kill_server((node['ip'], node['port']), + self.ipport2server, self.pids) # Indirectly through proxy assert we can get container/obj odata = client.get_object(self.url, self.token, container, obj)[-1] @@ -83,7 +87,8 @@ class TestEmptyDevice(ReplProbeTest): 'returned: %s' % repr(odata)) # Restart those other two container/obj primary servers for node in onodes[1:]: - start_server(node['port'], self.port2server, self.pids) + start_server((node['ip'], node['port']), + self.ipport2server, self.pids) self.assertFalse(os.path.exists(obj_dir)) # We've indirectly verified the handoff node has the object, but # let's directly verify it. 
@@ -122,7 +127,8 @@ class TestEmptyDevice(ReplProbeTest): missing) # Bring the first container/obj primary server back up - start_server(onode['port'], self.port2server, self.pids) + start_server((onode['ip'], onode['port']), + self.ipport2server, self.pids) # Assert that it doesn't have container/obj yet self.assertFalse(os.path.exists(obj_dir)) @@ -136,21 +142,17 @@ class TestEmptyDevice(ReplProbeTest): else: self.fail("Expected ClientException but didn't get it") - try: - port_num = onode['replication_port'] - except KeyError: - port_num = onode['port'] - try: - another_port_num = another_onode['replication_port'] - except KeyError: - another_port_num = another_onode['port'] - # Run object replication for first container/obj primary server - num = (port_num - 6000) / 10 + _, num = get_server_number( + (onode['ip'], onode.get('replication_port', onode['port'])), + self.ipport2server) Manager(['object-replicator']).once(number=num) # Run object replication for handoff node - another_num = (another_port_num - 6000) / 10 + _, another_num = get_server_number( + (another_onode['ip'], + another_onode.get('replication_port', another_onode['port'])), + self.ipport2server) Manager(['object-replicator']).once(number=another_num) # Assert the first container/obj primary server now has container/obj diff --git a/test/probe/test_object_async_update.py b/test/probe/test_object_async_update.py index 05d05b3adf..8657314fc7 100755 --- a/test/probe/test_object_async_update.py +++ b/test/probe/test_object_async_update.py @@ -41,15 +41,17 @@ class TestObjectAsyncUpdate(ReplProbeTest): # Kill container servers excepting two of the primaries cpart, cnodes = self.container_ring.get_nodes(self.account, container) cnode = cnodes[0] - kill_nonprimary_server(cnodes, self.port2server, self.pids) - kill_server(cnode['port'], self.port2server, self.pids) + kill_nonprimary_server(cnodes, self.ipport2server, self.pids) + kill_server((cnode['ip'], cnode['port']), + self.ipport2server, self.pids) # Create container/obj obj = 'object-%s' % uuid4() client.put_object(self.url, self.token, container, obj, '') # Restart other primary server - start_server(cnode['port'], self.port2server, self.pids) + start_server((cnode['ip'], cnode['port']), + self.ipport2server, self.pids) # Assert it does not know about container/obj self.assert_(not direct_client.direct_get_container( diff --git a/test/probe/test_object_handoff.py b/test/probe/test_object_handoff.py index c3e3990839..37fb7626b5 100755 --- a/test/probe/test_object_handoff.py +++ b/test/probe/test_object_handoff.py @@ -41,7 +41,8 @@ class TestObjectHandoff(ReplProbeTest): opart, onodes = self.object_ring.get_nodes( self.account, container, obj) onode = onodes[0] - kill_server(onode['port'], self.port2server, self.pids) + kill_server((onode['ip'], onode['port']), + self.ipport2server, self.pids) # Create container/obj (goes to two primary servers and one handoff) client.put_object(self.url, self.token, container, obj, 'VERIFY') @@ -53,7 +54,8 @@ class TestObjectHandoff(ReplProbeTest): # Kill other two container/obj primary servers # to ensure GET handoff works for node in onodes[1:]: - kill_server(node['port'], self.port2server, self.pids) + kill_server((node['ip'], node['port']), + self.ipport2server, self.pids) # Indirectly through proxy assert we can get container/obj odata = client.get_object(self.url, self.token, container, obj)[-1] @@ -63,7 +65,8 @@ class TestObjectHandoff(ReplProbeTest): # Restart those other two container/obj primary servers for node in onodes[1:]: - 
start_server(node['port'], self.port2server, self.pids) + start_server((node['ip'], node['port']), + self.ipport2server, self.pids) # We've indirectly verified the handoff node has the container/object, # but let's directly verify it. @@ -90,7 +93,8 @@ class TestObjectHandoff(ReplProbeTest): (cnode['ip'], cnode['port'])) # Bring the first container/obj primary server back up - start_server(onode['port'], self.port2server, self.pids) + start_server((onode['ip'], onode['port']), + self.ipport2server, self.pids) # Assert that it doesn't have container/obj yet try: @@ -138,7 +142,8 @@ class TestObjectHandoff(ReplProbeTest): # Kill the first container/obj primary server again (we have two # primaries and the handoff up now) - kill_server(onode['port'], self.port2server, self.pids) + kill_server((onode['ip'], onode['port']), + self.ipport2server, self.pids) # Delete container/obj try: @@ -175,7 +180,8 @@ class TestObjectHandoff(ReplProbeTest): (cnode['ip'], cnode['port'])) # Restart the first container/obj primary server again - start_server(onode['port'], self.port2server, self.pids) + start_server((onode['ip'], onode['port']), + self.ipport2server, self.pids) # Assert it still has container/obj direct_client.direct_get_object( diff --git a/test/probe/test_reconstructor_revert.py b/test/probe/test_reconstructor_revert.py index 135d1ce421..1daf7a3725 100755 --- a/test/probe/test_reconstructor_revert.py +++ b/test/probe/test_reconstructor_revert.py @@ -294,7 +294,7 @@ class TestReconstructorRevert(ECProbeTest): # the same server handoff_fragment_etag = None for node in onodes: - if node['port'] == hnode['port']: + if self.is_local_to(node, hnode): # we'll keep track of the etag of this fragment we're removing # in case we need it later (queue forshadowing music)... try: @@ -327,7 +327,7 @@ class TestReconstructorRevert(ECProbeTest): raise # partner already had it's fragment removed if (handoff_fragment_etag is not None and - hnode['port'] == partner['port']): + self.is_local_to(hnode, partner)): # oh, well that makes sense then... rebuilt_fragment_etag = handoff_fragment_etag else: diff --git a/test/unit/__init__.py b/test/unit/__init__.py index a4d1cd35ca..0929293b54 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -30,7 +30,7 @@ import eventlet from eventlet.green import socket from tempfile import mkdtemp from shutil import rmtree -from swift.common.utils import Timestamp +from swift.common.utils import Timestamp, NOTICE from test import get_config from swift.common import swob, utils from swift.common.ring import Ring, RingData @@ -478,8 +478,18 @@ class FakeLogger(logging.Logger, object): logging.INFO: 'info', logging.DEBUG: 'debug', logging.CRITICAL: 'critical', + NOTICE: 'notice', } + def notice(self, msg, *args, **kwargs): + """ + Convenience function for syslog priority LOG_NOTICE. The python + logging lvl is set to 25, just above info. SysLogHandler is + monkey patched to map this log lvl to the LOG_NOTICE syslog + priority. 
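+        Messages logged through this method can be retrieved in tests via
+        get_lines_for_level('notice').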
+ """ + self.log(NOTICE, msg, *args, **kwargs) + def _log(self, level, msg, *args, **kwargs): store_name = self.store_in[level] cargs = [msg] @@ -495,7 +505,7 @@ class FakeLogger(logging.Logger, object): def _clear(self): self.log_dict = defaultdict(list) self.lines_dict = {'critical': [], 'error': [], 'info': [], - 'warning': [], 'debug': []} + 'warning': [], 'debug': [], 'notice': []} def get_lines_for_level(self, level): if level not in self.lines_dict: diff --git a/test/unit/common/ring/test_ring.py b/test/unit/common/ring/test_ring.py index 77b57a9a85..5ee1af0ca6 100644 --- a/test/unit/common/ring/test_ring.py +++ b/test/unit/common/ring/test_ring.py @@ -77,6 +77,15 @@ class TestRingData(unittest.TestCase): for p in xrange(pickle.HIGHEST_PROTOCOL): with closing(GzipFile(ring_fname, 'wb')) as f: pickle.dump(rd, f, protocol=p) + meta_only = ring.RingData.load(ring_fname, metadata_only=True) + self.assertEqual([ + {'id': 0, 'zone': 0, 'region': 1, 'ip': '10.1.1.0', + 'port': 7000}, + {'id': 1, 'zone': 1, 'region': 1, 'ip': '10.1.1.1', + 'port': 7000}, + ], meta_only.devs) + # Pickled rings can't load only metadata, so you get it all + self.assert_ring_data_equal(rd, meta_only) ring_data = ring.RingData.load(ring_fname) self.assert_ring_data_equal(rd, ring_data) @@ -86,6 +95,12 @@ class TestRingData(unittest.TestCase): [array.array('H', [0, 1, 0, 1]), array.array('H', [0, 1, 0, 1])], [{'id': 0, 'zone': 0}, {'id': 1, 'zone': 1}], 30) rd.save(ring_fname) + meta_only = ring.RingData.load(ring_fname, metadata_only=True) + self.assertEqual([ + {'id': 0, 'zone': 0, 'region': 1}, + {'id': 1, 'zone': 1, 'region': 1}, + ], meta_only.devs) + self.assertEqual([], meta_only._replica2part2dev_id) rd2 = ring.RingData.load(ring_fname) self.assert_ring_data_equal(rd, rd2) diff --git a/test/unit/common/ring/test_utils.py b/test/unit/common/ring/test_utils.py index 8eaca09756..4d078e6c00 100644 --- a/test/unit/common/ring/test_utils.py +++ b/test/unit/common/ring/test_utils.py @@ -185,22 +185,41 @@ class TestUtils(unittest.TestCase): self.assertFalse(is_valid_hostname("$blah#")) def test_is_local_device(self): - my_ips = ["127.0.0.1", - "0000:0000:0000:0000:0000:0000:0000:0001"] + # localhost shows up in whataremyips() output as "::1" for IPv6 + my_ips = ["127.0.0.1", "::1"] my_port = 6000 self.assertTrue(is_local_device(my_ips, my_port, - "localhost", - my_port)) + "127.0.0.1", my_port)) + self.assertTrue(is_local_device(my_ips, my_port, + "::1", my_port)) + self.assertTrue(is_local_device( + my_ips, my_port, + "0000:0000:0000:0000:0000:0000:0000:0001", my_port)) + self.assertTrue(is_local_device(my_ips, my_port, + "localhost", my_port)) self.assertFalse(is_local_device(my_ips, my_port, - "localhost", - my_port + 1)) + "localhost", my_port + 1)) self.assertFalse(is_local_device(my_ips, my_port, - "127.0.0.2", - my_port)) + "127.0.0.2", my_port)) # for those that don't have a local port self.assertTrue(is_local_device(my_ips, None, my_ips[0], None)) + # When servers_per_port is active, the "my_port" passed in is None + # which means "don't include port in the determination of locality + # because it's not reliable in this deployment scenario" + self.assertTrue(is_local_device(my_ips, None, + "127.0.0.1", 6666)) + self.assertTrue(is_local_device(my_ips, None, + "::1", 6666)) + self.assertTrue(is_local_device( + my_ips, None, + "0000:0000:0000:0000:0000:0000:0000:0001", 6666)) + self.assertTrue(is_local_device(my_ips, None, + "localhost", 6666)) + self.assertFalse(is_local_device(my_ips, None, + "127.0.0.2", 
my_port)) + def test_validate_and_normalize_ip(self): ipv4 = "10.0.0.1" self.assertEqual(ipv4, validate_and_normalize_ip(ipv4)) diff --git a/test/unit/common/test_db_replicator.py b/test/unit/common/test_db_replicator.py index 5f5c6893fa..8cc556127e 100644 --- a/test/unit/common/test_db_replicator.py +++ b/test/unit/common/test_db_replicator.py @@ -477,7 +477,7 @@ class TestDBReplicator(unittest.TestCase): def test_run_once_no_ips(self): replicator = TestReplicator({}, logger=unit.FakeLogger()) self._patch(patch.object, db_replicator, 'whataremyips', - lambda *args: []) + lambda *a, **kw: []) replicator.run_once() @@ -487,7 +487,9 @@ class TestDBReplicator(unittest.TestCase): def test_run_once_node_is_not_mounted(self): db_replicator.ring = FakeRingWithSingleNode() - conf = {'mount_check': 'true', 'bind_port': 6000} + # If a bind_ip is specified, it's plumbed into whataremyips() and + # returned by itself. + conf = {'mount_check': 'true', 'bind_ip': '1.1.1.1', 'bind_port': 6000} replicator = TestReplicator(conf, logger=unit.FakeLogger()) self.assertEqual(replicator.mount_check, True) self.assertEqual(replicator.port, 6000) @@ -498,8 +500,6 @@ class TestDBReplicator(unittest.TestCase): replicator.ring.devs[0]['device'])) return False - self._patch(patch.object, db_replicator, 'whataremyips', - lambda *args: ['1.1.1.1']) self._patch(patch.object, db_replicator, 'ismount', mock_ismount) replicator.run_once() @@ -528,7 +528,7 @@ class TestDBReplicator(unittest.TestCase): self.assertEquals(1, node_id) self._patch(patch.object, db_replicator, 'whataremyips', - lambda *args: ['1.1.1.1']) + lambda *a, **kw: ['1.1.1.1']) self._patch(patch.object, db_replicator, 'ismount', lambda *args: True) self._patch(patch.object, db_replicator, 'unlink_older_than', mock_unlink_older_than) @@ -1390,7 +1390,7 @@ class TestReplicatorSync(unittest.TestCase): return True daemon._rsync_file = _rsync_file with mock.patch('swift.common.db_replicator.whataremyips', - new=lambda: [node['replication_ip']]): + new=lambda *a, **kw: [node['replication_ip']]): daemon.run_once() return daemon diff --git a/test/unit/common/test_storage_policy.py b/test/unit/common/test_storage_policy.py index 6406dc1923..6e3f217db0 100644 --- a/test/unit/common/test_storage_policy.py +++ b/test/unit/common/test_storage_policy.py @@ -15,14 +15,17 @@ import unittest import StringIO from ConfigParser import ConfigParser +import os import mock +from functools import partial from tempfile import NamedTemporaryFile -from test.unit import patch_policies, FakeRing +from test.unit import patch_policies, FakeRing, temptree from swift.common.storage_policy import ( StoragePolicyCollection, POLICIES, PolicyError, parse_storage_policies, reload_storage_policies, get_policy_string, split_policy_string, BaseStoragePolicy, StoragePolicy, ECStoragePolicy, REPL_POLICY, EC_POLICY, - VALID_EC_TYPES, DEFAULT_EC_OBJECT_SEGMENT_SIZE) + VALID_EC_TYPES, DEFAULT_EC_OBJECT_SEGMENT_SIZE, BindPortsCache) +from swift.common.ring import RingData from swift.common.exceptions import RingValidationError @@ -740,6 +743,139 @@ class TestStoragePolicies(unittest.TestCase): self.assertRaises(PolicyError, policies.get_object_ring, 99, '/path/not/used') + def test_bind_ports_cache(self): + test_policies = [StoragePolicy(0, 'aay', True), + StoragePolicy(1, 'bee', False), + StoragePolicy(2, 'cee', False)] + + my_ips = ['1.2.3.4', '2.3.4.5'] + other_ips = ['3.4.5.6', '4.5.6.7'] + bind_ip = my_ips[1] + devs_by_ring_name1 = { + 'object': [ # 'aay' + {'id': 0, 'zone': 0, 'region': 1, 'ip': 
my_ips[0], + 'port': 6006}, + {'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[0], + 'port': 6007}, + {'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[1], + 'port': 6008}, + None, + {'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[1], + 'port': 6009}], + 'object-1': [ # 'bee' + {'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[1], + 'port': 6006}, # dupe + {'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[0], + 'port': 6010}, + {'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[1], + 'port': 6011}, + {'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[1], + 'port': 6012}], + 'object-2': [ # 'cee' + {'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[0], + 'port': 6010}, # on our IP and a not-us IP + {'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[0], + 'port': 6013}, + None, + {'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[1], + 'port': 6014}, + {'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[1], + 'port': 6015}], + } + devs_by_ring_name2 = { + 'object': [ # 'aay' + {'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[0], + 'port': 6016}, + {'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[1], + 'port': 6019}], + 'object-1': [ # 'bee' + {'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[1], + 'port': 6016}, # dupe + {'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[1], + 'port': 6022}], + 'object-2': [ # 'cee' + {'id': 0, 'zone': 0, 'region': 1, 'ip': my_ips[0], + 'port': 6020}, + {'id': 0, 'zone': 0, 'region': 1, 'ip': other_ips[1], + 'port': 6025}], + } + ring_files = [ring_name + '.ring.gz' + for ring_name in sorted(devs_by_ring_name1)] + + def _fake_load(gz_path, stub_objs, metadata_only=False): + return RingData( + devs=stub_objs[os.path.basename(gz_path)[:-8]], + replica2part2dev_id=[], + part_shift=24) + + with mock.patch( + 'swift.common.storage_policy.RingData.load' + ) as mock_ld, \ + patch_policies(test_policies), \ + mock.patch('swift.common.storage_policy.whataremyips') \ + as mock_whataremyips, \ + temptree(ring_files) as tempdir: + mock_whataremyips.return_value = my_ips + + cache = BindPortsCache(tempdir, bind_ip) + + self.assertEqual([ + mock.call(bind_ip), + ], mock_whataremyips.mock_calls) + mock_whataremyips.reset_mock() + + mock_ld.side_effect = partial(_fake_load, + stub_objs=devs_by_ring_name1) + self.assertEqual(set([ + 6006, 6008, 6011, 6010, 6014, + ]), cache.all_bind_ports_for_node()) + self.assertEqual([ + mock.call(os.path.join(tempdir, ring_files[0]), + metadata_only=True), + mock.call(os.path.join(tempdir, ring_files[1]), + metadata_only=True), + mock.call(os.path.join(tempdir, ring_files[2]), + metadata_only=True), + ], mock_ld.mock_calls) + mock_ld.reset_mock() + + mock_ld.side_effect = partial(_fake_load, + stub_objs=devs_by_ring_name2) + self.assertEqual(set([ + 6006, 6008, 6011, 6010, 6014, + ]), cache.all_bind_ports_for_node()) + self.assertEqual([], mock_ld.mock_calls) + + # but when all the file mtimes are made different, it'll + # reload + for gz_file in [os.path.join(tempdir, n) + for n in ring_files]: + os.utime(gz_file, (88, 88)) + + self.assertEqual(set([ + 6016, 6020, + ]), cache.all_bind_ports_for_node()) + self.assertEqual([ + mock.call(os.path.join(tempdir, ring_files[0]), + metadata_only=True), + mock.call(os.path.join(tempdir, ring_files[1]), + metadata_only=True), + mock.call(os.path.join(tempdir, ring_files[2]), + metadata_only=True), + ], mock_ld.mock_calls) + mock_ld.reset_mock() + + # Don't do something stupid like crash if a ring file is missing. 
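+            # (the cached ports should still be returned and no reload
+            # should be attempted for the missing ring file)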
+ os.unlink(os.path.join(tempdir, 'object-2.ring.gz')) + + self.assertEqual(set([ + 6016, 6020, + ]), cache.all_bind_ports_for_node()) + self.assertEqual([], mock_ld.mock_calls) + + # whataremyips() is only called in the constructor + self.assertEqual([], mock_whataremyips.mock_calls) + def test_singleton_passthrough(self): test_policies = [StoragePolicy(0, 'aay', True), StoragePolicy(1, 'bee', False), diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 3072883b83..a668e0ff52 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -1488,6 +1488,18 @@ class TestUtils(unittest.TestCase): self.assert_(len(myips) > 1) self.assert_('127.0.0.1' in myips) + def test_whataremyips_bind_to_all(self): + for any_addr in ('0.0.0.0', '0000:0000:0000:0000:0000:0000:0000:0000', + '::0', '::0000', '::', + # Wacky parse-error input produces all IPs + 'I am a bear'): + myips = utils.whataremyips(any_addr) + self.assert_(len(myips) > 1) + self.assert_('127.0.0.1' in myips) + + def test_whataremyips_bind_ip_specific(self): + self.assertEqual(['1.2.3.4'], utils.whataremyips('1.2.3.4')) + def test_whataremyips_error(self): def my_interfaces(): return ['eth0'] @@ -1725,6 +1737,21 @@ log_name = %(yarr)s''' for func in required_func_calls: self.assert_(utils.os.called_funcs[func]) + def test_drop_privileges_no_call_setsid(self): + user = getuser() + # over-ride os with mock + required_func_calls = ('setgroups', 'setgid', 'setuid', 'chdir', + 'umask') + bad_func_calls = ('setsid',) + utils.os = MockOs(called_funcs=required_func_calls, + raise_funcs=bad_func_calls) + # exercise the code + utils.drop_privileges(user, call_setsid=False) + for func in required_func_calls: + self.assert_(utils.os.called_funcs[func]) + for func in bad_func_calls: + self.assert_(func not in utils.os.called_funcs) + @reset_logger_state def test_capture_stdio(self): # stubs diff --git a/test/unit/common/test_wsgi.py b/test/unit/common/test_wsgi.py index 1fbd012dbe..a4da9effef 100644 --- a/test/unit/common/test_wsgi.py +++ b/test/unit/common/test_wsgi.py @@ -42,7 +42,8 @@ from swift.common.swob import Request from swift.common import wsgi, utils from swift.common.storage_policy import POLICIES -from test.unit import temptree, with_tempdir, write_fake_ring, patch_policies +from test.unit import ( + temptree, with_tempdir, write_fake_ring, patch_policies, FakeLogger) from paste.deploy import loadwsgi @@ -688,6 +689,65 @@ class TestWSGI(unittest.TestCase): self.assertEqual(calls['_loadapp'], 1) self.assertEqual(rc, 0) + @mock.patch('swift.common.wsgi.run_server') + @mock.patch('swift.common.wsgi.WorkersStrategy') + @mock.patch('swift.common.wsgi.ServersPerPortStrategy') + def test_run_server_strategy_plumbing(self, mock_per_port, mock_workers, + mock_run_server): + # Make sure the right strategy gets used in a number of different + # config cases. 
+ mock_per_port().bind_ports.return_value = 'stop early' + mock_workers().bind_ports.return_value = 'stop early' + logger = FakeLogger() + stub__initrp = [ + {'__file__': 'test', 'workers': 2}, # conf + logger, + 'log_name', + ] + with mock.patch.object(wsgi, '_initrp', return_value=stub__initrp): + for server_type in ('account-server', 'container-server', + 'object-server'): + mock_per_port.reset_mock() + mock_workers.reset_mock() + logger._clear() + self.assertEqual(1, wsgi.run_wsgi('conf_file', server_type)) + self.assertEqual([ + 'stop early', + ], logger.get_lines_for_level('error')) + self.assertEqual([], mock_per_port.mock_calls) + self.assertEqual([ + mock.call(stub__initrp[0], logger), + mock.call().bind_ports(), + ], mock_workers.mock_calls) + + stub__initrp[0]['servers_per_port'] = 3 + for server_type in ('account-server', 'container-server'): + mock_per_port.reset_mock() + mock_workers.reset_mock() + logger._clear() + self.assertEqual(1, wsgi.run_wsgi('conf_file', server_type)) + self.assertEqual([ + 'stop early', + ], logger.get_lines_for_level('error')) + self.assertEqual([], mock_per_port.mock_calls) + self.assertEqual([ + mock.call(stub__initrp[0], logger), + mock.call().bind_ports(), + ], mock_workers.mock_calls) + + mock_per_port.reset_mock() + mock_workers.reset_mock() + logger._clear() + self.assertEqual(1, wsgi.run_wsgi('conf_file', 'object-server')) + self.assertEqual([ + 'stop early', + ], logger.get_lines_for_level('error')) + self.assertEqual([ + mock.call(stub__initrp[0], logger, servers_per_port=3), + mock.call().bind_ports(), + ], mock_per_port.mock_calls) + self.assertEqual([], mock_workers.mock_calls) + def test_run_server_failure1(self): calls = defaultdict(lambda: 0) @@ -751,6 +811,380 @@ class TestWSGI(unittest.TestCase): self.assertEquals(r.environ['PATH_INFO'], '/override') +class TestServersPerPortStrategy(unittest.TestCase): + def setUp(self): + self.logger = FakeLogger() + self.conf = { + 'workers': 100, # ignored + 'user': 'bob', + 'swift_dir': '/jim/cricket', + 'ring_check_interval': '76', + 'bind_ip': '2.3.4.5', + } + self.servers_per_port = 3 + self.s1, self.s2 = mock.MagicMock(), mock.MagicMock() + patcher = mock.patch('swift.common.wsgi.get_socket', + side_effect=[self.s1, self.s2]) + self.mock_get_socket = patcher.start() + self.addCleanup(patcher.stop) + patcher = mock.patch('swift.common.wsgi.drop_privileges') + self.mock_drop_privileges = patcher.start() + self.addCleanup(patcher.stop) + patcher = mock.patch('swift.common.wsgi.BindPortsCache') + self.mock_cache_class = patcher.start() + self.addCleanup(patcher.stop) + patcher = mock.patch('swift.common.wsgi.os.setsid') + self.mock_setsid = patcher.start() + self.addCleanup(patcher.stop) + patcher = mock.patch('swift.common.wsgi.os.chdir') + self.mock_chdir = patcher.start() + self.addCleanup(patcher.stop) + patcher = mock.patch('swift.common.wsgi.os.umask') + self.mock_umask = patcher.start() + self.addCleanup(patcher.stop) + + self.all_bind_ports_for_node = \ + self.mock_cache_class().all_bind_ports_for_node + self.ports = (6006, 6007) + self.all_bind_ports_for_node.return_value = set(self.ports) + + self.strategy = wsgi.ServersPerPortStrategy(self.conf, self.logger, + self.servers_per_port) + + def test_loop_timeout(self): + # This strategy should loop every ring_check_interval seconds, even if + # no workers exit. 
+ self.assertEqual(76, self.strategy.loop_timeout()) + + # Check the default + del self.conf['ring_check_interval'] + self.strategy = wsgi.ServersPerPortStrategy(self.conf, self.logger, + self.servers_per_port) + + self.assertEqual(15, self.strategy.loop_timeout()) + + def test_bind_ports(self): + self.strategy.bind_ports() + + self.assertEqual(set((6006, 6007)), self.strategy.bind_ports) + self.assertEqual([ + mock.call({'workers': 100, # ignored + 'user': 'bob', + 'swift_dir': '/jim/cricket', + 'ring_check_interval': '76', + 'bind_ip': '2.3.4.5', + 'bind_port': 6006}), + mock.call({'workers': 100, # ignored + 'user': 'bob', + 'swift_dir': '/jim/cricket', + 'ring_check_interval': '76', + 'bind_ip': '2.3.4.5', + 'bind_port': 6007}), + ], self.mock_get_socket.mock_calls) + self.assertEqual( + 6006, self.strategy.port_pid_state.port_for_sock(self.s1)) + self.assertEqual( + 6007, self.strategy.port_pid_state.port_for_sock(self.s2)) + self.assertEqual([mock.call()], self.mock_setsid.mock_calls) + self.assertEqual([mock.call('/')], self.mock_chdir.mock_calls) + self.assertEqual([mock.call(0o22)], self.mock_umask.mock_calls) + + def test_bind_ports_ignores_setsid_errors(self): + self.mock_setsid.side_effect = OSError() + self.strategy.bind_ports() + + self.assertEqual(set((6006, 6007)), self.strategy.bind_ports) + self.assertEqual([ + mock.call({'workers': 100, # ignored + 'user': 'bob', + 'swift_dir': '/jim/cricket', + 'ring_check_interval': '76', + 'bind_ip': '2.3.4.5', + 'bind_port': 6006}), + mock.call({'workers': 100, # ignored + 'user': 'bob', + 'swift_dir': '/jim/cricket', + 'ring_check_interval': '76', + 'bind_ip': '2.3.4.5', + 'bind_port': 6007}), + ], self.mock_get_socket.mock_calls) + self.assertEqual( + 6006, self.strategy.port_pid_state.port_for_sock(self.s1)) + self.assertEqual( + 6007, self.strategy.port_pid_state.port_for_sock(self.s2)) + self.assertEqual([mock.call()], self.mock_setsid.mock_calls) + self.assertEqual([mock.call('/')], self.mock_chdir.mock_calls) + self.assertEqual([mock.call(0o22)], self.mock_umask.mock_calls) + + def test_no_fork_sock(self): + self.assertEqual(None, self.strategy.no_fork_sock()) + + def test_new_worker_socks(self): + self.strategy.bind_ports() + self.all_bind_ports_for_node.reset_mock() + + pid = 88 + got_si = [] + for s, i in self.strategy.new_worker_socks(): + got_si.append((s, i)) + self.strategy.register_worker_start(s, i, pid) + pid += 1 + + self.assertEqual([ + (self.s1, 0), (self.s1, 1), (self.s1, 2), + (self.s2, 0), (self.s2, 1), (self.s2, 2), + ], got_si) + self.assertEqual([ + 'Started child %d (PID %d) for port %d' % (0, 88, 6006), + 'Started child %d (PID %d) for port %d' % (1, 89, 6006), + 'Started child %d (PID %d) for port %d' % (2, 90, 6006), + 'Started child %d (PID %d) for port %d' % (0, 91, 6007), + 'Started child %d (PID %d) for port %d' % (1, 92, 6007), + 'Started child %d (PID %d) for port %d' % (2, 93, 6007), + ], self.logger.get_lines_for_level('notice')) + self.logger._clear() + + # Steady-state... + self.assertEqual([], list(self.strategy.new_worker_socks())) + self.all_bind_ports_for_node.reset_mock() + + # Get rid of servers for ports which disappear from the ring + self.ports = (6007,) + self.all_bind_ports_for_node.return_value = set(self.ports) + self.s1.reset_mock() + self.s2.reset_mock() + + with mock.patch('swift.common.wsgi.greenio') as mock_greenio: + self.assertEqual([], list(self.strategy.new_worker_socks())) + + self.assertEqual([ + mock.call(), # ring_check_interval has passed... 
+ ], self.all_bind_ports_for_node.mock_calls) + self.assertEqual([ + mock.call.shutdown_safe(self.s1), + ], mock_greenio.mock_calls) + self.assertEqual([ + mock.call.close(), + ], self.s1.mock_calls) + self.assertEqual([], self.s2.mock_calls) # not closed + self.assertEqual([ + 'Closing unnecessary sock for port %d' % 6006, + ], self.logger.get_lines_for_level('notice')) + self.logger._clear() + + # Create new socket & workers for new ports that appear in ring + self.ports = (6007, 6009) + self.all_bind_ports_for_node.return_value = set(self.ports) + self.s1.reset_mock() + self.s2.reset_mock() + s3 = mock.MagicMock() + self.mock_get_socket.side_effect = Exception('ack') + + # But first make sure we handle failure to bind to the requested port! + got_si = [] + for s, i in self.strategy.new_worker_socks(): + got_si.append((s, i)) + self.strategy.register_worker_start(s, i, pid) + pid += 1 + + self.assertEqual([], got_si) + self.assertEqual([ + 'Unable to bind to port %d: %s' % (6009, Exception('ack')), + 'Unable to bind to port %d: %s' % (6009, Exception('ack')), + 'Unable to bind to port %d: %s' % (6009, Exception('ack')), + ], self.logger.get_lines_for_level('critical')) + self.logger._clear() + + # Will keep trying, so let it succeed again + self.mock_get_socket.side_effect = [s3] + + got_si = [] + for s, i in self.strategy.new_worker_socks(): + got_si.append((s, i)) + self.strategy.register_worker_start(s, i, pid) + pid += 1 + + self.assertEqual([ + (s3, 0), (s3, 1), (s3, 2), + ], got_si) + self.assertEqual([ + 'Started child %d (PID %d) for port %d' % (0, 94, 6009), + 'Started child %d (PID %d) for port %d' % (1, 95, 6009), + 'Started child %d (PID %d) for port %d' % (2, 96, 6009), + ], self.logger.get_lines_for_level('notice')) + self.logger._clear() + + # Steady-state... + self.assertEqual([], list(self.strategy.new_worker_socks())) + self.all_bind_ports_for_node.reset_mock() + + # Restart a guy who died on us + self.strategy.register_worker_exit(95) # server_idx == 1 + + got_si = [] + for s, i in self.strategy.new_worker_socks(): + got_si.append((s, i)) + self.strategy.register_worker_start(s, i, pid) + pid += 1 + + self.assertEqual([ + (s3, 1), + ], got_si) + self.assertEqual([ + 'Started child %d (PID %d) for port %d' % (1, 97, 6009), + ], self.logger.get_lines_for_level('notice')) + self.logger._clear() + + # Check log_sock_exit + self.strategy.log_sock_exit(self.s2, 2) + self.assertEqual([ + 'Child %d (PID %d, port %d) exiting normally' % ( + 2, os.getpid(), 6007), + ], self.logger.get_lines_for_level('notice')) + + # It's ok to register_worker_exit for a PID that's already had its + # socket closed due to orphaning. + # This is one of the workers for port 6006 that already got reaped. 
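+        # (the call should be a harmless no-op that returns None)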
+ self.assertEqual(None, self.strategy.register_worker_exit(89)) + + def test_post_fork_hook(self): + self.strategy.post_fork_hook() + + self.assertEqual([ + mock.call('bob', call_setsid=False), + ], self.mock_drop_privileges.mock_calls) + + def test_shutdown_sockets(self): + self.strategy.bind_ports() + + with mock.patch('swift.common.wsgi.greenio') as mock_greenio: + self.strategy.shutdown_sockets() + + self.assertEqual([ + mock.call.shutdown_safe(self.s1), + mock.call.shutdown_safe(self.s2), + ], mock_greenio.mock_calls) + self.assertEqual([ + mock.call.close(), + ], self.s1.mock_calls) + self.assertEqual([ + mock.call.close(), + ], self.s2.mock_calls) + + +class TestWorkersStrategy(unittest.TestCase): + def setUp(self): + self.logger = FakeLogger() + self.conf = { + 'workers': 2, + 'user': 'bob', + } + self.strategy = wsgi.WorkersStrategy(self.conf, self.logger) + patcher = mock.patch('swift.common.wsgi.get_socket', + return_value='abc') + self.mock_get_socket = patcher.start() + self.addCleanup(patcher.stop) + patcher = mock.patch('swift.common.wsgi.drop_privileges') + self.mock_drop_privileges = patcher.start() + self.addCleanup(patcher.stop) + + def test_loop_timeout(self): + # This strategy should block in the green.os.wait() until a worker + # process exits. + self.assertEqual(None, self.strategy.loop_timeout()) + + def test_binding(self): + self.assertEqual(None, self.strategy.bind_ports()) + + self.assertEqual('abc', self.strategy.sock) + self.assertEqual([ + mock.call(self.conf), + ], self.mock_get_socket.mock_calls) + self.assertEqual([ + mock.call('bob'), + ], self.mock_drop_privileges.mock_calls) + + self.mock_get_socket.side_effect = wsgi.ConfigFilePortError() + + self.assertEqual( + 'bind_port wasn\'t properly set in the config file. 
' + 'It must be explicitly set to a valid port number.', + self.strategy.bind_ports()) + + def test_no_fork_sock(self): + self.strategy.bind_ports() + self.assertEqual(None, self.strategy.no_fork_sock()) + + self.conf['workers'] = 0 + self.strategy = wsgi.WorkersStrategy(self.conf, self.logger) + self.strategy.bind_ports() + + self.assertEqual('abc', self.strategy.no_fork_sock()) + + def test_new_worker_socks(self): + self.strategy.bind_ports() + pid = 88 + sock_count = 0 + for s, i in self.strategy.new_worker_socks(): + self.assertEqual('abc', s) + self.assertEqual(None, i) # unused for this strategy + self.strategy.register_worker_start(s, 'unused', pid) + pid += 1 + sock_count += 1 + + self.assertEqual([ + 'Started child %s' % 88, + 'Started child %s' % 89, + ], self.logger.get_lines_for_level('notice')) + + self.assertEqual(2, sock_count) + self.assertEqual([], list(self.strategy.new_worker_socks())) + + sock_count = 0 + self.strategy.register_worker_exit(88) + + self.assertEqual([ + 'Removing dead child %s' % 88, + ], self.logger.get_lines_for_level('error')) + + for s, i in self.strategy.new_worker_socks(): + self.assertEqual('abc', s) + self.assertEqual(None, i) # unused for this strategy + self.strategy.register_worker_start(s, 'unused', pid) + pid += 1 + sock_count += 1 + + self.assertEqual(1, sock_count) + self.assertEqual([ + 'Started child %s' % 88, + 'Started child %s' % 89, + 'Started child %s' % 90, + ], self.logger.get_lines_for_level('notice')) + + def test_post_fork_hook(self): + # Just don't crash or do something stupid + self.assertEqual(None, self.strategy.post_fork_hook()) + + def test_shutdown_sockets(self): + self.mock_get_socket.return_value = mock.MagicMock() + self.strategy.bind_ports() + with mock.patch('swift.common.wsgi.greenio') as mock_greenio: + self.strategy.shutdown_sockets() + self.assertEqual([ + mock.call.shutdown_safe(self.mock_get_socket.return_value), + ], mock_greenio.mock_calls) + self.assertEqual([ + mock.call.close(), + ], self.mock_get_socket.return_value.mock_calls) + + def test_log_sock_exit(self): + self.strategy.log_sock_exit('blahblah', 'blahblah') + my_pid = os.getpid() + self.assertEqual([ + 'Child %d exiting normally' % my_pid, + ], self.logger.get_lines_for_level('notice')) + + class TestWSGIContext(unittest.TestCase): def test_app_call(self): diff --git a/test/unit/container/test_sync.py b/test/unit/container/test_sync.py index 8c6d895323..bdf59f9f3e 100644 --- a/test/unit/container/test_sync.py +++ b/test/unit/container/test_sync.py @@ -289,7 +289,11 @@ class TestContainerSync(unittest.TestCase): # those. 
cring = FakeRing() with mock.patch('swift.container.sync.InternalClient'): - cs = sync.ContainerSync({}, container_ring=cring) + cs = sync.ContainerSync({ + 'bind_ip': '10.0.0.0', + }, container_ring=cring) + # Plumbing test for bind_ip and whataremyips() + self.assertEqual(['10.0.0.0'], cs._myips) orig_ContainerBroker = sync.ContainerBroker try: sync.ContainerBroker = lambda p: FakeContainerBroker( diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index 321ea3751d..a52e64bd1a 100755 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -73,16 +73,10 @@ def make_ec_archive_bodies(policy, test_body): fragment_payloads.append(fragments) # join up the fragment payloads per node - ec_archive_bodies = [''.join(fragments) - for fragments in zip(*fragment_payloads)] + ec_archive_bodies = [''.join(frags) for frags in zip(*fragment_payloads)] return ec_archive_bodies -def _ips(): - return ['127.0.0.1'] -object_reconstructor.whataremyips = _ips - - def _create_test_rings(path): testgz = os.path.join(path, 'object.ring.gz') intended_replica2part2dev_id = [ @@ -582,7 +576,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): except AssertionError as e: extra_info = \ '\n\n... for %r in part num %s job %r' % ( - k, part_num, job_key) + k, part_num, job_key) raise AssertionError(str(e) + extra_info) else: self.fail( @@ -1001,6 +995,7 @@ class TestObjectReconstructor(unittest.TestCase): def setUp(self): self.policy = POLICIES.default + self.policy.object_ring._rtime = time.time() + 3600 self.testdir = tempfile.mkdtemp() self.devices = os.path.join(self.testdir, 'devices') self.local_dev = self.policy.object_ring.devs[0] @@ -1009,6 +1004,7 @@ class TestObjectReconstructor(unittest.TestCase): self.conf = { 'devices': self.devices, 'mount_check': False, + 'bind_ip': self.ip, 'bind_port': self.port, } self.logger = debug_logger('object-reconstructor') @@ -1042,9 +1038,7 @@ class TestObjectReconstructor(unittest.TestCase): utils.mkdirs(os.path.join( self.devices, self.local_dev['device'], datadir, str(part))) - with mock.patch('swift.obj.reconstructor.whataremyips', - return_value=[self.ip]): - part_infos = list(self.reconstructor.collect_parts()) + part_infos = list(self.reconstructor.collect_parts()) found_parts = sorted(int(p['partition']) for p in part_infos) self.assertEqual(found_parts, sorted(stub_parts)) for part_info in part_infos: @@ -1056,10 +1050,112 @@ class TestObjectReconstructor(unittest.TestCase): diskfile.get_data_dir(self.policy), str(part_info['partition']))) + def test_collect_parts_skips_non_local_devs_servers_per_port(self): + self._configure_reconstructor(devices=self.devices, mount_check=False, + bind_ip=self.ip, bind_port=self.port, + servers_per_port=2) + + device_parts = { + 'sda': (374,), + 'sdb': (179, 807), # w/one-serv-per-port, same IP alone is local + 'sdc': (363, 468, 843), + 'sdd': (912,), # "not local" via different IP + } + for policy in POLICIES: + datadir = diskfile.get_data_dir(policy) + for dev, parts in device_parts.items(): + for part in parts: + utils.mkdirs(os.path.join( + self.devices, dev, + datadir, str(part))) + + # we're only going to add sda and sdc into the ring + local_devs = ('sda', 'sdb', 'sdc') + stub_ring_devs = [{ + 'device': dev, + 'replication_ip': self.ip, + 'replication_port': self.port + 1 if dev == 'sdb' else self.port, + } for dev in local_devs] + stub_ring_devs.append({ + 'device': 'sdd', + 'replication_ip': '127.0.0.88', # not local via IP + 'replication_port': 
self.port, + }) + self.reconstructor.bind_ip = '0.0.0.0' # use whataremyips + with nested(mock.patch('swift.obj.reconstructor.whataremyips', + return_value=[self.ip]), + mock.patch.object(self.policy.object_ring, '_devs', + new=stub_ring_devs)): + part_infos = list(self.reconstructor.collect_parts()) + found_parts = sorted(int(p['partition']) for p in part_infos) + expected_parts = sorted(itertools.chain( + *(device_parts[d] for d in local_devs))) + self.assertEqual(found_parts, expected_parts) + for part_info in part_infos: + self.assertEqual(part_info['policy'], self.policy) + self.assertTrue(part_info['local_dev'] in stub_ring_devs) + dev = part_info['local_dev'] + self.assertEqual(part_info['part_path'], + os.path.join(self.devices, + dev['device'], + diskfile.get_data_dir(self.policy), + str(part_info['partition']))) + + def test_collect_parts_multi_device_skips_non_non_local_devs(self): + device_parts = { + 'sda': (374,), + 'sdb': (179, 807), # "not local" via different port + 'sdc': (363, 468, 843), + 'sdd': (912,), # "not local" via different IP + } + for policy in POLICIES: + datadir = diskfile.get_data_dir(policy) + for dev, parts in device_parts.items(): + for part in parts: + utils.mkdirs(os.path.join( + self.devices, dev, + datadir, str(part))) + + # we're only going to add sda and sdc into the ring + local_devs = ('sda', 'sdc') + stub_ring_devs = [{ + 'device': dev, + 'replication_ip': self.ip, + 'replication_port': self.port, + } for dev in local_devs] + stub_ring_devs.append({ + 'device': 'sdb', + 'replication_ip': self.ip, + 'replication_port': self.port + 1, # not local via port + }) + stub_ring_devs.append({ + 'device': 'sdd', + 'replication_ip': '127.0.0.88', # not local via IP + 'replication_port': self.port, + }) + self.reconstructor.bind_ip = '0.0.0.0' # use whataremyips + with nested(mock.patch('swift.obj.reconstructor.whataremyips', + return_value=[self.ip]), + mock.patch.object(self.policy.object_ring, '_devs', + new=stub_ring_devs)): + part_infos = list(self.reconstructor.collect_parts()) + found_parts = sorted(int(p['partition']) for p in part_infos) + expected_parts = sorted(itertools.chain( + *(device_parts[d] for d in local_devs))) + self.assertEqual(found_parts, expected_parts) + for part_info in part_infos: + self.assertEqual(part_info['policy'], self.policy) + self.assertTrue(part_info['local_dev'] in stub_ring_devs) + dev = part_info['local_dev'] + self.assertEqual(part_info['part_path'], + os.path.join(self.devices, + dev['device'], + diskfile.get_data_dir(self.policy), + str(part_info['partition']))) + def test_collect_parts_multi_device_skips_non_ring_devices(self): device_parts = { 'sda': (374,), - 'sdb': (179, 807), 'sdc': (363, 468, 843), } for policy in POLICIES: @@ -1075,8 +1171,9 @@ class TestObjectReconstructor(unittest.TestCase): stub_ring_devs = [{ 'device': dev, 'replication_ip': self.ip, - 'replication_port': self.port + 'replication_port': self.port, } for dev in local_devs] + self.reconstructor.bind_ip = '0.0.0.0' # use whataremyips with nested(mock.patch('swift.obj.reconstructor.whataremyips', return_value=[self.ip]), mock.patch.object(self.policy.object_ring, '_devs', diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py index a0844ebb8f..08eb88b9aa 100644 --- a/test/unit/obj/test_replicator.py +++ b/test/unit/obj/test_replicator.py @@ -36,9 +36,8 @@ from swift.obj import diskfile, replicator as object_replicator from swift.common.storage_policy import StoragePolicy, POLICIES -def _ips(): +def _ips(*args, 
**kwargs): return ['127.0.0.0'] -object_replicator.whataremyips = _ips def mock_http_connect(status): @@ -171,34 +170,46 @@ class TestObjectReplicator(unittest.TestCase): rmtree(self.testdir, ignore_errors=1) os.mkdir(self.testdir) os.mkdir(self.devices) - os.mkdir(os.path.join(self.devices, 'sda')) - self.objects = os.path.join(self.devices, 'sda', - diskfile.get_data_dir(POLICIES[0])) - self.objects_1 = os.path.join(self.devices, 'sda', - diskfile.get_data_dir(POLICIES[1])) - os.mkdir(self.objects) - os.mkdir(self.objects_1) - self.parts = {} - self.parts_1 = {} - for part in ['0', '1', '2', '3']: - self.parts[part] = os.path.join(self.objects, part) - os.mkdir(self.parts[part]) - self.parts_1[part] = os.path.join(self.objects_1, part) - os.mkdir(self.parts_1[part]) + + self.objects, self.objects_1, self.parts, self.parts_1 = \ + self._write_disk_data('sda') _create_test_rings(self.testdir) + self.logger = debug_logger('test-replicator') self.conf = dict( + bind_ip=_ips()[0], bind_port=6000, swift_dir=self.testdir, devices=self.devices, mount_check='false', timeout='300', stats_interval='1', sync_method='rsync') - self.replicator = object_replicator.ObjectReplicator(self.conf) - self.logger = self.replicator.logger = debug_logger('test-replicator') - self.df_mgr = diskfile.DiskFileManager(self.conf, - self.replicator.logger) + self._create_replicator() def tearDown(self): rmtree(self.testdir, ignore_errors=1) + def _write_disk_data(self, disk_name): + os.mkdir(os.path.join(self.devices, disk_name)) + objects = os.path.join(self.devices, disk_name, + diskfile.get_data_dir(POLICIES[0])) + objects_1 = os.path.join(self.devices, disk_name, + diskfile.get_data_dir(POLICIES[1])) + os.mkdir(objects) + os.mkdir(objects_1) + parts = {} + parts_1 = {} + for part in ['0', '1', '2', '3']: + parts[part] = os.path.join(objects, part) + os.mkdir(parts[part]) + parts_1[part] = os.path.join(objects_1, part) + os.mkdir(parts_1[part]) + + return objects, objects_1, parts, parts_1 + + def _create_replicator(self): + self.replicator = object_replicator.ObjectReplicator(self.conf) + self.replicator.logger = self.logger + self.df_mgr = diskfile.DiskFileManager(self.conf, self.logger) + def test_run_once(self): conf = dict(swift_dir=self.testdir, devices=self.devices, + bind_ip=_ips()[0], mount_check='false', timeout='300', stats_interval='1') replicator = object_replicator.ObjectReplicator(conf) was_connector = object_replicator.http_connect @@ -260,7 +271,9 @@ class TestObjectReplicator(unittest.TestCase): process_arg_checker.append( (0, '', ['rsync', whole_path_from, rsync_mods])) with _mock_process(process_arg_checker): - replicator.run_once() + with mock.patch('swift.obj.replicator.whataremyips', + side_effect=_ips): + replicator.run_once() self.assertFalse(process_errors) object_replicator.http_connect = was_connector @@ -321,17 +334,306 @@ class TestObjectReplicator(unittest.TestCase): [node['id'] for node in jobs_by_pol_part['12']['nodes']], [2, 3]) self.assertEquals( [node['id'] for node in jobs_by_pol_part['13']['nodes']], [3, 1]) - for part in ['00', '01', '02', '03', ]: + for part in ['00', '01', '02', '03']: for node in jobs_by_pol_part[part]['nodes']: self.assertEquals(node['device'], 'sda') self.assertEquals(jobs_by_pol_part[part]['path'], os.path.join(self.objects, part[1:])) - for part in ['10', '11', '12', '13', ]: + for part in ['10', '11', '12', '13']: for node in jobs_by_pol_part[part]['nodes']: self.assertEquals(node['device'], 'sda') self.assertEquals(jobs_by_pol_part[part]['path'], 
os.path.join(self.objects_1, part[1:])) + @mock.patch('swift.obj.replicator.random.shuffle', side_effect=lambda l: l) + def test_collect_jobs_multi_disk(self, mock_shuffle): + devs = [ + # Two disks on same IP/port + {'id': 0, 'device': 'sda', 'zone': 0, + 'region': 1, 'ip': '1.1.1.1', 'port': 1111, + 'replication_ip': '127.0.0.0', 'replication_port': 6000}, + {'id': 1, 'device': 'sdb', 'zone': 1, + 'region': 1, 'ip': '1.1.1.1', 'port': 1111, + 'replication_ip': '127.0.0.0', 'replication_port': 6000}, + # Two disks on same server, different ports + {'id': 2, 'device': 'sdc', 'zone': 2, + 'region': 2, 'ip': '1.1.1.2', 'port': 1112, + 'replication_ip': '127.0.0.1', 'replication_port': 6000}, + {'id': 3, 'device': 'sdd', 'zone': 4, + 'region': 2, 'ip': '1.1.1.2', 'port': 1112, + 'replication_ip': '127.0.0.1', 'replication_port': 6001}, + ] + objects_sdb, objects_1_sdb, _, _ = self._write_disk_data('sdb') + objects_sdc, objects_1_sdc, _, _ = self._write_disk_data('sdc') + objects_sdd, objects_1_sdd, _, _ = self._write_disk_data('sdd') + _create_test_rings(self.testdir, devs) + + jobs = self.replicator.collect_jobs() + + self.assertEqual([mock.call(jobs)], mock_shuffle.mock_calls) + + jobs_to_delete = [j for j in jobs if j['delete']] + self.assertEquals(len(jobs_to_delete), 4) + self.assertEqual([ + '1', '2', # policy 0; 1 not on sda, 2 not on sdb + '1', '2', # policy 1; 1 not on sda, 2 not on sdb + ], [j['partition'] for j in jobs_to_delete]) + + jobs_by_pol_part_dev = {} + for job in jobs: + # There should be no jobs with a device not in just sda & sdb + self.assertTrue(job['device'] in ('sda', 'sdb')) + jobs_by_pol_part_dev[ + str(int(job['policy'])) + job['partition'] + job['device'] + ] = job + + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['00sda']['nodes']], + [1, 2]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['00sdb']['nodes']], + [0, 2]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['01sda']['nodes']], + [1, 2, 3]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['01sdb']['nodes']], + [2, 3]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['02sda']['nodes']], + [2, 3]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['02sdb']['nodes']], + [2, 3, 0]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['03sda']['nodes']], + [3, 1]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['03sdb']['nodes']], + [3, 0]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['10sda']['nodes']], + [1, 2]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['10sdb']['nodes']], + [0, 2]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['11sda']['nodes']], + [1, 2, 3]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['11sdb']['nodes']], + [2, 3]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['12sda']['nodes']], + [2, 3]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['12sdb']['nodes']], + [2, 3, 0]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['13sda']['nodes']], + [3, 1]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['13sdb']['nodes']], + [3, 0]) + for part in ['00', '01', '02', '03']: + self.assertEquals(jobs_by_pol_part_dev[part + 'sda']['path'], + os.path.join(self.objects, part[1:])) + self.assertEquals(jobs_by_pol_part_dev[part + 'sdb']['path'], + os.path.join(objects_sdb, part[1:])) + for part in 
['10', '11', '12', '13']: + self.assertEquals(jobs_by_pol_part_dev[part + 'sda']['path'], + os.path.join(self.objects_1, part[1:])) + self.assertEquals(jobs_by_pol_part_dev[part + 'sdb']['path'], + os.path.join(objects_1_sdb, part[1:])) + + @mock.patch('swift.obj.replicator.random.shuffle', side_effect=lambda l: l) + def test_collect_jobs_multi_disk_diff_ports_normal(self, mock_shuffle): + # Normally (servers_per_port=0), replication_ip AND replication_port + # are used to determine local ring device entries. Here we show that + # with bind_ip='127.0.0.1', bind_port=6000, only "sdc" is local. + devs = [ + # Two disks on same IP/port + {'id': 0, 'device': 'sda', 'zone': 0, + 'region': 1, 'ip': '1.1.1.1', 'port': 1111, + 'replication_ip': '127.0.0.0', 'replication_port': 6000}, + {'id': 1, 'device': 'sdb', 'zone': 1, + 'region': 1, 'ip': '1.1.1.1', 'port': 1111, + 'replication_ip': '127.0.0.0', 'replication_port': 6000}, + # Two disks on same server, different ports + {'id': 2, 'device': 'sdc', 'zone': 2, + 'region': 2, 'ip': '1.1.1.2', 'port': 1112, + 'replication_ip': '127.0.0.1', 'replication_port': 6000}, + {'id': 3, 'device': 'sdd', 'zone': 4, + 'region': 2, 'ip': '1.1.1.2', 'port': 1112, + 'replication_ip': '127.0.0.1', 'replication_port': 6001}, + ] + objects_sdb, objects_1_sdb, _, _ = self._write_disk_data('sdb') + objects_sdc, objects_1_sdc, _, _ = self._write_disk_data('sdc') + objects_sdd, objects_1_sdd, _, _ = self._write_disk_data('sdd') + _create_test_rings(self.testdir, devs) + + self.conf['bind_ip'] = '127.0.0.1' + self._create_replicator() + + jobs = self.replicator.collect_jobs() + + self.assertEqual([mock.call(jobs)], mock_shuffle.mock_calls) + + jobs_to_delete = [j for j in jobs if j['delete']] + self.assertEquals(len(jobs_to_delete), 2) + self.assertEqual([ + '3', # policy 0; 3 not on sdc + '3', # policy 1; 3 not on sdc + ], [j['partition'] for j in jobs_to_delete]) + + jobs_by_pol_part_dev = {} + for job in jobs: + # There should be no jobs with a device not sdc + self.assertEqual(job['device'], 'sdc') + jobs_by_pol_part_dev[ + str(int(job['policy'])) + job['partition'] + job['device'] + ] = job + + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['00sdc']['nodes']], + [0, 1]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['01sdc']['nodes']], + [1, 3]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['02sdc']['nodes']], + [3, 0]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['03sdc']['nodes']], + [3, 0, 1]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['10sdc']['nodes']], + [0, 1]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['11sdc']['nodes']], + [1, 3]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['12sdc']['nodes']], + [3, 0]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['13sdc']['nodes']], + [3, 0, 1]) + for part in ['00', '01', '02', '03']: + self.assertEquals(jobs_by_pol_part_dev[part + 'sdc']['path'], + os.path.join(objects_sdc, part[1:])) + for part in ['10', '11', '12', '13']: + self.assertEquals(jobs_by_pol_part_dev[part + 'sdc']['path'], + os.path.join(objects_1_sdc, part[1:])) + + @mock.patch('swift.obj.replicator.random.shuffle', side_effect=lambda l: l) + def test_collect_jobs_multi_disk_servers_per_port(self, mock_shuffle): + # Normally (servers_per_port=0), replication_ip AND replication_port + # are used to determine local ring device entries. 
Here we show that + # with servers_per_port > 0 and bind_ip='127.0.0.1', bind_port=6000, + # then both "sdc" and "sdd" are local. + devs = [ + # Two disks on same IP/port + {'id': 0, 'device': 'sda', 'zone': 0, + 'region': 1, 'ip': '1.1.1.1', 'port': 1111, + 'replication_ip': '127.0.0.0', 'replication_port': 6000}, + {'id': 1, 'device': 'sdb', 'zone': 1, + 'region': 1, 'ip': '1.1.1.1', 'port': 1111, + 'replication_ip': '127.0.0.0', 'replication_port': 6000}, + # Two disks on same server, different ports + {'id': 2, 'device': 'sdc', 'zone': 2, + 'region': 2, 'ip': '1.1.1.2', 'port': 1112, + 'replication_ip': '127.0.0.1', 'replication_port': 6000}, + {'id': 3, 'device': 'sdd', 'zone': 4, + 'region': 2, 'ip': '1.1.1.2', 'port': 1112, + 'replication_ip': '127.0.0.1', 'replication_port': 6001}, + ] + objects_sdb, objects_1_sdb, _, _ = self._write_disk_data('sdb') + objects_sdc, objects_1_sdc, _, _ = self._write_disk_data('sdc') + objects_sdd, objects_1_sdd, _, _ = self._write_disk_data('sdd') + _create_test_rings(self.testdir, devs) + + self.conf['bind_ip'] = '127.0.0.1' + self.conf['servers_per_port'] = 1 # diff port ok + self._create_replicator() + + jobs = self.replicator.collect_jobs() + + self.assertEqual([mock.call(jobs)], mock_shuffle.mock_calls) + + jobs_to_delete = [j for j in jobs if j['delete']] + self.assertEquals(len(jobs_to_delete), 4) + self.assertEqual([ + '3', '0', # policy 0; 3 not on sdc, 0 not on sdd + '3', '0', # policy 1; 3 not on sdc, 0 not on sdd + ], [j['partition'] for j in jobs_to_delete]) + + jobs_by_pol_part_dev = {} + for job in jobs: + # There should be no jobs with a device not in just sdc & sdd + self.assertTrue(job['device'] in ('sdc', 'sdd')) + jobs_by_pol_part_dev[ + str(int(job['policy'])) + job['partition'] + job['device'] + ] = job + + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['00sdc']['nodes']], + [0, 1]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['00sdd']['nodes']], + [0, 1, 2]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['01sdc']['nodes']], + [1, 3]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['01sdd']['nodes']], + [1, 2]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['02sdc']['nodes']], + [3, 0]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['02sdd']['nodes']], + [2, 0]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['03sdc']['nodes']], + [3, 0, 1]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['03sdd']['nodes']], + [0, 1]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['10sdc']['nodes']], + [0, 1]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['10sdd']['nodes']], + [0, 1, 2]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['11sdc']['nodes']], + [1, 3]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['11sdd']['nodes']], + [1, 2]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['12sdc']['nodes']], + [3, 0]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['12sdd']['nodes']], + [2, 0]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['13sdc']['nodes']], + [3, 0, 1]) + self.assertEquals([node['id'] + for node in jobs_by_pol_part_dev['13sdd']['nodes']], + [0, 1]) + for part in ['00', '01', '02', '03']: + self.assertEquals(jobs_by_pol_part_dev[part + 'sdc']['path'], + os.path.join(objects_sdc, part[1:])) + self.assertEquals(jobs_by_pol_part_dev[part + 
'sdd']['path'],
+                              os.path.join(objects_sdd, part[1:]))
+        for part in ['10', '11', '12', '13']:
+            self.assertEquals(jobs_by_pol_part_dev[part + 'sdc']['path'],
+                              os.path.join(objects_1_sdc, part[1:]))
+            self.assertEquals(jobs_by_pol_part_dev[part + 'sdd']['path'],
+                              os.path.join(objects_1_sdd, part[1:]))
+
     def test_collect_jobs_handoffs_first(self):
         self.replicator.handoffs_first = True
         jobs = self.replicator.collect_jobs()
@@ -929,6 +1231,7 @@ class TestObjectReplicator(unittest.TestCase):
     def test_run_once_recover_from_failure(self):
         conf = dict(swift_dir=self.testdir, devices=self.devices,
+                    bind_ip=_ips()[0],
                     mount_check='false', timeout='300', stats_interval='1')
         replicator = object_replicator.ObjectReplicator(conf)
         was_connector = object_replicator.http_connect
@@ -975,6 +1278,7 @@ class TestObjectReplicator(unittest.TestCase):
     def test_run_once_recover_from_timeout(self):
         conf = dict(swift_dir=self.testdir, devices=self.devices,
+                    bind_ip=_ips()[0],
                     mount_check='false', timeout='300', stats_interval='1')
         replicator = object_replicator.ObjectReplicator(conf)
         was_connector = object_replicator.http_connect