Exclude local_dev from sync partners on failure
If the primary left or right hand partners are down, the next best thing is to validate the rest of the primary nodes. Where the rest should exclude not just the left and right hand partners - but ourself as well. This fixes a accidental noop when partner node is unavailable and another node is missing data. Validation: Add probetests to cover ssync failures for the primary sync_to nodes for sync jobs. Drive-by: Make additional plumbing for the check_mount and check_dir constraints into the remaining daemons. Change-Id: I4d1c047106c242bca85c94b569d98fd59bb255f4
This commit is contained in:
parent
843236a635
commit
a3559edc23
@ -29,8 +29,8 @@ from eventlet.support.greenlets import GreenletExit
|
||||
from swift import gettext_ as _
|
||||
from swift.common.utils import (
|
||||
whataremyips, unlink_older_than, compute_eta, get_logger,
|
||||
dump_recon_cache, ismount, mkdirs, config_true_value, list_from_csv,
|
||||
get_hub, tpool_reraise, GreenAsyncPile, Timestamp, remove_file)
|
||||
dump_recon_cache, mkdirs, config_true_value, list_from_csv, get_hub,
|
||||
tpool_reraise, GreenAsyncPile, Timestamp, remove_file)
|
||||
from swift.common.swob import HeaderKeyDict
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.daemon import Daemon
|
||||
@ -569,9 +569,12 @@ class ObjectReconstructor(Daemon):
|
||||
job['sync_to'],
|
||||
# I think we could order these based on our index to better
|
||||
# protect against a broken chain
|
||||
itertools.ifilter(
|
||||
lambda n: n['id'] not in (n['id'] for n in job['sync_to']),
|
||||
job['policy'].object_ring.get_part_nodes(job['partition'])),
|
||||
[
|
||||
n for n in
|
||||
job['policy'].object_ring.get_part_nodes(job['partition'])
|
||||
if n['id'] != job['local_dev']['id'] and
|
||||
n['id'] not in (m['id'] for m in job['sync_to'])
|
||||
],
|
||||
)
|
||||
syncd_with = 0
|
||||
for node in dest_nodes:
|
||||
@ -777,13 +780,14 @@ class ObjectReconstructor(Daemon):
|
||||
if override_devices and (local_dev['device'] not in
|
||||
override_devices):
|
||||
continue
|
||||
dev_path = join(self.devices_dir, local_dev['device'])
|
||||
obj_path = join(dev_path, data_dir)
|
||||
tmp_path = join(dev_path, get_tmp_dir(int(policy)))
|
||||
if self.mount_check and not ismount(dev_path):
|
||||
dev_path = self._df_router[policy].get_dev_path(
|
||||
local_dev['device'])
|
||||
if not dev_path:
|
||||
self.logger.warn(_('%s is not mounted'),
|
||||
local_dev['device'])
|
||||
continue
|
||||
obj_path = join(dev_path, data_dir)
|
||||
tmp_path = join(dev_path, get_tmp_dir(int(policy)))
|
||||
unlink_older_than(tmp_path, time.time() -
|
||||
self.reclaim_age)
|
||||
if not os.path.exists(obj_path):
|
||||
|
@ -19,7 +19,6 @@ import eventlet
|
||||
import eventlet.wsgi
|
||||
import eventlet.greenio
|
||||
|
||||
from swift.common import constraints
|
||||
from swift.common import exceptions
|
||||
from swift.common import http
|
||||
from swift.common import swob
|
||||
@ -176,8 +175,7 @@ class Receiver(object):
|
||||
self.frag_index = None
|
||||
utils.validate_device_partition(self.device, self.partition)
|
||||
self.diskfile_mgr = self.app._diskfile_router[self.policy]
|
||||
if self.diskfile_mgr.mount_check and not constraints.check_mount(
|
||||
self.diskfile_mgr.devices, self.device):
|
||||
if not self.diskfile_mgr.get_dev_path(self.device):
|
||||
raise swob.HTTPInsufficientStorage(drive=self.device)
|
||||
self.fp = self.request.environ['wsgi.input']
|
||||
for data in self._ensure_flush():
|
||||
|
@ -26,7 +26,7 @@ from swiftclient import get_auth, head_account
|
||||
|
||||
from swift.obj.diskfile import get_data_dir
|
||||
from swift.common.ring import Ring
|
||||
from swift.common.utils import readconf
|
||||
from swift.common.utils import readconf, renamer
|
||||
from swift.common.manager import Manager
|
||||
from swift.common.storage_policy import POLICIES, EC_POLICY, REPL_POLICY
|
||||
|
||||
@ -314,6 +314,19 @@ class ProbeTest(unittest.TestCase):
|
||||
self.updaters.once()
|
||||
self.replicators.once()
|
||||
|
||||
def kill_drive(self, device):
|
||||
if os.path.ismount(device):
|
||||
os.system('sudo umount %s' % device)
|
||||
else:
|
||||
renamer(device, device + "X")
|
||||
|
||||
def revive_drive(self, device):
|
||||
disabled_name = device + "X"
|
||||
if os.path.isdir(disabled_name):
|
||||
renamer(device + "X", device)
|
||||
else:
|
||||
os.system('sudo mount %s' % device)
|
||||
|
||||
|
||||
class ReplProbeTest(ProbeTest):
|
||||
|
||||
|
@ -19,12 +19,14 @@ import unittest
|
||||
import uuid
|
||||
import shutil
|
||||
import random
|
||||
from collections import defaultdict
|
||||
|
||||
from test.probe.common import ECProbeTest
|
||||
|
||||
from swift.common import direct_client
|
||||
from swift.common.storage_policy import EC_POLICY
|
||||
from swift.common.manager import Manager
|
||||
from swift.obj.reconstructor import _get_partners
|
||||
|
||||
from swiftclient import client
|
||||
|
||||
@ -165,6 +167,61 @@ class TestReconstructorRebuild(ECProbeTest):
|
||||
self._format_node(onode),
|
||||
[self._format_node(n) for n in node_list]))
|
||||
|
||||
def test_rebuild_partner_down(self):
|
||||
# create EC container
|
||||
headers = {'X-Storage-Policy': self.policy.name}
|
||||
client.put_container(self.url, self.token, self.container_name,
|
||||
headers=headers)
|
||||
|
||||
# PUT object
|
||||
contents = Body()
|
||||
client.put_object(self.url, self.token,
|
||||
self.container_name,
|
||||
self.object_name,
|
||||
contents=contents)
|
||||
|
||||
opart, onodes = self.object_ring.get_nodes(
|
||||
self.account, self.container_name, self.object_name)
|
||||
|
||||
# find a primary server that only has one of it's devices in the
|
||||
# primary node list
|
||||
group_nodes_by_config = defaultdict(list)
|
||||
for n in onodes:
|
||||
group_nodes_by_config[self.config_number(n)].append(n)
|
||||
for config_number, node_list in group_nodes_by_config.items():
|
||||
if len(node_list) == 1:
|
||||
break
|
||||
else:
|
||||
self.fail('ring balancing did not use all available nodes')
|
||||
primary_node = node_list[0]
|
||||
|
||||
# pick one it's partners to fail randomly
|
||||
partner_node = random.choice(_get_partners(
|
||||
primary_node['index'], onodes))
|
||||
|
||||
# 507 the partner device
|
||||
device_path = self.device_dir('object', partner_node)
|
||||
self.kill_drive(device_path)
|
||||
|
||||
# select another primary sync_to node to fail
|
||||
failed_primary = [n for n in onodes if n['id'] not in
|
||||
(primary_node['id'], partner_node['id'])][0]
|
||||
# ... capture it's fragment etag
|
||||
failed_primary_etag = self.direct_get(failed_primary, opart)
|
||||
# ... and delete it
|
||||
part_dir = self.storage_dir('object', failed_primary, part=opart)
|
||||
shutil.rmtree(part_dir, True)
|
||||
|
||||
# reconstruct from the primary, while one of it's partners is 507'd
|
||||
self.reconstructor.once(number=self.config_number(primary_node))
|
||||
|
||||
# the other failed primary will get it's fragment rebuilt instead
|
||||
self.assertEqual(failed_primary_etag,
|
||||
self.direct_get(failed_primary, opart))
|
||||
|
||||
# just to be nice
|
||||
self.revive_drive(device_path)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
@ -17,7 +17,6 @@
|
||||
from hashlib import md5
|
||||
import unittest
|
||||
import uuid
|
||||
import os
|
||||
import random
|
||||
import shutil
|
||||
from collections import defaultdict
|
||||
@ -27,7 +26,6 @@ from test.probe.common import ECProbeTest
|
||||
from swift.common import direct_client
|
||||
from swift.common.storage_policy import EC_POLICY
|
||||
from swift.common.manager import Manager
|
||||
from swift.common.utils import renamer
|
||||
from swift.obj import reconstructor
|
||||
|
||||
from swiftclient import client
|
||||
@ -70,19 +68,6 @@ class TestReconstructorRevert(ECProbeTest):
|
||||
self.assertEqual(self.policy.policy_type, EC_POLICY)
|
||||
self.reconstructor = Manager(["object-reconstructor"])
|
||||
|
||||
def kill_drive(self, device):
|
||||
if os.path.ismount(device):
|
||||
os.system('sudo umount %s' % device)
|
||||
else:
|
||||
renamer(device, device + "X")
|
||||
|
||||
def revive_drive(self, device):
|
||||
disabled_name = device + "X"
|
||||
if os.path.isdir(disabled_name):
|
||||
renamer(device + "X", device)
|
||||
else:
|
||||
os.system('sudo mount %s' % device)
|
||||
|
||||
def proxy_get(self):
|
||||
# GET object
|
||||
headers, body = client.get_object(self.url, self.token,
|
||||
@ -277,6 +262,8 @@ class TestReconstructorRevert(ECProbeTest):
|
||||
else:
|
||||
self.fail('ring balancing did not use all available nodes')
|
||||
primary_node = node_list[0]
|
||||
|
||||
# ... and 507 it's device
|
||||
primary_device = self.device_dir('object', primary_node)
|
||||
self.kill_drive(primary_device)
|
||||
|
||||
|
@ -932,7 +932,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
def test_process_job_all_insufficient_storage(self):
|
||||
self.reconstructor._reset_stats()
|
||||
with mock_ssync_sender():
|
||||
with mocked_http_conn(*[507] * 10):
|
||||
with mocked_http_conn(*[507] * 8):
|
||||
found_jobs = []
|
||||
for part_info in self.reconstructor.collect_parts():
|
||||
jobs = self.reconstructor.build_reconstruction_jobs(
|
||||
@ -954,7 +954,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
def test_process_job_all_client_error(self):
|
||||
self.reconstructor._reset_stats()
|
||||
with mock_ssync_sender():
|
||||
with mocked_http_conn(*[400] * 10):
|
||||
with mocked_http_conn(*[400] * 8):
|
||||
found_jobs = []
|
||||
for part_info in self.reconstructor.collect_parts():
|
||||
jobs = self.reconstructor.build_reconstruction_jobs(
|
||||
@ -976,7 +976,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
def test_process_job_all_timeout(self):
|
||||
self.reconstructor._reset_stats()
|
||||
with mock_ssync_sender():
|
||||
with nested(mocked_http_conn(*[Timeout()] * 10)):
|
||||
with nested(mocked_http_conn(*[Timeout()] * 8)):
|
||||
found_jobs = []
|
||||
for part_info in self.reconstructor.collect_parts():
|
||||
jobs = self.reconstructor.build_reconstruction_jobs(
|
||||
@ -1012,6 +1012,13 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
'bind_port': self.port,
|
||||
}
|
||||
self.logger = debug_logger('object-reconstructor')
|
||||
self._configure_reconstructor()
|
||||
self.policy.object_ring.max_more_nodes = \
|
||||
self.policy.object_ring.replicas
|
||||
self.ts_iter = make_timestamp_iter()
|
||||
|
||||
def _configure_reconstructor(self, **kwargs):
|
||||
self.conf.update(kwargs)
|
||||
self.reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
self.conf, logger=self.logger)
|
||||
self.reconstructor._reset_stats()
|
||||
@ -1019,9 +1026,6 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
# directly, so you end up with a /0 when you try to show the
|
||||
# percentage of complete jobs as ratio of the total job count
|
||||
self.reconstructor.job_count = 1
|
||||
self.policy.object_ring.max_more_nodes = \
|
||||
self.policy.object_ring.replicas
|
||||
self.ts_iter = make_timestamp_iter()
|
||||
|
||||
def tearDown(self):
|
||||
self.reconstructor.stats_line()
|
||||
@ -1115,16 +1119,16 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
|
||||
paths = []
|
||||
|
||||
def fake_ismount(path):
|
||||
paths.append(path)
|
||||
def fake_check_mount(devices, device):
|
||||
paths.append(os.path.join(devices, device))
|
||||
return False
|
||||
|
||||
with nested(mock.patch('swift.obj.reconstructor.whataremyips',
|
||||
return_value=[self.ip]),
|
||||
mock.patch.object(self.policy.object_ring, '_devs',
|
||||
new=stub_ring_devs),
|
||||
mock.patch('swift.obj.reconstructor.ismount',
|
||||
fake_ismount)):
|
||||
mock.patch('swift.obj.diskfile.check_mount',
|
||||
fake_check_mount)):
|
||||
part_infos = list(self.reconstructor.collect_parts())
|
||||
self.assertEqual(2, len(part_infos)) # sanity, same jobs
|
||||
self.assertEqual(set(int(p['partition']) for p in part_infos),
|
||||
@ -1134,13 +1138,16 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
self.assertEqual(paths, [])
|
||||
|
||||
# ... now with mount check
|
||||
self.reconstructor.mount_check = True
|
||||
self._configure_reconstructor(mount_check=True)
|
||||
self.assertTrue(self.reconstructor.mount_check)
|
||||
for policy in POLICIES:
|
||||
self.assertTrue(self.reconstructor._df_router[policy].mount_check)
|
||||
with nested(mock.patch('swift.obj.reconstructor.whataremyips',
|
||||
return_value=[self.ip]),
|
||||
mock.patch.object(self.policy.object_ring, '_devs',
|
||||
new=stub_ring_devs),
|
||||
mock.patch('swift.obj.reconstructor.ismount',
|
||||
fake_ismount)):
|
||||
mock.patch('swift.obj.diskfile.check_mount',
|
||||
fake_check_mount)):
|
||||
part_infos = list(self.reconstructor.collect_parts())
|
||||
self.assertEqual([], part_infos) # sanity, no jobs
|
||||
|
||||
@ -1148,7 +1155,8 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
self.assertEqual(set(paths), set([
|
||||
os.path.join(self.devices, dev) for dev in local_devs]))
|
||||
|
||||
def fake_ismount(path):
|
||||
def fake_check_mount(devices, device):
|
||||
path = os.path.join(devices, device)
|
||||
if path.endswith('sda'):
|
||||
return True
|
||||
else:
|
||||
@ -1158,8 +1166,8 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
return_value=[self.ip]),
|
||||
mock.patch.object(self.policy.object_ring, '_devs',
|
||||
new=stub_ring_devs),
|
||||
mock.patch('swift.obj.reconstructor.ismount',
|
||||
fake_ismount)):
|
||||
mock.patch('swift.obj.diskfile.check_mount',
|
||||
fake_check_mount)):
|
||||
part_infos = list(self.reconstructor.collect_parts())
|
||||
self.assertEqual(1, len(part_infos)) # only sda picked up (part 0)
|
||||
self.assertEqual(part_infos[0]['partition'], 0)
|
||||
@ -1171,6 +1179,8 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
'replication_ip': self.ip,
|
||||
'replication_port': self.port
|
||||
} for dev in local_devs]
|
||||
for device in local_devs:
|
||||
utils.mkdirs(os.path.join(self.devices, device))
|
||||
fake_unlink = mock.MagicMock()
|
||||
self.reconstructor.reclaim_age = 1000
|
||||
now = time.time()
|
||||
|
@ -23,7 +23,6 @@ import unittest
|
||||
import eventlet
|
||||
import mock
|
||||
|
||||
from swift.common import constraints
|
||||
from swift.common import exceptions
|
||||
from swift.common import swob
|
||||
from swift.common import utils
|
||||
@ -53,6 +52,7 @@ class TestReceiver(unittest.TestCase):
|
||||
'mount_check': 'false',
|
||||
'replication_one_per_device': 'false',
|
||||
'log_requests': 'false'}
|
||||
utils.mkdirs(os.path.join(self.testdir, 'device', 'partition'))
|
||||
self.controller = server.ObjectController(self.conf)
|
||||
self.controller.bytes_per_sync = 1
|
||||
|
||||
@ -285,8 +285,8 @@ class TestReceiver(unittest.TestCase):
|
||||
mock.patch.object(
|
||||
self.controller._diskfile_router[POLICIES.legacy],
|
||||
'mount_check', False),
|
||||
mock.patch.object(
|
||||
constraints, 'check_mount', return_value=False)) as (
|
||||
mock.patch('swift.obj.diskfile.check_mount',
|
||||
return_value=False)) as (
|
||||
mocked_replication_semaphore,
|
||||
mocked_mount_check,
|
||||
mocked_check_mount):
|
||||
@ -305,8 +305,8 @@ class TestReceiver(unittest.TestCase):
|
||||
mock.patch.object(
|
||||
self.controller._diskfile_router[POLICIES.legacy],
|
||||
'mount_check', True),
|
||||
mock.patch.object(
|
||||
constraints, 'check_mount', return_value=False)) as (
|
||||
mock.patch('swift.obj.diskfile.check_mount',
|
||||
return_value=False)) as (
|
||||
mocked_replication_semaphore,
|
||||
mocked_mount_check,
|
||||
mocked_check_mount):
|
||||
|
Loading…
x
Reference in New Issue
Block a user