Merge "Improved container-sync resiliency"
This commit is contained in:
commit
85b7346808
@ -223,7 +223,7 @@ hash of the object name, so it's not always guaranteed to be exactly
|
||||
one out of every three rows, but it usually gets close. For the sake
|
||||
of example, let's say that this process ends up owning rows 2 and 5.
|
||||
|
||||
Once it's finished syncing those rows, it updates SP1 to be the
|
||||
Once it's finished trying to sync those rows, it updates SP1 to be the
|
||||
biggest row-id that it's seen, which is 6 in this example. ::
|
||||
|
||||
SP2 SP1
|
||||
@ -241,19 +241,23 @@ container, creating new rows in the database. ::
|
||||
|
||||
On the next run, the container-sync starts off looking at rows with
|
||||
ids between SP1 and SP2. This time, there are a bunch of them. The
|
||||
sync process takes the ones it *does not* own and syncs them. Again,
|
||||
this is based on the hashes, so this will be everything it didn't sync
|
||||
before. In this example, that's rows 0, 1, 3, 4, and 6.
|
||||
sync process try to sync all of them. If it succeeds, it will set
|
||||
SP2 to equal SP1. If it fails, it will set SP2 to the failed object
|
||||
and will continue to try all other objects till SP1, setting SP2 to
|
||||
the first object that failed.
|
||||
|
||||
Under normal circumstances, the container-sync processes for the other
|
||||
replicas will have already taken care of synchronizing those rows, so
|
||||
this is a set of quick checks. However, if one of those other sync
|
||||
Under normal circumstances, the container-sync processes
|
||||
will have already taken care of synchronizing all rows, between SP1
|
||||
and SP2, resulting in a set of quick checks.
|
||||
However, if one of the sync
|
||||
processes failed for some reason, then this is a vital fallback to
|
||||
make sure all the objects in the container get synchronized. Without
|
||||
this seemingly-redundant work, any container-sync failure results in
|
||||
unsynchronized objects.
|
||||
unsynchronized objects. Note that the container sync will persistently
|
||||
retry to sync any faulty object until success, while logging each failure.
|
||||
|
||||
Once it's done with the fallback rows, SP2 is advanced to SP1. ::
|
||||
Once it's done with the fallback rows, and assuming no faults occured,
|
||||
SP2 is advanced to SP1. ::
|
||||
|
||||
SP2
|
||||
SP1
|
||||
|
@ -286,6 +286,7 @@ class ContainerSync(Daemon):
|
||||
self.logger.increment('failures')
|
||||
return
|
||||
stop_at = time() + self.container_time
|
||||
next_sync_point = None
|
||||
while time() < stop_at and sync_point2 < sync_point1:
|
||||
rows = broker.get_items_since(sync_point2, 1)
|
||||
if not rows:
|
||||
@ -296,16 +297,19 @@ class ContainerSync(Daemon):
|
||||
key = hash_path(info['account'], info['container'],
|
||||
row['name'], raw_digest=True)
|
||||
# This node will only initially sync out one third of the
|
||||
# objects (if 3 replicas, 1/4 if 4, etc.). This section
|
||||
# will attempt to sync previously skipped rows in case the
|
||||
# other nodes didn't succeed.
|
||||
if unpack_from('>I', key)[0] % \
|
||||
len(nodes) != ordinal:
|
||||
if not self.container_sync_row(row, sync_to, sync_key,
|
||||
broker, info):
|
||||
return
|
||||
# objects (if 3 replicas, 1/4 if 4, etc.) and will skip
|
||||
# problematic rows as needed in case of faults.
|
||||
# This section will attempt to sync previously skipped
|
||||
# rows in case the previous attempts by any of the nodes
|
||||
# didn't succeed.
|
||||
if not self.container_sync_row(row, sync_to, sync_key,
|
||||
broker, info):
|
||||
if not next_sync_point:
|
||||
next_sync_point = sync_point2
|
||||
sync_point2 = row['ROWID']
|
||||
broker.set_x_container_sync_points(None, sync_point2)
|
||||
if next_sync_point:
|
||||
broker.set_x_container_sync_points(None, next_sync_point)
|
||||
while time() < stop_at:
|
||||
rows = broker.get_items_since(sync_point1, 1)
|
||||
if not rows:
|
||||
@ -317,12 +321,11 @@ class ContainerSync(Daemon):
|
||||
# objects (if 3 replicas, 1/4 if 4, etc.). It'll come back
|
||||
# around to the section above and attempt to sync
|
||||
# previously skipped rows in case the other nodes didn't
|
||||
# succeed.
|
||||
# succeed or in case it failed to do so the first time.
|
||||
if unpack_from('>I', key)[0] % \
|
||||
len(nodes) == ordinal:
|
||||
if not self.container_sync_row(row, sync_to, sync_key,
|
||||
broker, info):
|
||||
return
|
||||
self.container_sync_row(row, sync_to, sync_key,
|
||||
broker, info)
|
||||
sync_point1 = row['ROWID']
|
||||
broker.set_x_container_sync_points(sync_point1, None)
|
||||
self.container_syncs += 1
|
||||
@ -420,10 +423,11 @@ class ContainerSync(Daemon):
|
||||
'sync_to': sync_to})
|
||||
elif err.http_status == HTTP_NOT_FOUND:
|
||||
self.logger.info(
|
||||
_('Not found %(sync_from)r => %(sync_to)r'),
|
||||
_('Not found %(sync_from)r => %(sync_to)r \
|
||||
- object %(obj_name)r'),
|
||||
{'sync_from': '%s/%s' %
|
||||
(quote(info['account']), quote(info['container'])),
|
||||
'sync_to': sync_to})
|
||||
'sync_to': sync_to, 'obj_name': row['name']})
|
||||
else:
|
||||
self.logger.exception(
|
||||
_('ERROR Syncing %(db_file)s %(row)s'),
|
||||
|
@ -422,10 +422,10 @@ class TestContainerSync(unittest.TestCase):
|
||||
cs.allowed_sync_hosts = ['127.0.0.1']
|
||||
cs.container_sync('isa.db')
|
||||
# Succeeds because no rows match
|
||||
self.assertEquals(cs.container_failures, 0)
|
||||
self.assertEquals(cs.container_failures, 1)
|
||||
self.assertEquals(cs.container_skips, 0)
|
||||
self.assertEquals(fcb.sync_point1, None)
|
||||
self.assertEquals(fcb.sync_point2, 1)
|
||||
self.assertEquals(fcb.sync_point2, -1)
|
||||
|
||||
def fake_hash_path(account, container, obj, raw_digest=False):
|
||||
# Ensures that all rows match for full syncing, ordinal is 0
|
||||
@ -446,7 +446,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
cs.allowed_sync_hosts = ['127.0.0.1']
|
||||
cs.container_sync('isa.db')
|
||||
# Succeeds because the two sync points haven't deviated yet
|
||||
self.assertEquals(cs.container_failures, 0)
|
||||
self.assertEquals(cs.container_failures, 1)
|
||||
self.assertEquals(cs.container_skips, 0)
|
||||
self.assertEquals(fcb.sync_point1, -1)
|
||||
self.assertEquals(fcb.sync_point2, -1)
|
||||
@ -465,9 +465,9 @@ class TestContainerSync(unittest.TestCase):
|
||||
cs.container_sync('isa.db')
|
||||
# Fails because container_sync_row will fail since the row has no
|
||||
# 'deleted' key
|
||||
self.assertEquals(cs.container_failures, 1)
|
||||
self.assertEquals(cs.container_failures, 2)
|
||||
self.assertEquals(cs.container_skips, 0)
|
||||
self.assertEquals(fcb.sync_point1, -1)
|
||||
self.assertEquals(fcb.sync_point1, None)
|
||||
self.assertEquals(fcb.sync_point2, -1)
|
||||
|
||||
fcb = FakeContainerBroker('path',
|
||||
@ -484,9 +484,9 @@ class TestContainerSync(unittest.TestCase):
|
||||
cs.allowed_sync_hosts = ['127.0.0.1']
|
||||
cs.container_sync('isa.db')
|
||||
# Fails because delete_object fails
|
||||
self.assertEquals(cs.container_failures, 2)
|
||||
self.assertEquals(cs.container_failures, 3)
|
||||
self.assertEquals(cs.container_skips, 0)
|
||||
self.assertEquals(fcb.sync_point1, -1)
|
||||
self.assertEquals(fcb.sync_point1, None)
|
||||
self.assertEquals(fcb.sync_point2, -1)
|
||||
|
||||
def fake_delete_object(*args, **kwargs):
|
||||
@ -507,7 +507,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
cs.allowed_sync_hosts = ['127.0.0.1']
|
||||
cs.container_sync('isa.db')
|
||||
# Succeeds because delete_object succeeds
|
||||
self.assertEquals(cs.container_failures, 2)
|
||||
self.assertEquals(cs.container_failures, 3)
|
||||
self.assertEquals(cs.container_skips, 0)
|
||||
self.assertEquals(fcb.sync_point1, None)
|
||||
self.assertEquals(fcb.sync_point2, 1)
|
||||
@ -574,10 +574,11 @@ class TestContainerSync(unittest.TestCase):
|
||||
cs.allowed_sync_hosts = ['127.0.0.1']
|
||||
cs.container_sync('isa.db')
|
||||
# Fails because row is missing 'deleted' key
|
||||
# Nevertheless the fault is skipped
|
||||
self.assertEquals(cs.container_failures, 1)
|
||||
self.assertEquals(cs.container_skips, 0)
|
||||
self.assertEquals(fcb.sync_point1, -1)
|
||||
self.assertEquals(fcb.sync_point2, -1)
|
||||
self.assertEquals(fcb.sync_point1, 1)
|
||||
self.assertEquals(fcb.sync_point2, None)
|
||||
|
||||
fcb = FakeContainerBroker('path',
|
||||
info={'account': 'a', 'container': 'c',
|
||||
|
Loading…
Reference in New Issue
Block a user