From ea8e545a27f06868323ff91c1584d18ab9ac6cda Mon Sep 17 00:00:00 2001 From: Clay Gerrard Date: Mon, 4 Feb 2019 15:46:40 -0600 Subject: [PATCH] Rebuild frags for unmounted disks Change the behavior of the EC reconstructor to perform a fragment rebuild to a handoff node when a primary peer responds with 507 to the REPLICATE request. Each primary node in an EC ring will sync with exactly three primary peers; in addition to the left & right nodes we now select a third node from the far side of the ring. If any of these partners responds as unmounted, the reconstructor will rebuild its fragments to a handoff node with the appropriate index. To prevent ssync (which is uninterruptible) from receiving a 409 (Conflict), we must give the remote handoff node the correct backend_index for the fragments it will receive. In the common case we will use deterministically different handoffs for each fragment index to prevent multiple unmounted primary disks from forcing a single handoff node to hold more than one rebuilt fragment. Handoff nodes will continue to attempt to revert rebuilt handoff fragments to the appropriate primary until the device is remounted or the ring is rebalanced. After a rebalance of EC rings (potentially removing unmounted/failed devices), it's most IO efficient to run in handoffs_only mode to avoid unnecessary rebuilds. Closes-Bug: #1510342 Change-Id: Ief44ed39d97f65e4270bf73051da9a2dd0ddbaec --- etc/object-server.conf-sample | 9 + swift/common/ring/ring.py | 68 +- swift/obj/reconstructor.py | 154 +++-- test/probe/test_reconstructor_rebuild.py | 66 +- test/unit/__init__.py | 8 +- test/unit/common/ring/test_ring.py | 4 + test/unit/obj/test_reconstructor.py | 772 ++++++++++++++++------- 7 files changed, 721 insertions(+), 360 deletions(-) diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index 6cbc79a098..2d060fbd9c 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -352,6 +352,15 @@ use = egg:swift#recon # honored as a synonym, but may be ignored in a future release. # handoffs_only = False # +# The default strategy for unmounted drives will stage rebuilt data on a +# handoff node until updated rings are deployed. Because fragments are rebuilt +# on offset handoffs based on fragment index and the proxy limits how deep it +# will search for EC frags, we restrict how many nodes we'll try. Setting to 0 +# will disable rebuilds to handoffs and only rebuild fragments for unmounted +# devices to mounted primaries after a ring change. +# Setting to -1 means "no limit". +# rebuild_handoff_node_count = 2 +# # You can set scheduling priority of processes. Niceness values range from -20 # (most favorable to the process) to 19 (least favorable to the process). # nice_priority = diff --git a/swift/common/ring/ring.py b/swift/common/ring/ring.py index a28d97d2fa..ed888e28da 100644 --- a/swift/common/ring/ring.py +++ b/swift/common/ring/ring.py @@ -24,7 +24,7 @@ from time import time import os from io import BufferedReader from hashlib import md5 -from itertools import chain +from itertools import chain, count from tempfile import NamedTemporaryFile import sys @@ -237,35 +237,36 @@ class Ring(object): self._replica2part2dev_id = ring_data._replica2part2dev_id self._part_shift = ring_data._part_shift self._rebuild_tier_data() - - # Do this now, when we know the data has changed, rather than - # doing it on every call to get_more_nodes(). - # - # Since this is to speed up the finding of handoffs, we only - # consider devices with at least one partition assigned.
This - # way, a region, zone, or server with no partitions assigned - # does not count toward our totals, thereby keeping the early - # bailouts in get_more_nodes() working. - dev_ids_with_parts = set() - for part2dev_id in self._replica2part2dev_id: - for dev_id in part2dev_id: - dev_ids_with_parts.add(dev_id) - - regions = set() - zones = set() - ips = set() - self._num_devs = 0 - for dev in self._devs: - if dev and dev['id'] in dev_ids_with_parts: - regions.add(dev['region']) - zones.add((dev['region'], dev['zone'])) - ips.add((dev['region'], dev['zone'], dev['ip'])) - self._num_devs += 1 - self._num_regions = len(regions) - self._num_zones = len(zones) - self._num_ips = len(ips) + self._update_bookkeeping() self._next_part_power = ring_data.next_part_power + def _update_bookkeeping(self): + # Do this now, when we know the data has changed, rather than + # doing it on every call to get_more_nodes(). + # + # Since this is to speed up the finding of handoffs, we only + # consider devices with at least one partition assigned. This + # way, a region, zone, or server with no partitions assigned + # does not count toward our totals, thereby keeping the early + # bailouts in get_more_nodes() working. + dev_ids_with_parts = set() + for part2dev_id in self._replica2part2dev_id: + for dev_id in part2dev_id: + dev_ids_with_parts.add(dev_id) + regions = set() + zones = set() + ips = set() + self._num_devs = 0 + for dev in self._devs: + if dev and dev['id'] in dev_ids_with_parts: + regions.add(dev['region']) + zones.add((dev['region'], dev['zone'])) + ips.add((dev['region'], dev['zone'], dev['ip'])) + self._num_devs += 1 + self._num_regions = len(regions) + self._num_zones = len(zones) + self._num_ips = len(ips) + @property def next_part_power(self): return self._next_part_power @@ -407,8 +408,8 @@ class Ring(object): if time() > self._rtime: self._reload() primary_nodes = self._get_part_nodes(part) - used = set(d['id'] for d in primary_nodes) + index = count() same_regions = set(d['region'] for d in primary_nodes) same_zones = set((d['region'], d['zone']) for d in primary_nodes) same_ips = set( @@ -434,7 +435,7 @@ class Ring(object): dev = self._devs[dev_id] region = dev['region'] if dev_id not in used and region not in same_regions: - yield dev + yield dict(dev, handoff_index=next(index)) used.add(dev_id) same_regions.add(region) zone = dev['zone'] @@ -459,7 +460,7 @@ class Ring(object): dev = self._devs[dev_id] zone = (dev['region'], dev['zone']) if dev_id not in used and zone not in same_zones: - yield dev + yield dict(dev, handoff_index=next(index)) used.add(dev_id) same_zones.add(zone) ip = zone + (dev['ip'],) @@ -482,7 +483,7 @@ class Ring(object): dev = self._devs[dev_id] ip = (dev['region'], dev['zone'], dev['ip']) if dev_id not in used and ip not in same_ips: - yield dev + yield dict(dev, handoff_index=next(index)) used.add(dev_id) same_ips.add(ip) if len(same_ips) == self._num_ips: @@ -501,7 +502,8 @@ class Ring(object): if handoff_part < len(part2dev_id): dev_id = part2dev_id[handoff_part] if dev_id not in used: - yield self._devs[dev_id] + dev = self._devs[dev_id] + yield dict(dev, handoff_index=next(index)) used.add(dev_id) if len(used) == self._num_devs: hit_all_devs = True diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index 64497552e3..71ae06e5fc 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -19,7 +19,6 @@ import os from os.path import join import random import time -import itertools from collections import defaultdict import six 
import six.moves.cPickle as pickle @@ -51,18 +50,22 @@ from swift.common.exceptions import ConnectionTimeout, DiskFileError, \ SYNC, REVERT = ('sync_only', 'sync_revert') -def _get_partners(frag_index, part_nodes): +def _get_partners(node_index, part_nodes): """ - Returns the left and right partners of the node whose index is - equal to the given frag_index. + Returns the left, right and far partners of the node whose index is equal + to the given node_index. - :param frag_index: a fragment index + :param node_index: the primary index :param part_nodes: a list of primary nodes - :returns: [, ] + :returns: [, , ] """ + num_nodes = len(part_nodes) return [ - part_nodes[(frag_index - 1) % len(part_nodes)], - part_nodes[(frag_index + 1) % len(part_nodes)], + part_nodes[(node_index - 1) % num_nodes], + part_nodes[(node_index + 1) % num_nodes], + part_nodes[( + node_index + (num_nodes // 2) + ) % num_nodes], ] @@ -203,6 +206,8 @@ class ObjectReconstructor(Daemon): elif default_handoffs_only: self.logger.warning('Ignored handoffs_first option in favor ' 'of handoffs_only.') + self.rebuild_handoff_node_count = int(conf.get( + 'rebuild_handoff_node_count', 2)) self._df_router = DiskFileRouter(conf, self.logger) self.all_local_devices = self.get_local_devices() @@ -667,6 +672,33 @@ class ObjectReconstructor(Daemon): _("Trying to sync suffixes with %s") % _full_path( node, job['partition'], '', job['policy'])) + def _iter_nodes_for_frag(self, policy, partition, node): + """ + Generate a priority list of nodes that can sync to the given node. + + The primary node is always the highest priority; after that we'll use + handoffs. + + To avoid conflicts placing frags we'll skip through the handoffs and + only yield back those that are offset equal to the given primary + node index. + + Nodes returned from this iterator will have 'backend_index' set. + """ + node['backend_index'] = policy.get_backend_index(node['index']) + yield node + count = 0 + for handoff_node in policy.object_ring.get_more_nodes(partition): + handoff_backend_index = policy.get_backend_index( + handoff_node['handoff_index']) + if handoff_backend_index == node['backend_index']: + if (self.rebuild_handoff_node_count >= 0 and + count >= self.rebuild_handoff_node_count): + break + handoff_node['backend_index'] = handoff_backend_index + yield handoff_node + count += 1 + def _get_suffixes_to_sync(self, job, node): """ For SYNC jobs we need to make a remote REPLICATE request to get @@ -677,48 +709,56 @@ :param: the job dict, with the keys defined in ``_get_part_jobs`` :param node: the remote node dict :returns: a (possibly empty) list of strings, the suffixes to be - synced with the remote node.
""" # get hashes from the remote node remote_suffixes = None + attempts_remaining = 1 headers = self.headers.copy() headers['X-Backend-Storage-Policy-Index'] = int(job['policy']) - try: - with Timeout(self.http_timeout): - resp = http_connect( - node['replication_ip'], node['replication_port'], - node['device'], job['partition'], 'REPLICATE', - '', headers=headers).getresponse() - if resp.status == HTTP_INSUFFICIENT_STORAGE: - self.logger.error( - _('%s responded as unmounted'), - _full_path(node, job['partition'], '', - job['policy'])) - elif resp.status != HTTP_OK: - full_path = _full_path(node, job['partition'], '', - job['policy']) - self.logger.error( - _("Invalid response %(resp)s from %(full_path)s"), - {'resp': resp.status, 'full_path': full_path}) - else: - remote_suffixes = pickle.loads(resp.read()) - except (Exception, Timeout): - # all exceptions are logged here so that our caller can - # safely catch our exception and continue to the next node - # without logging - self.logger.exception('Unable to get remote suffix hashes ' - 'from %r' % _full_path( - node, job['partition'], '', - job['policy'])) - + possible_nodes = self._iter_nodes_for_frag( + job['policy'], job['partition'], node) + while remote_suffixes is None and attempts_remaining: + try: + node = next(possible_nodes) + except StopIteration: + break + attempts_remaining -= 1 + try: + with Timeout(self.http_timeout): + resp = http_connect( + node['replication_ip'], node['replication_port'], + node['device'], job['partition'], 'REPLICATE', + '', headers=headers).getresponse() + if resp.status == HTTP_INSUFFICIENT_STORAGE: + self.logger.error( + _('%s responded as unmounted'), + _full_path(node, job['partition'], '', + job['policy'])) + attempts_remaining += 1 + elif resp.status != HTTP_OK: + full_path = _full_path(node, job['partition'], '', + job['policy']) + self.logger.error( + _("Invalid response %(resp)s from %(full_path)s"), + {'resp': resp.status, 'full_path': full_path}) + else: + remote_suffixes = pickle.loads(resp.read()) + except (Exception, Timeout): + # all exceptions are logged here so that our caller can + # safely catch our exception and continue to the next node + # without logging + self.logger.exception('Unable to get remote suffix hashes ' + 'from %r' % _full_path( + node, job['partition'], '', + job['policy'])) if remote_suffixes is None: raise SuffixSyncError('Unable to get remote suffix hashes') suffixes = self.get_suffix_delta(job['hashes'], job['frag_index'], remote_suffixes, - job['policy'].get_backend_index( - node['index'])) + node['backend_index']) # now recalculate local hashes for suffixes that don't # match so we're comparing the latest local_suff = self._get_hashes(job['local_dev']['device'], @@ -728,11 +768,10 @@ class ObjectReconstructor(Daemon): suffixes = self.get_suffix_delta(local_suff, job['frag_index'], remote_suffixes, - job['policy'].get_backend_index( - node['index'])) + node['backend_index']) self.suffix_count += len(suffixes) - return suffixes + return suffixes, node def delete_reverted_objs(self, job, objects, frag_index): """ @@ -798,38 +837,15 @@ class ObjectReconstructor(Daemon): """ self.logger.increment( 'partition.update.count.%s' % (job['local_dev']['device'],)) - # after our left and right partners, if there's some sort of - # failure we'll continue onto the remaining primary nodes and - # make sure they're in sync - or potentially rebuild missing - # fragments we find - dest_nodes = itertools.chain( - job['sync_to'], - # I think we could order these based on our 
index to better - # protect against a broken chain - [ - n for n in - job['policy'].object_ring.get_part_nodes(job['partition']) - if n['id'] != job['local_dev']['id'] and - n['id'] not in (m['id'] for m in job['sync_to']) - ], - ) - syncd_with = 0 - for node in dest_nodes: - if syncd_with >= len(job['sync_to']): - # success! - break - + for node in job['sync_to']: try: - suffixes = self._get_suffixes_to_sync(job, node) + suffixes, node = self._get_suffixes_to_sync(job, node) except SuffixSyncError: continue if not suffixes: - syncd_with += 1 continue - node['backend_index'] = job['policy'].get_backend_index( - node['index']) # ssync any out-of-sync suffixes with the remote node success, _ = ssync_sender( self, node, job, suffixes)() @@ -838,8 +854,6 @@ class ObjectReconstructor(Daemon): # update stats for this attempt self.suffix_sync += len(suffixes) self.logger.update_stats('suffix.syncs', len(suffixes)) - if success: - syncd_with += 1 self.logger.timing_since('partition.update.timing', begin) def _revert(self, job, begin): @@ -951,6 +965,8 @@ class ObjectReconstructor(Daemon): try: suffixes = data_fi_to_suffixes.pop(frag_index) except KeyError: + # N.B. If this function ever returns an empty list of jobs + # the entire partition will be deleted. suffixes = [] sync_job = build_job( job_type=SYNC, diff --git a/test/probe/test_reconstructor_rebuild.py b/test/probe/test_reconstructor_rebuild.py index d64d85fa5b..5ebfb7381f 100644 --- a/test/probe/test_reconstructor_rebuild.py +++ b/test/probe/test_reconstructor_rebuild.py @@ -22,7 +22,6 @@ import unittest import uuid import shutil import random -from collections import defaultdict import os import time @@ -32,7 +31,6 @@ from test.probe.common import ECProbeTest from swift.common import direct_client from swift.common.storage_policy import EC_POLICY from swift.common.manager import Manager -from swift.obj.reconstructor import _get_partners from swiftclient import client, ClientException @@ -300,46 +298,46 @@ class TestReconstructorRebuild(ECProbeTest): self._test_rebuild_scenario(failed, non_durable, 3) def test_rebuild_partner_down(self): - # find a primary server that only has one of it's devices in the - # primary node list - group_nodes_by_config = defaultdict(list) - for n in self.onodes: - group_nodes_by_config[self.config_number(n)].append(n) - for config_number, node_list in group_nodes_by_config.items(): - if len(node_list) == 1: - break - else: - self.fail('ring balancing did not use all available nodes') - primary_node = node_list[0] + # we have to pick a lower index because we have few handoffs + nodes = self.onodes[:2] + random.shuffle(nodes) # left or right is fine + primary_node, partner_node = nodes - # pick one it's partners to fail randomly - partner_node = random.choice(_get_partners( - primary_node['index'], self.onodes)) + # capture fragment etag from partner + failed_partner_meta, failed_partner_etag = self.direct_get( + partner_node, self.opart) - # 507 the partner device + # and 507 the failed partner device device_path = self.device_dir('object', partner_node) self.kill_drive(device_path) - # select another primary sync_to node to fail - failed_primary = [n for n in self.onodes if n['id'] not in - (primary_node['id'], partner_node['id'])][0] - # ... capture it's fragment etag - failed_primary_meta, failed_primary_etag = self.direct_get( - failed_primary, self.opart) - # ... 
and delete it - part_dir = self.storage_dir('object', failed_primary, part=self.opart) - shutil.rmtree(part_dir, True) - # reconstruct from the primary, while one of it's partners is 507'd self.reconstructor.once(number=self.config_number(primary_node)) - # the other failed primary will get it's fragment rebuilt instead - failed_primary_meta_new, failed_primary_etag_new = self.direct_get( - failed_primary, self.opart) - del failed_primary_meta['Date'] - del failed_primary_meta_new['Date'] - self.assertEqual(failed_primary_etag, failed_primary_etag_new) - self.assertEqual(failed_primary_meta, failed_primary_meta_new) + # a handoff will pickup the rebuild + hnodes = list(self.object_ring.get_more_nodes(self.opart)) + for node in hnodes: + try: + found_meta, found_etag = self.direct_get( + node, self.opart) + except DirectClientException as e: + if e.http_status != 404: + raise + else: + break + else: + self.fail('Unable to fetch rebuilt frag from handoffs %r ' + 'given primary nodes %r with %s unmounted ' + 'trying to rebuild from %s' % ( + [h['device'] for h in hnodes], + [n['device'] for n in self.onodes], + partner_node['device'], + primary_node['device'], + )) + self.assertEqual(failed_partner_etag, found_etag) + del failed_partner_meta['Date'] + del found_meta['Date'] + self.assertEqual(failed_partner_meta, found_meta) # just to be nice self.revive_drive(device_path) diff --git a/test/unit/__init__.py b/test/unit/__init__.py index de29af08be..7109e3c210 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -274,6 +274,7 @@ class FakeRing(Ring): return [dict(node, index=i) for i, node in enumerate(list(self._devs))] def get_more_nodes(self, part): + index_counter = itertools.count() for x in range(self.replicas, (self.replicas + self.max_more_nodes)): yield {'ip': '10.0.0.%s' % x, 'replication_ip': '10.0.0.%s' % x, @@ -282,7 +283,8 @@ class FakeRing(Ring): 'device': 'sda', 'zone': x % 3, 'region': x % 2, - 'id': x} + 'id': x, + 'handoff_index': next(index_counter)} def write_fake_ring(path, *devs): @@ -346,6 +348,9 @@ class FabricatedRing(Ring): self._part_shift = 32 - part_power self._reload() + def has_changed(self): + return False + def _reload(self, *args, **kwargs): self._rtime = time.time() * 2 if hasattr(self, '_replica2part2dev_id'): @@ -370,6 +375,7 @@ class FabricatedRing(Ring): for p in range(2 ** self.part_power): for r in range(self.replicas): self._replica2part2dev_id[r][p] = next(dev_ids) + self._update_bookkeeping() class FakeMemcache(object): diff --git a/test/unit/common/ring/test_ring.py b/test/unit/common/ring/test_ring.py index 17335f582b..222376c0c6 100644 --- a/test/unit/common/ring/test_ring.py +++ b/test/unit/common/ring/test_ring.py @@ -568,6 +568,10 @@ class TestRing(TestRingBase): self.assertEqual(len(devs), len(exp_handoffs)) dev_ids = [d['id'] for d in devs] self.assertEqual(dev_ids, exp_handoffs) + # We mark handoffs so code consuming extra nodes can reason about how + # far they've gone + for i, d in enumerate(devs): + self.assertEqual(d['handoff_index'], i) # The first 6 replicas plus the 3 primary nodes should cover all 9 # zones in this test diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index 6e48f92213..335983c15d 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -40,7 +40,7 @@ from swift.obj import diskfile, reconstructor as object_reconstructor from swift.common import ring from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy, POLICIES, 
EC_POLICY) -from swift.obj.reconstructor import REVERT +from swift.obj.reconstructor import SYNC, REVERT from test.unit import (patch_policies, debug_logger, mocked_http_conn, FabricatedRing, make_timestamp_iter, @@ -143,7 +143,7 @@ def get_header_frag_index(self, body): @patch_policies([StoragePolicy(0, name='zero', is_default=True), ECStoragePolicy(1, name='one', ec_type=DEFAULT_TEST_EC_TYPE, - ec_ndata=2, ec_nparity=1)]) + ec_ndata=3, ec_nparity=2)]) class TestGlobalSetupObjectReconstructor(unittest.TestCase): # Tests for reconstructor using real objects in test partition directories. legacy_durable = False @@ -151,9 +151,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): def setUp(self): skip_if_no_xattrs() self.testdir = tempfile.mkdtemp() - _create_test_rings(self.testdir) - POLICIES[0].object_ring = ring.Ring(self.testdir, ring_name='object') - POLICIES[1].object_ring = ring.Ring(self.testdir, ring_name='object-1') + POLICIES[0].object_ring = FabricatedRing(3) + POLICIES[1].object_ring = FabricatedRing(5) utils.HASH_PATH_SUFFIX = b'endcap' utils.HASH_PATH_PREFIX = b'' self.devices = os.path.join(self.testdir, 'node') @@ -176,7 +175,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.conf = dict( swift_dir=self.testdir, devices=self.devices, mount_check='false', - timeout='300', stats_interval='1') + timeout='300', stats_interval='1', + bind_ip='10.0.0.1', bind_port=6200) self.logger = debug_logger('test-reconstructor') self.reconstructor = object_reconstructor.ObjectReconstructor( self.conf, logger=self.logger) @@ -189,13 +189,16 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): # part 0: 3C1/hash/xxx#1#d.data <-- job: sync_only - partners (FI 1) # 061/hash/xxx#1#d.data <-- included in earlier job (FI 1) # /xxx#2#d.data <-- job: sync_revert to index 2 + # part_nodes: ['sda0', 'sda1', 'sda2', 'sda3', 'sda4'] - # part 1: 3C1/hash/xxx#0#d.data <-- job: sync_only - partners (FI 0) + # part 1: 3C1/hash/xxx#0#d.data <-- job: sync_revert to index 0 # /xxx#1#d.data <-- job: sync_revert to index 1 # 061/hash/xxx#1#d.data <-- included in earlier job (FI 1) + # part_nodes: ['sda5', 'sda6', 'sda7', 'sda0', 'sda1'] # part 2: 3C1/hash/xxx#2#d.data <-- job: sync_revert to index 2 # 061/hash/xxx#0#d.data <-- job: sync_revert to index 0 + # part_nodes: ['sda2', 'sda3', 'sda4', 'sda5', 'sda6'] def _create_frag_archives(policy, obj_path, local_id, obj_set): # we'll create 2 sets of objects in different suffix dirs @@ -251,7 +254,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): for obj_num in range(0, 3): _create_df(obj_num, part_num) - ips = utils.whataremyips() + ips = utils.whataremyips(self.reconstructor.bind_ip) for policy in [p for p in POLICIES if p.policy_type == EC_POLICY]: self.ec_policy = policy self.ec_obj_ring = self.reconstructor.load_object_ring( @@ -312,13 +315,14 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'sync_to': [{ 'index': 2, 'replication_port': 6200, - 'zone': 2, - 'ip': '127.0.0.2', + 'zone': 1, + 'ip': '10.0.0.2', 'region': 1, 'port': 6200, - 'replication_ip': '127.0.0.2', - 'device': 'sda1', + 'replication_ip': '10.0.0.2', + 'device': 'sda2', 'id': 2, + 'weight': 1.0, }], 'job_type': object_reconstructor.REVERT, 'suffixes': ['061'], @@ -328,11 +332,13 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'local_dev': { 'replication_port': 6200, 'zone': 1, - 'ip': '127.0.0.1', + 'ip': '10.0.0.1', 'region': 1, 'id': 1, - 'replication_ip': '127.0.0.1', - 'device': 'sda1', 'port': 6200, 
+ 'replication_ip': '10.0.0.1', + 'device': 'sda1', + 'port': 6200, + 'weight': 1.0, }, 'hashes': { '061': { @@ -349,22 +355,36 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'sync_to': [{ 'index': 0, 'replication_port': 6200, - 'zone': 0, - 'ip': '127.0.0.0', + 'zone': 1, + 'ip': '10.0.0.0', 'region': 1, 'port': 6200, - 'replication_ip': '127.0.0.0', - 'device': 'sda1', 'id': 0, + 'replication_ip': '10.0.0.0', + 'device': 'sda0', + 'id': 0, + 'weight': 1.0, }, { 'index': 2, 'replication_port': 6200, - 'zone': 2, - 'ip': '127.0.0.2', + 'zone': 1, + 'ip': '10.0.0.2', 'region': 1, 'port': 6200, - 'replication_ip': '127.0.0.2', - 'device': 'sda1', + 'replication_ip': '10.0.0.2', + 'device': 'sda2', 'id': 2, + 'weight': 1.0, + }, { + 'index': 3, + 'replication_port': 6200, + 'zone': 1, + 'ip': '10.0.0.3', + 'region': 1, + 'port': 6200, + 'replication_ip': '10.0.0.3', + 'device': 'sda3', + 'id': 3, + 'weight': 1.0, }], 'job_type': object_reconstructor.SYNC, 'sync_diskfile_builder': self.reconstructor.reconstruct_fa, @@ -375,12 +395,13 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'local_dev': { 'replication_port': 6200, 'zone': 1, - 'ip': '127.0.0.1', + 'ip': '10.0.0.1', 'region': 1, 'id': 1, - 'replication_ip': '127.0.0.1', + 'replication_ip': '10.0.0.1', 'device': 'sda1', 'port': 6200, + 'weight': 1.0, }, 'hashes': { @@ -402,13 +423,14 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'sync_to': [{ 'index': 1, 'replication_port': 6200, - 'zone': 2, - 'ip': '127.0.0.2', + 'zone': 1, + 'ip': '10.0.0.2', 'region': 1, 'port': 6200, - 'replication_ip': '127.0.0.2', - 'device': 'sda1', - 'id': 2, + 'replication_ip': '10.0.0.2', + 'device': 'sda6', + 'id': 6, + 'weight': 1.0, }], 'job_type': object_reconstructor.REVERT, 'suffixes': ['061', '3c1'], @@ -418,12 +440,13 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'local_dev': { 'replication_port': 6200, 'zone': 1, - 'ip': '127.0.0.1', + 'ip': '10.0.0.1', 'region': 1, 'id': 1, - 'replication_ip': '127.0.0.1', + 'replication_ip': '10.0.0.1', 'device': 'sda1', 'port': 6200, + 'weight': 1.0, }, 'hashes': { @@ -439,27 +462,18 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): }, }, { 'sync_to': [{ - 'index': 2, + 'index': 0, 'replication_port': 6200, - 'zone': 4, - 'ip': '127.0.0.3', + 'zone': 1, + 'ip': '10.0.0.1', 'region': 1, 'port': 6200, - 'replication_ip': '127.0.0.3', - 'device': 'sda1', 'id': 3, - }, { - 'index': 1, - 'replication_port': 6200, - 'zone': 2, - 'ip': '127.0.0.2', - 'region': 1, - 'port': 6200, - 'replication_ip': '127.0.0.2', - 'device': 'sda1', - 'id': 2, + 'replication_ip': '10.0.0.1', + 'device': 'sda5', + 'id': 5, + 'weight': 1.0, }], - 'job_type': object_reconstructor.SYNC, - 'sync_diskfile_builder': self.reconstructor.reconstruct_fa, + 'job_type': object_reconstructor.REVERT, 'suffixes': ['3c1'], 'partition': 1, 'frag_index': 0, @@ -467,12 +481,13 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'local_dev': { 'replication_port': 6200, 'zone': 1, - 'ip': '127.0.0.1', + 'ip': '10.0.0.1', 'region': 1, 'id': 1, - 'replication_ip': '127.0.0.1', + 'replication_ip': '10.0.0.1', 'device': 'sda1', 'port': 6200, + 'weight': 1.0, }, 'hashes': { '061': { @@ -485,6 +500,70 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 1: '0e6e8d48d801dc89fd31904ae3b31229', }, }, + }, { + 'sync_to': [{ + 'index': 3, + 'replication_port': 6200, + 'zone': 1, + 'ip': '10.0.0.0', + 'region': 1, + 'port': 6200, + 'replication_ip': '10.0.0.0', + 
'device': 'sda0', + 'id': 0, + 'weight': 1.0, + }, { + 'index': 0, + 'replication_port': 6200, + 'zone': 1, + 'ip': '10.0.0.1', + 'region': 1, + 'port': 6200, + 'replication_ip': '10.0.0.1', + 'device': 'sda5', + 'id': 5, + 'weight': 1.0, + }, { + 'index': 1, + 'replication_port': 6200, + 'zone': 1, + 'ip': '10.0.0.2', + 'region': 1, + 'port': 6200, + 'replication_ip': '10.0.0.2', + 'device': 'sda6', + 'id': 6, + 'weight': 1.0, + }], + 'job_type': object_reconstructor.SYNC, + 'sync_diskfile_builder': self.reconstructor.reconstruct_fa, + 'suffixes': [], + 'partition': 1, + 'frag_index': 4, + 'device': 'sda1', + 'local_dev': { + 'replication_port': 6200, + 'zone': 1, + 'ip': '10.0.0.1', + 'region': 1, + 'id': 1, + 'replication_ip': '10.0.0.1', + 'device': 'sda1', + 'port': 6200, + 'weight': 1.0, + }, + 'hashes': { + '061': { + None: '85b02a5283704292a511078a5c483da5', + 1: '0e6e8d48d801dc89fd31904ae3b31229', + }, + '3c1': { + 0: '0e6e8d48d801dc89fd31904ae3b31229', + None: '85b02a5283704292a511078a5c483da5', + 1: '0e6e8d48d801dc89fd31904ae3b31229', + }, + }, + }] ) # part num 2 @@ -493,12 +572,14 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'sync_to': [{ 'index': 0, 'replication_port': 6200, - 'zone': 2, - 'ip': '127.0.0.2', + 'zone': 1, + 'ip': '10.0.0.2', 'region': 1, 'port': 6200, - 'replication_ip': '127.0.0.2', - 'device': 'sda1', 'id': 2, + 'replication_ip': '10.0.0.2', + 'device': 'sda2', + 'id': 2, + 'weight': 1.0, }], 'job_type': object_reconstructor.REVERT, 'suffixes': ['061'], @@ -508,12 +589,13 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'local_dev': { 'replication_port': 6200, 'zone': 1, - 'ip': '127.0.0.1', + 'ip': '10.0.0.1', 'region': 1, 'id': 1, - 'replication_ip': '127.0.0.1', + 'replication_ip': '10.0.0.1', 'device': 'sda1', 'port': 6200, + 'weight': 1.0, }, 'hashes': { '061': { @@ -529,13 +611,14 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'sync_to': [{ 'index': 2, 'replication_port': 6200, - 'zone': 0, - 'ip': '127.0.0.0', + 'zone': 1, + 'ip': '10.0.0.0', 'region': 1, 'port': 6200, - 'replication_ip': '127.0.0.0', - 'device': 'sda1', - 'id': 0, + 'replication_ip': '10.0.0.0', + 'device': 'sda4', + 'id': 4, + 'weight': 1.0, }], 'job_type': object_reconstructor.REVERT, 'suffixes': ['3c1'], @@ -545,12 +628,13 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'local_dev': { 'replication_port': 6200, 'zone': 1, - 'ip': '127.0.0.1', + 'ip': '10.0.0.1', 'region': 1, 'id': 1, - 'replication_ip': '127.0.0.1', + 'replication_ip': '10.0.0.1', 'device': 'sda1', - 'port': 6200 + 'port': 6200, + 'weight': 1.0, }, 'hashes': { '061': { @@ -572,6 +656,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.fail('Unknown part number %r' % part_num) expected_by_part_frag_index = dict( ((j['partition'], j['frag_index']), j) for j in expected_jobs) + unexpected_jobs = [] for job in jobs: job_key = (job['partition'], job['frag_index']) if job_key in expected_by_part_frag_index: @@ -585,15 +670,17 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.assertEqual(value, expected_value) except AssertionError as e: extra_info = \ - '\n\n... for %r in part num %s job %r' % ( - k, part_num, job_key) + '\n\n... 
for %r in part num %s frag %s' % ( + k, part_num, job_key[1]) raise AssertionError(str(e) + extra_info) else: - self.fail( - 'Unexpected job %r for part num %s - ' - 'expected jobs where %r' % ( - job_key, part_num, - expected_by_part_frag_index.keys())) + unexpected_jobs.append(job) + if unexpected_jobs: + self.fail( + 'Unexpected jobs for frags %r in part num %s - ' + 'expected jobs for frags %r' % ( + [j['frag_index'] for j in unexpected_jobs], part_num, + [k[1] for k in expected_by_part_frag_index])) for expected_job in expected_jobs: if expected_job in jobs: jobs.remove(expected_job) @@ -601,22 +688,30 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): check_jobs(part_num) def _run_once(self, http_count, extra_devices, override_devices=None): - ring_devs = list(self.policy.object_ring.devs) + id_counter = itertools.count( + max(d['id'] for d in self.policy.object_ring.devs) + 1) for device, parts in extra_devices.items(): device_path = os.path.join(self.devices, device) os.mkdir(device_path) for part in range(parts): - os.makedirs(os.path.join(device_path, 'objects-1', str(part))) - # we update the ring to make is_local happy - devs = [dict(d) for d in ring_devs] - for d in devs: - d['device'] = device - self.policy.object_ring.devs.extend(devs) + hash_path = os.path.join( + device_path, 'objects-1', str(part), 'abc', 'hash') + os.makedirs(hash_path) + tombstone_file = utils.Timestamp(time.time()).internal + '.ts' + with open(os.path.join(hash_path, tombstone_file), 'w'): + pass + # use sda1 as a base to make is_local happy + new_device = dict(self.policy.object_ring.devs[1]) + new_device['device'] = device + new_device['id'] = next(id_counter) + self.policy.object_ring.devs.append(new_device) self.reconstructor.stats_interval = 0 self.process_job = lambda j: sleep(0) - with mocked_http_conn(*[200] * http_count, body=pickle.dumps({})): - with mock_ssync_sender(): + with mock_ssync_sender(), \ + mocked_http_conn(*[200] * http_count, + body=pickle.dumps({})) as request_log: self.reconstructor.run_once(devices=override_devices) + return request_log def test_run_once(self): # sda1: 3 is done in setup @@ -625,7 +720,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'sdc1': 1, 'sdd1': 0, } - self._run_once(18, extra_devices) + self._run_once(32, extra_devices) stats_lines = set() for line in self.logger.get_lines_for_level('info'): if 'reconstructed in' not in line: @@ -651,7 +746,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'sdc1': 1, 'sdd1': 0, } - self._run_once(2, extra_devices, 'sdc1') + self._run_once(3, extra_devices, 'sdc1') stats_lines = set() for line in self.logger.get_lines_for_level('info'): if 'reconstructed in' not in line: @@ -822,38 +917,87 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.assertIn('You should disable handoffs_only', msgs[-1]) def test_get_partners(self): - # we're going to perform an exhaustive test of every possible - # combination of partitions and nodes in our custom test ring + expected = ( + # node_index, part_nodes => partners + (0, [0, 1, 2, 3], [3, 1, 2]), + (0, [2, 3, 1, 0], [0, 3, 1]), + (0, [0, 1, 2, 3, 4], [4, 1, 2]), + (0, [0, 1, 2, 3, 4, 5], [5, 1, 3]), + (1, [0, 1, 2, 3, 4, 5], [0, 2, 4]), + (2, [0, 1, 2, 3, 4, 5], [1, 3, 5]), + (3, [0, 1, 2, 3, 4, 5], [2, 4, 0]), + (4, [0, 1, 2, 3, 4, 5], [3, 5, 1]), + (5, [0, 1, 2, 3, 4, 5], [4, 0, 2]), + (5, [1, 4, 0, 2, 3, 5], [3, 1, 0]), + ) + failures = [] + for frag_index, part_nodes, partners in expected: + sync_to = 
object_reconstructor._get_partners( + frag_index, part_nodes) + if partners != sync_to: + failures.append('Given nodes %r for index %s we expected ' + '%r but got %r' % ( + part_nodes, frag_index, partners, sync_to)) + if failures: + failures.insert(0, 'Some test scenarios failed:') + self.fail('\n'.join(failures)) - # format: [dev_id in question, 'part_num', - # [part_nodes for the given part], left id, right id...] - expected_partners = sorted([ - (0, '0', [0, 1, 2], 2, 1), (0, '2', [2, 3, 0], 3, 2), - (1, '0', [0, 1, 2], 0, 2), (1, '1', [1, 2, 3], 3, 2), - (2, '0', [0, 1, 2], 1, 0), (2, '1', [1, 2, 3], 1, 3), - (2, '2', [2, 3, 0], 0, 3), (3, '1', [1, 2, 3], 2, 1), - (3, '2', [2, 3, 0], 2, 0), (0, '0', [0, 1, 2], 2, 1), - (0, '2', [2, 3, 0], 3, 2), (1, '0', [0, 1, 2], 0, 2), - (1, '1', [1, 2, 3], 3, 2), (2, '0', [0, 1, 2], 1, 0), - (2, '1', [1, 2, 3], 1, 3), (2, '2', [2, 3, 0], 0, 3), - (3, '1', [1, 2, 3], 2, 1), (3, '2', [2, 3, 0], 2, 0), - ]) + def test_iter_nodes_for_frag(self): + # no limit + self.reconstructor.rebuild_handoff_node_count = -1 + policy = ECStoragePolicy(1, name='test', ec_type=DEFAULT_TEST_EC_TYPE, + ec_ndata=4, ec_nparity=3) + policy.object_ring = FabricatedRing(replicas=7, devices=28) + primaries = policy.object_ring.get_part_nodes(0) - got_partners = [] - for pol in POLICIES: - obj_ring = pol.object_ring - for part_num in self.part_nums: - part_nodes = obj_ring.get_part_nodes(int(part_num)) - primary_ids = [n['id'] for n in part_nodes] - for node in part_nodes: - partners = object_reconstructor._get_partners( - node['index'], part_nodes) - left = partners[0]['id'] - right = partners[1]['id'] - got_partners.append(( - node['id'], part_num, primary_ids, left, right)) + node = primaries[0] + nodes_for_frag = list(self.reconstructor._iter_nodes_for_frag( + policy, 0, node)) + expected = [0, 0, 7, 14] + self.assertEqual(expected, [n.get('index', n.get('handoff_index')) + for n in nodes_for_frag]) + for node in nodes_for_frag: + self.assertEqual(0, node['backend_index']) - self.assertEqual(expected_partners, sorted(got_partners)) + node = primaries[3] + nodes_for_frag = list(self.reconstructor._iter_nodes_for_frag( + policy, 0, node)) + expected = [3, 3, 10, 17] + self.assertEqual(expected, [n.get('index', n.get('handoff_index')) + for n in nodes_for_frag]) + for node in nodes_for_frag: + self.assertEqual(3, node['backend_index']) + + node = primaries[-1] + nodes_for_frag = list(self.reconstructor._iter_nodes_for_frag( + policy, 0, node)) + expected = [6, 6, 13, 20] + self.assertEqual(expected, [n.get('index', n.get('handoff_index')) + for n in nodes_for_frag]) + for node in nodes_for_frag: + self.assertEqual(6, node['backend_index']) + + # default limit is 2 + self.reconstructor.rebuild_handoff_node_count = 2 + node = primaries[0] + nodes_for_frag = list(self.reconstructor._iter_nodes_for_frag( + policy, 0, node)) + expected = [0, 0, 7] + self.assertEqual(expected, [n.get('index', n.get('handoff_index')) + for n in nodes_for_frag]) + for node in nodes_for_frag: + self.assertEqual(0, node['backend_index']) + + # zero means only primaries + self.reconstructor.rebuild_handoff_node_count = 0 + node = primaries[0] + nodes_for_frag = list(self.reconstructor._iter_nodes_for_frag( + policy, 0, node)) + expected = [0] + self.assertEqual(expected, [n.get('index', n.get('handoff_index')) + for n in nodes_for_frag]) + for node in nodes_for_frag: + self.assertEqual(0, node['backend_index']) def test_collect_parts(self): self.reconstructor._reset_stats() @@ -880,6 +1024,8 @@ class 
TestGlobalSetupObjectReconstructor(unittest.TestCase): self.assertEqual(str(log_kwargs['exc_info'][1]), 'Ow!') def test_removes_zbf(self): + # suppress unmount warning + os.mkdir(os.path.join(self.devices, 'sda5')) # After running xfs_repair, a partition directory could become a # zero-byte file. If this happens, the reconstructor should clean it # up, log something, and move on to the next partition. @@ -927,6 +1073,9 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): if e.errno != 2: raise + # suppress unmount warning + os.mkdir(os.path.join(self.devices, 'sda5')) + # since our collect_parts job is a generator, that yields directly # into build_jobs and then spawns it's safe to do the remove_files # without making reconstructor startup slow @@ -996,40 +1145,46 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): # run reconstructor with delete function mocked out to check calls ssync_calls = [] - delete_func =\ - 'swift.obj.reconstructor.ObjectReconstructor.delete_reverted_objs' with mock.patch('swift.obj.reconstructor.ssync_sender', - self._make_fake_ssync(ssync_calls)): - with mocked_http_conn(*[200] * 12, body=pickle.dumps({})): - with mock.patch(delete_func) as mock_delete: - self.reconstructor.reconstruct() - expected_calls = [] - for context in ssync_calls: - if context['job']['job_type'] == REVERT: - for dirpath, files in visit_obj_dirs(context): - # sanity check - expect some files to be in dir, - # may not be for the reverted frag index - self.assertTrue(files) - n_files += len(files) - expected_calls.append(mock.call(context['job'], - context['available_map'], - context['node']['index'])) - mock_delete.assert_has_calls(expected_calls, any_order=True) + self._make_fake_ssync(ssync_calls)), \ + mocked_http_conn(*[200] * 17, body=pickle.dumps({})), \ + mock.patch.object( + self.reconstructor, 'delete_reverted_objs') as mock_delete: + self.reconstructor.reconstruct() + expected_calls = [] + for context in ssync_calls: + if context['job']['job_type'] == REVERT: + for dirpath, files in visit_obj_dirs(context): + # sanity check - expect some files to be in dir, + # may not be for the reverted frag index + self.assertTrue(files) + n_files += len(files) + expected_calls.append(mock.call(context['job'], + context['available_map'], + context['node']['index'])) + mock_delete.assert_has_calls(expected_calls, any_order=True) + # N.B. in this next test sequence we actually delete files after + # revert, so the on-disk hashes can change. In partition 1, if the + # revert jobs (for frag_index 0 or 1) run before the sync job + # (frag_index 4) all suffixes will get removed and the sync job won't + # have anything to ship to the remote (meaning there's no post-sync + # REPLICATE call). To keep the number of mocked_http_conn responses + # predictable we force a stable job order by mocking random's shuffle.
ssync_calls = [] with mock.patch('swift.obj.reconstructor.ssync_sender', - self._make_fake_ssync(ssync_calls)): - with mocked_http_conn(*[200] * 12, body=pickle.dumps({})): - self.reconstructor.reconstruct() - for context in ssync_calls: - if context['job']['job_type'] == REVERT: - data_file_tail = ('#%s.data' - % context['node']['index']) - for dirpath, files in visit_obj_dirs(context): - n_files_after += len(files) - for filename in files: - self.assertFalse( - filename.endswith(data_file_tail)) + self._make_fake_ssync(ssync_calls)), \ + mocked_http_conn(*[200] * 17, body=pickle.dumps({})), \ + mock.patch('swift.obj.reconstructor.random.shuffle'): + self.reconstructor.reconstruct() + for context in ssync_calls: + if context['job']['job_type'] == REVERT: + data_file_tail = ('#%s.data' + % context['node']['index']) + for dirpath, files in visit_obj_dirs(context): + n_files_after += len(files) + for filename in files: + self.assertFalse(filename.endswith(data_file_tail)) # sanity check that some files should were deleted self.assertGreater(n_files, n_files_after) @@ -1037,6 +1192,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): def test_no_delete_failed_revert(self): # test will only process revert jobs self.reconstructor.handoffs_only = True + # suppress unmount warning + os.mkdir(os.path.join(self.devices, 'sda5')) captured_ssync = [] # fail all jobs on part 2 on sda1 @@ -1092,7 +1249,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): job = context['job'] expected_suffix_calls.append( (job['sync_to'][0]['replication_ip'], '/%s/%s/%s' % ( - job['device'], job['partition'], + job['sync_to'][0]['device'], job['partition'], '-'.join(sorted(job['suffixes'])))) ) self.assertEqual(set(expected_suffix_calls), @@ -1145,16 +1302,16 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self._make_fake_ssync(ssync_calls)): self.reconstructor.reconstruct(override_partitions=[2]) expected_repliate_calls = set([ - ('127.0.0.0', '/sda1/2/3c1'), - ('127.0.0.2', '/sda1/2/061'), + (u'10.0.0.0', '/sda4/2/3c1'), + (u'10.0.0.2', '/sda2/2/061'), ]) found_calls = set((r['ip'], r['path']) for r in request_log.requests) self.assertEqual(expected_repliate_calls, found_calls) expected_ssync_calls = sorted([ - ('127.0.0.0', REVERT, 2, ['3c1']), - ('127.0.0.2', REVERT, 2, ['061']), + (u'10.0.0.0', REVERT, 2, [u'3c1']), + (u'10.0.0.2', REVERT, 2, [u'061']), ]) self.assertEqual(expected_ssync_calls, sorted(( c['node']['ip'], @@ -1179,48 +1336,50 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.assertFalse(os.access(part_path, os.F_OK)) def test_process_job_all_success(self): + rehash_per_job_type = {SYNC: 2, REVERT: 1} self.reconstructor._reset_stats() with mock_ssync_sender(): - with mocked_http_conn(*[200] * 12, body=pickle.dumps({})): - found_jobs = [] - for part_info in self.reconstructor.collect_parts(): - jobs = self.reconstructor.build_reconstruction_jobs( - part_info) - found_jobs.extend(jobs) - for job in jobs: - self.logger._clear() - node_count = len(job['sync_to']) + found_jobs = [] + for part_info in self.reconstructor.collect_parts(): + jobs = self.reconstructor.build_reconstruction_jobs( + part_info) + found_jobs.extend(jobs) + for job in jobs: + self.logger._clear() + node_count = len(job['sync_to']) + rehash_count = node_count * rehash_per_job_type[ + job['job_type']] + with mocked_http_conn(*[200] * rehash_count, + body=pickle.dumps({})): self.reconstructor.process_job(job) - if job['job_type'] == object_reconstructor.REVERT: - 
self.assertEqual(0, count_stats( - self.logger, 'update_stats', 'suffix.hashes')) - else: - self.assertStatCount('update_stats', - 'suffix.hashes', - node_count) - self.assertEqual(node_count, count_stats( - self.logger, 'update_stats', 'suffix.hashes')) - self.assertEqual(node_count, count_stats( - self.logger, 'update_stats', 'suffix.syncs')) - self.assertNotIn('error', self.logger.all_log_lines()) + if job['job_type'] == object_reconstructor.REVERT: + self.assertStatCount('update_stats', + 'suffix.hashes', 0) + else: + self.assertStatCount('update_stats', + 'suffix.hashes', node_count) + self.assertStatCount('update_stats', + 'suffix.syncs', node_count) + self.assertNotIn('error', self.logger.all_log_lines()) self.assertEqual( - dict(collections.Counter( - (job['device'], job['partition'], job['frag_index']) - for job in found_jobs)), - {('sda1', 0, 1): 1, - ('sda1', 0, 2): 1, - ('sda1', 1, 0): 1, - ('sda1', 1, 1): 1, - ('sda1', 2, 0): 1, - ('sda1', 2, 2): 1}) - self.assertEqual(self.reconstructor.suffix_sync, 8) - self.assertEqual(self.reconstructor.suffix_count, 8) - self.assertEqual(self.reconstructor.reconstruction_count, 6) + dict(collections.Counter((job['device'], job['partition'], + job['frag_index'], job['job_type']) + for job in found_jobs)), + {('sda1', 0, 1, SYNC): 1, + ('sda1', 0, 2, REVERT): 1, + ('sda1', 1, 0, REVERT): 1, + ('sda1', 1, 1, REVERT): 1, + ('sda1', 1, 4, SYNC): 1, + ('sda1', 2, 0, REVERT): 1, + ('sda1', 2, 2, REVERT): 1}) + self.assertEqual(self.reconstructor.suffix_sync, 12) + self.assertEqual(self.reconstructor.suffix_count, 12) + self.assertEqual(self.reconstructor.reconstruction_count, 7) def test_process_job_all_insufficient_storage(self): self.reconstructor._reset_stats() with mock_ssync_sender(): - with mocked_http_conn(*[507] * 8): + with mocked_http_conn(*[507] * 15): found_jobs = [] for part_info in self.reconstructor.collect_parts(): jobs = self.reconstructor.build_reconstruction_jobs( @@ -1236,23 +1395,24 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.assertEqual(0, count_stats( self.logger, 'update_stats', 'suffix.syncs')) self.assertEqual( - dict(collections.Counter( - (job['device'], job['partition'], job['frag_index']) - for job in found_jobs)), - {('sda1', 0, 1): 1, - ('sda1', 0, 2): 1, - ('sda1', 1, 0): 1, - ('sda1', 1, 1): 1, - ('sda1', 2, 0): 1, - ('sda1', 2, 2): 1}) + dict(collections.Counter((job['device'], job['partition'], + job['frag_index'], job['job_type']) + for job in found_jobs)), + {('sda1', 0, 1, SYNC): 1, + ('sda1', 0, 2, REVERT): 1, + ('sda1', 1, 0, REVERT): 1, + ('sda1', 1, 1, REVERT): 1, + ('sda1', 1, 4, SYNC): 1, + ('sda1', 2, 0, REVERT): 1, + ('sda1', 2, 2, REVERT): 1}) self.assertEqual(self.reconstructor.suffix_sync, 0) self.assertEqual(self.reconstructor.suffix_count, 0) - self.assertEqual(self.reconstructor.reconstruction_count, 6) + self.assertEqual(self.reconstructor.reconstruction_count, 7) def test_process_job_all_client_error(self): self.reconstructor._reset_stats() with mock_ssync_sender(): - with mocked_http_conn(*[400] * 8): + with mocked_http_conn(*[400] * 11): found_jobs = [] for part_info in self.reconstructor.collect_parts(): jobs = self.reconstructor.build_reconstruction_jobs( @@ -1275,15 +1435,16 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): ('sda1', 0, 2): 1, ('sda1', 1, 0): 1, ('sda1', 1, 1): 1, + ('sda1', 1, 4): 1, ('sda1', 2, 0): 1, ('sda1', 2, 2): 1}) self.assertEqual(self.reconstructor.suffix_sync, 0) self.assertEqual(self.reconstructor.suffix_count, 0) - 
self.assertEqual(self.reconstructor.reconstruction_count, 6) + self.assertEqual(self.reconstructor.reconstruction_count, 7) def test_process_job_all_timeout(self): self.reconstructor._reset_stats() - with mock_ssync_sender(), mocked_http_conn(*[Timeout()] * 8): + with mock_ssync_sender(), mocked_http_conn(*[Timeout()] * 11): found_jobs = [] for part_info in self.reconstructor.collect_parts(): jobs = self.reconstructor.build_reconstruction_jobs( @@ -1306,11 +1467,12 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): ('sda1', 0, 2): 1, ('sda1', 1, 0): 1, ('sda1', 1, 1): 1, + ('sda1', 1, 4): 1, ('sda1', 2, 0): 1, ('sda1', 2, 2): 1}) self.assertEqual(self.reconstructor.suffix_sync, 0) self.assertEqual(self.reconstructor.suffix_count, 0) - self.assertEqual(self.reconstructor.reconstruction_count, 6) + self.assertEqual(self.reconstructor.reconstruction_count, 7) def test_reconstructor_skipped_partpower_increase(self): self.reconstructor._reset_stats() @@ -3133,7 +3295,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): self.assertEqual(job['job_type'], object_reconstructor.SYNC) self.assertEqual(job['frag_index'], 0) self.assertEqual(job['suffixes'], []) - self.assertEqual(len(job['sync_to']), 2) + self.assertEqual(len(job['sync_to']), 3) self.assertEqual(job['partition'], 0) self.assertEqual(job['path'], part_path) self.assertEqual(job['hashes'], {}) @@ -3165,7 +3327,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): self.assertEqual(job['job_type'], object_reconstructor.SYNC) self.assertEqual(job['frag_index'], 0) self.assertEqual(job['suffixes'], []) - self.assertEqual(len(job['sync_to']), 2) + self.assertEqual(len(job['sync_to']), 3) self.assertEqual(job['partition'], 0) self.assertEqual(job['path'], part_path) self.assertEqual(job['hashes'], {}) @@ -3210,7 +3372,9 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): self.assertEqual(job['suffixes'], stub_hashes.keys()) self.assertEqual(set([n['index'] for n in job['sync_to']]), set([(frag_index + 1) % ring.replicas, - (frag_index - 1) % ring.replicas])) + (frag_index - 1) % ring.replicas, + (frag_index + int(0.5 * ring.replicas)), + ])) self.assertEqual(job['partition'], partition) self.assertEqual(job['path'], part_path) self.assertEqual(job['hashes'], stub_hashes) @@ -3320,10 +3484,12 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): job = sync_jobs[0] self.assertEqual(job['frag_index'], frag_index) self.assertEqual(sorted(job['suffixes']), sorted(['123', 'abc'])) - self.assertEqual(len(job['sync_to']), 2) + self.assertEqual(len(job['sync_to']), 3) self.assertEqual(set([n['index'] for n in job['sync_to']]), set([(frag_index + 1) % ring.replicas, - (frag_index - 1) % ring.replicas])) + (frag_index - 1) % ring.replicas, + (frag_index + int(0.5 * ring.replicas)), + ])) self.assertEqual(1, len(revert_jobs)) job = revert_jobs[0] self.assertEqual(job['frag_index'], other_frag_index) @@ -3431,10 +3597,13 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): self.reconstructor.headers['X-Backend-Storage-Policy-Index'], int(job['policy'])) with mocked_http_conn(200, body=pickle.dumps({})) as request_log: - self.reconstructor._get_suffixes_to_sync(job, node) + suffixes, new_node = self.reconstructor._get_suffixes_to_sync( + job, node) self.assertEqual([int(job['policy'])], [ r['headers']['X-Backend-Storage-Policy-Index'] for r in request_log.requests]) + self.assertEqual(suffixes, []) + self.assertEqual(new_node, node) def test_get_suffixes_in_sync(self): part_path = 
os.path.join(self.devices, self.local_dev['device'], @@ -3464,10 +3633,12 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): with mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes', return_value=(None, local_hashes)), \ mocked_http_conn(200, body=remote_response) as request_log: - suffixes = self.reconstructor._get_suffixes_to_sync(job, node) + suffixes, new_node = self.reconstructor._get_suffixes_to_sync( + job, node) self.assertEqual([node['replication_ip']], [r['ip'] for r in request_log.requests]) self.assertEqual(suffixes, []) + self.assertEqual(new_node, node) def test_get_suffix_delta(self): # different @@ -3513,7 +3684,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): frag_index = self.policy.get_backend_index(local_dev['index']) sync_to = object_reconstructor._get_partners( local_dev['index'], part_nodes) - # setup left and right hashes + # setup left, right and far hashes stub_hashes = { '123': {frag_index: 'hash', None: 'hash'}, 'abc': {frag_index: 'hash', None: 'hash'}, @@ -3528,6 +3699,11 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): '123': {right_frag_index: 'hash', None: 'hash'}, 'abc': {right_frag_index: 'hash', None: 'hash'}, } + far_index = self.policy.get_backend_index(sync_to[2]['index']) + far_hashes = { + '123': {far_index: 'hash', None: 'hash'}, + 'abc': {far_index: 'hash', None: 'hash'}, + } partition = 0 part_path = os.path.join(self.devices, self.local_dev['device'], diskfile.get_data_dir(self.policy), @@ -3545,7 +3721,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): } responses = [(200, pickle.dumps(hashes)) for hashes in ( - left_hashes, right_hashes)] + left_hashes, right_hashes, far_hashes)] codes, body_iter = zip(*responses) ssync_calls = [] @@ -3556,13 +3732,13 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): mocked_http_conn(*codes, body_iter=body_iter) as request_log: self.reconstructor.process_job(job) - expected_suffix_calls = set([ + expected_suffix_calls = [ (sync_to[0]['ip'], '/%s/0' % sync_to[0]['device']), (sync_to[1]['ip'], '/%s/0' % sync_to[1]['device']), - ]) + (sync_to[2]['ip'], '/%s/0' % sync_to[2]['device']), + ] self.assertEqual(expected_suffix_calls, - set((r['ip'], r['path']) - for r in request_log.requests)) + [(r['ip'], r['path']) for r in request_log.requests]) self.assertFalse(ssync_calls) @@ -3580,6 +3756,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): } left_hashes = {} right_hashes = {} + far_hashes = {} partition = 0 part_path = os.path.join(self.devices, self.local_dev['device'], @@ -3597,8 +3774,9 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): 'local_dev': self.local_dev, } - responses = [(200, pickle.dumps(hashes)) for hashes in ( - left_hashes, left_hashes, right_hashes, right_hashes)] + responses = [] + for hashes in (left_hashes, right_hashes, far_hashes): + responses.extend([(200, pickle.dumps(hashes))] * 2) codes, body_iter = zip(*responses) ssync_calls = [] @@ -3608,19 +3786,21 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): mocked_http_conn(*codes, body_iter=body_iter) as request_log: self.reconstructor.process_job(job) - expected_suffix_calls = set([ + expected_suffix_calls = [ (sync_to[0]['ip'], '/%s/0' % sync_to[0]['device']), (sync_to[0]['ip'], '/%s/0/123-abc' % sync_to[0]['device']), (sync_to[1]['ip'], '/%s/0' % sync_to[1]['device']), (sync_to[1]['ip'], '/%s/0/123-abc' % sync_to[1]['device']), - ]) + (sync_to[2]['ip'], '/%s/0' % sync_to[2]['device']), + (sync_to[2]['ip'], '/%s/0/123-abc' 
% sync_to[2]['device']), + ] self.assertEqual(expected_suffix_calls, - set((r['ip'], r['path']) - for r in request_log.requests)) + [(r['ip'], r['path']) for r in request_log.requests]) expected_ssync_calls = sorted([ (sync_to[0]['ip'], 0, set(['123', 'abc'])), (sync_to[1]['ip'], 0, set(['123', 'abc'])), + (sync_to[2]['ip'], 0, set(['123', 'abc'])), ]) self.assertEqual(expected_ssync_calls, sorted(( c['node']['ip'], @@ -3656,6 +3836,11 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): '123': {right_frag_index: 'hash', None: 'hash'}, 'abc': {right_frag_index: 'hashX', None: 'hash'}, } + far_index = self.policy.get_backend_index(sync_to[2]['index']) + far_hashes = { + '123': {far_index: 'hash', None: 'hash'}, + 'abc': {far_index: 'hash', None: 'hash'}, + } job = { 'job_type': object_reconstructor.SYNC, @@ -3674,6 +3859,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): (200, pickle.dumps(left_hashes)), (200, pickle.dumps(right_hashes)), (200, pickle.dumps(right_hashes)), + (200, pickle.dumps(far_hashes)), ] codes, body_iter = zip(*responses) @@ -3739,6 +3925,12 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): 'abc': {right_frag_index: 'hash', None: 'different-because-durable'}, } + # far side is in sync + far_index = self.policy.get_backend_index(sync_to[2]['index']) + far_hashes = { + '123': {far_index: 'hash', None: 'hash'}, + 'abc': {far_index: 'hash', None: 'hash'}, + } part_path = os.path.join(self.devices, self.local_dev['device'], diskfile.get_data_dir(self.policy), @@ -3756,7 +3948,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): } responses = [(200, pickle.dumps(hashes)) for hashes in ( - left_hashes, right_hashes, right_hashes)] + left_hashes, right_hashes, right_hashes, far_hashes)] codes, body_iter = zip(*responses) ssync_calls = [] @@ -3770,6 +3962,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): (sync_to[0]['ip'], '/%s/0' % sync_to[0]['device']), (sync_to[1]['ip'], '/%s/0' % sync_to[1]['device']), (sync_to[1]['ip'], '/%s/0/abc' % sync_to[1]['device']), + (sync_to[2]['ip'], '/%s/0' % sync_to[2]['device']), ]) self.assertEqual(expected_suffix_calls, set((r['ip'], r['path']) @@ -3805,6 +3998,10 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): right_hashes = { '123': {right_frag_index: 'hash', None: 'hash'}, } + far_index = self.policy.get_backend_index(sync_to[2]['index']) + far_hashes = { + 'abc': {far_index: 'hashX', None: 'hash'}, + } part_path = os.path.join(self.devices, self.local_dev['device'], diskfile.get_data_dir(self.policy), str(partition)) @@ -3820,8 +4017,9 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): 'local_dev': self.local_dev, } - responses = [(200, pickle.dumps(hashes)) for hashes in ( - left_hashes, left_hashes, right_hashes, right_hashes)] + responses = [] + for hashes in (left_hashes, right_hashes, far_hashes): + responses.extend([(200, pickle.dumps(hashes))] * 2) codes, body_iter = zip(*responses) ssync_calls = [] @@ -3837,6 +4035,8 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): (sync_to[0]['ip'], '/%s/0/123' % sync_to[0]['device']), (sync_to[1]['ip'], '/%s/0' % sync_to[1]['device']), (sync_to[1]['ip'], '/%s/0/abc' % sync_to[1]['device']), + (sync_to[2]['ip'], '/%s/0' % sync_to[2]['device']), + (sync_to[2]['ip'], '/%s/0/123-abc' % sync_to[2]['device']), ]) self.assertEqual(expected_suffix_calls, set((r['ip'], r['path']) @@ -3844,10 +4044,12 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): self.assertEqual( 
dict(collections.Counter( - (c['node']['index'], tuple(c['suffixes'])) + (c['node']['index'], tuple(sorted(c['suffixes']))) for c in ssync_calls)), - {(sync_to[0]['index'], ('123', )): 1, - (sync_to[1]['index'], ('abc', )): 1}) + {(sync_to[0]['index'], ('123',)): 1, + (sync_to[1]['index'], ('abc',)): 1, + (sync_to[2]['index'], ('123', 'abc')): 1, + }) def test_process_job_primary_down(self): partition = 0 @@ -3859,7 +4061,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): } part_nodes = self.policy.object_ring.get_part_nodes(partition) - sync_to = part_nodes[:2] + sync_to = part_nodes[:3] part_path = os.path.join(self.devices, self.local_dev['device'], diskfile.get_data_dir(self.policy), @@ -3948,9 +4150,9 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): expected_suffix_calls = set(( node['replication_ip'], '/%s/0' % node['device'] - ) for node in part_nodes) + ) for node in sync_to) - possible_errors = [404, 507, Timeout(), Exception('kaboom!')] + possible_errors = [404, Timeout(), Exception('kaboom!')] codes = [random.choice(possible_errors) for r in expected_suffix_calls] @@ -3967,6 +4169,86 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): self.assertFalse(ssync_calls) + def test_process_job_sync_partner_unmounted(self): + partition = 0 + part_nodes = self.policy.object_ring.get_part_nodes(partition) + frag_index = [n['id'] for n in part_nodes].index(self.local_dev['id']) + sync_to = object_reconstructor._get_partners(frag_index, part_nodes) + self.assertEqual(3, len(sync_to)) + stub_hashes = { + '123': {frag_index: 'hash', None: 'hash'}, + 'abc': {frag_index: 'hash', None: 'hash'}, + } + # left partner out of sync + left_frag_index = self.policy.get_backend_index(sync_to[0]['index']) + left_hashes = { + '123': {left_frag_index: 'not-in-sync-hash', None: 'hash'}, + 'abc': {left_frag_index: 'hash', None: 'hash'}, + } + # we don't need right partner hashes + # far partner in sync + far_index = self.policy.get_backend_index(sync_to[2]['index']) + far_hashes = { + '123': {far_index: 'hash', None: 'hash'}, + 'abc': {far_index: 'hash', None: 'hash'}, + } + part_path = os.path.join(self.devices, self.local_dev['device'], + diskfile.get_data_dir(self.policy), + str(partition)) + job = { + 'job_type': object_reconstructor.SYNC, + 'frag_index': frag_index, + 'suffixes': stub_hashes.keys(), + 'sync_to': sync_to, + 'partition': partition, + 'path': part_path, + 'hashes': stub_hashes, + 'policy': self.policy, + 'device': self.local_dev['device'], + 'local_dev': self.local_dev, + } + + responses = [ + (200, pickle.dumps(left_hashes)), # hashes left partner + (200, pickle.dumps(left_hashes)), # hashes post-sync + (507, ''), # unmounted right partner + (200, pickle.dumps({})), # hashes handoff + (200, ''), # hashes post-sync + (200, pickle.dumps(far_hashes)), # hashes far partner + ] + codes, body_iter = zip(*responses) + + ssync_calls = [] + with mock_ssync_sender(ssync_calls), \ + mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes', + return_value=(None, stub_hashes)), \ + mocked_http_conn(*codes, body_iter=body_iter) as request_log: + self.reconstructor.process_job(job) + # increment frag_index since we're rebuilding to our right + frag_index = (frag_index + 1) % self.policy.ec_n_unique_fragments + handoffs = self.policy.object_ring.get_more_nodes(partition) + for i, handoff in enumerate(handoffs): + if i == frag_index: + break + else: + self.fail('Unable to find handoff?!') + expected = collections.Counter([ + (200, sync_to[0]['ip']), + (200, 
sync_to[0]['ip']), + (507, sync_to[1]['ip']), + (200, handoff['ip']), + (200, handoff['ip']), + (200, sync_to[2]['ip']), + ]) + self.assertEqual(expected, collections.Counter( + [(c, r['ip']) for c, r in zip(codes, request_log.requests)])) + expected = collections.Counter([ + sync_to[0]['ip'], + handoff['ip'], + ]) + self.assertEqual(expected, collections.Counter( + [c['node']['ip'] for c in ssync_calls])) + def test_process_job_handoff(self): frag_index = random.randint( 0, self.policy.ec_n_unique_fragments - 1) @@ -5092,6 +5374,50 @@ class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor): for index in range(28): self._test_reconstruct_with_duplicate_frags_no_errors(index) + def test_iter_nodes_for_frag(self): + self.reconstructor.rebuild_handoff_node_count = -1 + policy = ECStoragePolicy(1, name='test', ec_type=DEFAULT_TEST_EC_TYPE, + ec_ndata=4, ec_nparity=3, + ec_duplication_factor=2) + policy.object_ring = FabricatedRing(replicas=14, devices=42) + primaries = policy.object_ring.get_part_nodes(0) + + node = primaries[0] + nodes_for_frag = list(self.reconstructor._iter_nodes_for_frag( + policy, 0, node)) + expected = [0, 0, 7, 14, 21] + self.assertEqual(expected, [n.get('index', n.get('handoff_index')) + for n in nodes_for_frag]) + for node in nodes_for_frag: + self.assertEqual(0, node['backend_index']) + + node = primaries[3] + nodes_for_frag = list(self.reconstructor._iter_nodes_for_frag( + policy, 0, node)) + expected = [3, 3, 10, 17, 24] + self.assertEqual(expected, [n.get('index', n.get('handoff_index')) + for n in nodes_for_frag]) + for node in nodes_for_frag: + self.assertEqual(3, node['backend_index']) + + node = primaries[7] + nodes_for_frag = list(self.reconstructor._iter_nodes_for_frag( + policy, 0, node)) + expected = [7, 0, 7, 14, 21] + self.assertEqual(expected, [n.get('index', n.get('handoff_index')) + for n in nodes_for_frag]) + for node in nodes_for_frag: + self.assertEqual(0, node['backend_index']) + + node = primaries[-1] + nodes_for_frag = list(self.reconstructor._iter_nodes_for_frag( + policy, 0, node)) + expected = [13, 6, 13, 20, 27] + self.assertEqual(expected, [n.get('index', n.get('handoff_index')) + for n in nodes_for_frag]) + for node in nodes_for_frag: + self.assertEqual(6, node['backend_index']) + if __name__ == '__main__': unittest.main()
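
For reference, a minimal sketch consistent with the expectations encoded in test_iter_nodes_for_frag above: the primary itself is offered first, then only those handoff nodes whose handoff_index lands on the same unique fragment index, optionally capped by rebuild_handoff_node_count. The function name iter_nodes_for_frag_sketch and the duck-typed policy/ring arguments are illustrative stand-ins, not the reconstructor's actual helper; the cap semantics for values other than -1 are assumptions.

    # Sketch only -- not the Swift implementation. Assumes ``policy`` exposes
    # get_backend_index() and ec_n_unique_fragments, and that
    # policy.object_ring.get_more_nodes(partition) yields dicts carrying a
    # 'handoff_index' key, as the FabricatedRing-based test above relies on.
    def iter_nodes_for_frag_sketch(policy, partition, primary_node,
                                   rebuild_handoff_node_count=-1):
        backend_index = policy.get_backend_index(primary_node['index'])
        # the primary itself is always the first candidate
        yield dict(primary_node, backend_index=backend_index)
        if rebuild_handoff_node_count == 0:
            # assumed: 0 disables rebuilding to handoffs entirely
            return
        used_handoffs = 0
        for handoff in policy.object_ring.get_more_nodes(partition):
            # only consider handoffs offset onto this fragment index
            if (handoff['handoff_index'] % policy.ec_n_unique_fragments
                    != backend_index):
                continue
            yield dict(handoff, backend_index=backend_index)
            used_handoffs += 1
            if 0 < rebuild_handoff_node_count <= used_handoffs:
                # assumed: a positive value caps how many handoffs are tried
                break

With the FabricatedRing(replicas=14, devices=42) and duplication-factor-2 policy used in the test, this selection yields indexes [0, 0, 7, 14, 21] for primaries[0] and [7, 0, 7, 14, 21] for primaries[7] (both with backend_index 0), matching the expected lists asserted above when rebuild_handoff_node_count is -1 (no cap).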