Merge "reconciler: Record queue-clean-up later than old-policy-clean-up"
This commit is contained in:
commit
90f75e33b7
@ -435,7 +435,7 @@ class ContainerReconciler(Daemon):
|
||||
an object was manually re-enqued.
|
||||
"""
|
||||
q_path = '/%s/%s/%s' % (MISPLACED_OBJECTS_ACCOUNT, container, obj)
|
||||
x_timestamp = slightly_later_timestamp(max(q_record, q_ts))
|
||||
x_timestamp = slightly_later_timestamp(max(q_record, q_ts), offset=2)
|
||||
self.stats_log('pop_queue', 'remove %r (%f) from the queue (%s)',
|
||||
q_path, q_ts, x_timestamp)
|
||||
headers = {'X-Timestamp': x_timestamp}
|
||||
@ -623,7 +623,7 @@ class ContainerReconciler(Daemon):
|
||||
# optimistically move any source with a timestamp >= q_ts
|
||||
ts = max(Timestamp(source_ts), q_ts)
|
||||
# move the object
|
||||
put_timestamp = slightly_later_timestamp(ts, offset=2)
|
||||
put_timestamp = slightly_later_timestamp(ts, offset=3)
|
||||
self.stats_log('copy_attempt', '%r (%f) in policy_index %s will be '
|
||||
'moved to policy_index %s (%s)', path, source_ts,
|
||||
q_policy_index, container_policy_index, put_timestamp)
|
||||
@ -663,7 +663,7 @@ class ContainerReconciler(Daemon):
|
||||
Issue a DELETE request against the destination to match the
|
||||
misplaced DELETE against the source.
|
||||
"""
|
||||
delete_timestamp = slightly_later_timestamp(q_ts, offset=2)
|
||||
delete_timestamp = slightly_later_timestamp(q_ts, offset=3)
|
||||
self.stats_log('delete_attempt', '%r (%f) in policy_index %s '
|
||||
'will be deleted from policy_index %s (%s)', path,
|
||||
source_ts, q_policy_index, container_policy_index,
|
||||
|
@ -947,8 +947,8 @@ class TestReconciler(unittest.TestCase):
|
||||
with mock.patch('time.time', lambda: next(mock_time_iter)):
|
||||
self.reconciler.run_once()
|
||||
|
||||
return [c[1][1:4] for c in
|
||||
mocks['direct_delete_container_entry'].mock_calls]
|
||||
return [tuple(list(c[1][1:4]) + [c[2]['headers']['X-Timestamp']])
|
||||
for c in mocks['direct_delete_container_entry'].mock_calls]
|
||||
|
||||
def test_no_concurrency(self):
|
||||
self._mock_listing({
|
||||
@ -971,8 +971,10 @@ class TestReconciler(unittest.TestCase):
|
||||
self.assertEqual(order_recieved, ['o1', 'o2'])
|
||||
# process in order recieved
|
||||
self.assertEqual(deleted_container_entries, [
|
||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1'),
|
||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o2'),
|
||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(3618.84187, offset=2).internal),
|
||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o2',
|
||||
Timestamp(3724.23456, offset=2).internal)
|
||||
])
|
||||
|
||||
def test_concurrency(self):
|
||||
@ -1000,8 +1002,10 @@ class TestReconciler(unittest.TestCase):
|
||||
self.assertEqual(order_recieved, ['o1', 'o2'])
|
||||
# ... and so we finish o2 first
|
||||
self.assertEqual(deleted_container_entries, [
|
||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o2'),
|
||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1'),
|
||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o2',
|
||||
Timestamp(3724.23456, offset=2).internal),
|
||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(3618.84187, offset=2).internal),
|
||||
])
|
||||
|
||||
def test_multi_process_should_process(self):
|
||||
@ -1100,7 +1104,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# so pop the queue for that one
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')])
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(3618.84187, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_queue_name_with_policy_index_delimiter_in_name(self):
|
||||
@ -1145,7 +1150,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# and pop the queue for that one
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries, [(
|
||||
'.misplaced_objects', '3600', '1:/%s' % obj_path)])
|
||||
'.misplaced_objects', '3600', '1:/%s' % obj_path,
|
||||
Timestamp(3618.84187, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_unable_to_direct_get_oldest_storage_policy(self):
|
||||
@ -1243,9 +1249,9 @@ class TestReconciler(unittest.TestCase):
|
||||
[('HEAD', '/v1/AUTH_bob/c/o1'),
|
||||
('PUT', '/v1/AUTH_bob/c/o1')])
|
||||
put_headers = self.fake_swift.storage_policy[0].headers[1]
|
||||
# we PUT the object in the right place with q_ts + offset 2
|
||||
# we PUT the object in the right place with q_ts + offset 3
|
||||
self.assertEqual(put_headers.get('X-Timestamp'),
|
||||
Timestamp(3618.84187, offset=2))
|
||||
Timestamp(3618.84187, offset=3))
|
||||
# cleans up the old
|
||||
self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1)
|
||||
self.assertEqual(self.reconciler.stats['cleanup_success'], 1)
|
||||
@ -1256,7 +1262,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# and when we're done, we pop the entry from the queue
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')])
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(3618.84187, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_object_move_the_other_direction(self):
|
||||
@ -1290,9 +1297,9 @@ class TestReconciler(unittest.TestCase):
|
||||
[('HEAD', '/v1/AUTH_bob/c/o1'), # 1
|
||||
('PUT', '/v1/AUTH_bob/c/o1')]) # 3
|
||||
put_headers = self.fake_swift.storage_policy[1].headers[1]
|
||||
# we PUT the object in the right place with q_ts + offset 2
|
||||
# we PUT the object in the right place with q_ts + offset 3
|
||||
self.assertEqual(put_headers.get('X-Timestamp'),
|
||||
Timestamp(3618.84187, offset=2).internal)
|
||||
Timestamp(3618.84187, offset=3).internal)
|
||||
# cleans up the old
|
||||
self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1)
|
||||
self.assertEqual(self.reconciler.stats['cleanup_success'], 1)
|
||||
@ -1303,7 +1310,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# and when we're done, we pop the entry from the queue
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '3600', '0:/AUTH_bob/c/o1')])
|
||||
[('.misplaced_objects', '3600', '0:/AUTH_bob/c/o1',
|
||||
Timestamp(3618.84187, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_object_move_with_unicode_and_spaces(self):
|
||||
@ -1349,9 +1357,9 @@ class TestReconciler(unittest.TestCase):
|
||||
[('HEAD', '/v1/%s' % obj_path), # 1
|
||||
('PUT', '/v1/%s' % obj_path)]) # 3
|
||||
put_headers = self.fake_swift.storage_policy[0].headers[1]
|
||||
# we PUT the object in the right place with q_ts + offset 2
|
||||
# we PUT the object in the right place with q_ts + offset 3
|
||||
self.assertEqual(put_headers.get('X-Timestamp'),
|
||||
Timestamp(3618.84187, offset=2).internal)
|
||||
Timestamp(3618.84187, offset=3).internal)
|
||||
# cleans up the old
|
||||
self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1)
|
||||
self.assertEqual(self.reconciler.stats['cleanup_success'], 1)
|
||||
@ -1365,7 +1373,8 @@ class TestReconciler(unittest.TestCase):
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
# this mock received the name, it's encoded down in buffered_http
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '3600', '1:/%s' % obj_name)])
|
||||
[('.misplaced_objects', '3600', '1:/%s' % obj_name,
|
||||
Timestamp(3618.84187, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_object_delete(self):
|
||||
@ -1406,9 +1415,9 @@ class TestReconciler(unittest.TestCase):
|
||||
[('HEAD', '/v1/AUTH_bob/c/o1'),
|
||||
('DELETE', '/v1/AUTH_bob/c/o1')])
|
||||
reconcile_headers = self.fake_swift.storage_policy[0].headers[1]
|
||||
# we DELETE the object in the right place with q_ts + offset 2
|
||||
# we DELETE the object in the right place with q_ts + offset 3
|
||||
self.assertEqual(reconcile_headers.get('X-Timestamp'),
|
||||
Timestamp(q_ts, offset=2).internal)
|
||||
Timestamp(q_ts, offset=3).internal)
|
||||
# cleans up the old
|
||||
self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1)
|
||||
self.assertEqual(self.reconciler.stats['cleanup_success'], 1)
|
||||
@ -1419,7 +1428,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# and when we're done, we pop the entry from the queue
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')])
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(q_ts, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_object_enqueued_for_the_correct_dest_noop(self):
|
||||
@ -1443,7 +1453,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# so we just pop the queue
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')])
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(3618.84187, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_object_move_src_object_newer_than_queue_entry(self):
|
||||
@ -1478,10 +1489,10 @@ class TestReconciler(unittest.TestCase):
|
||||
self.fake_swift.storage_policy[0].calls,
|
||||
[('HEAD', '/v1/AUTH_bob/c/o1'), # 1
|
||||
('PUT', '/v1/AUTH_bob/c/o1')]) # 3
|
||||
# .. with source timestamp + offset 2
|
||||
# .. with source timestamp + offset 3
|
||||
put_headers = self.fake_swift.storage_policy[0].headers[1]
|
||||
self.assertEqual(put_headers.get('X-Timestamp'),
|
||||
Timestamp(3600.234567, offset=2))
|
||||
Timestamp(3600.234567, offset=3))
|
||||
# src object is cleaned up
|
||||
self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1)
|
||||
self.assertEqual(self.reconciler.stats['cleanup_success'], 1)
|
||||
@ -1491,7 +1502,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# and queue is popped
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')])
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(3600.123456, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_object_move_src_object_older_than_queue_entry(self):
|
||||
@ -1542,7 +1554,7 @@ class TestReconciler(unittest.TestCase):
|
||||
self._mock_oldest_spi({'c': 0})
|
||||
self.fake_swift.storage_policy[1].register(
|
||||
'GET', '/v1/AUTH_bob/c/o1', swob.HTTPNotFound,
|
||||
{'X-Backend-Timestamp': Timestamp(q_ts, offset=2).internal})
|
||||
{'X-Backend-Timestamp': Timestamp(q_ts, offset=3).internal})
|
||||
deleted_container_entries = self._run_once()
|
||||
|
||||
# found a misplaced object
|
||||
@ -1682,10 +1694,10 @@ class TestReconciler(unittest.TestCase):
|
||||
self.fake_swift.storage_policy[0].calls,
|
||||
[('HEAD', '/v1/AUTH_bob/c/o1'), # 1
|
||||
('PUT', '/v1/AUTH_bob/c/o1')]) # 3
|
||||
# .. with source timestamp + offset 2
|
||||
# .. with source timestamp + offset 3
|
||||
put_headers = self.fake_swift.storage_policy[0].headers[1]
|
||||
self.assertEqual(put_headers.get('X-Timestamp'),
|
||||
Timestamp(3600.123457, offset=2))
|
||||
Timestamp(3600.123457, offset=3))
|
||||
# we try to cleanup
|
||||
self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1)
|
||||
# ... with q_ts + offset 1
|
||||
@ -1731,7 +1743,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# so... we're done
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')])
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(q_ts, offset=2).internal)])
|
||||
# dunno if this is helpful, but FWIW we don't throw tombstones?
|
||||
self.assertEqual(self.reconciler.stats['cleanup_attempt'], 0)
|
||||
self.assertEqual(self.reconciler.stats['success'], 1) # lol
|
||||
@ -1772,7 +1785,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# and wipe our hands of it
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')])
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(3679.2019, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_object_move_dest_object_newer_than_queue_entry(self):
|
||||
@ -1813,7 +1827,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# and since we cleaned up the old object, so this counts as done
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')])
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(3679.2019, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_object_move_dest_object_older_than_queue_entry(self):
|
||||
@ -1847,10 +1862,10 @@ class TestReconciler(unittest.TestCase):
|
||||
self.fake_swift.storage_policy[0].calls,
|
||||
[('HEAD', '/v1/AUTH_bob/c/o1'), # 1
|
||||
('PUT', '/v1/AUTH_bob/c/o1')]) # 3
|
||||
# ... with a q_ts + offset 2
|
||||
# ... with a q_ts + offset 3
|
||||
put_headers = self.fake_swift.storage_policy[0].headers[1]
|
||||
self.assertEqual(put_headers.get('X-Timestamp'),
|
||||
Timestamp(36123.38393, offset=2))
|
||||
Timestamp(36123.38393, offset=3))
|
||||
# then clean the dark matter
|
||||
self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1)
|
||||
self.assertEqual(self.reconciler.stats['cleanup_success'], 1)
|
||||
@ -1861,7 +1876,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# and pop the queue
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '36000', '1:/AUTH_bob/c/o1')])
|
||||
[('.misplaced_objects', '36000', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(36123.38393, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_object_move_put_fails(self):
|
||||
@ -1899,9 +1915,9 @@ class TestReconciler(unittest.TestCase):
|
||||
[('HEAD', '/v1/AUTH_bob/c/o1'), # 1
|
||||
('PUT', '/v1/AUTH_bob/c/o1')]) # 3
|
||||
put_headers = self.fake_swift.storage_policy[0].headers[1]
|
||||
# ...with q_ts + offset 2 (20-microseconds)
|
||||
# ...with q_ts + offset 3 (20-microseconds)
|
||||
self.assertEqual(put_headers.get('X-Timestamp'),
|
||||
Timestamp(36123.383925, offset=2))
|
||||
Timestamp(36123.383925, offset=3))
|
||||
# but it failed
|
||||
self.assertEqual(self.reconciler.stats['copy_success'], 0)
|
||||
self.assertEqual(self.reconciler.stats['copy_failed'], 1)
|
||||
@ -1950,9 +1966,9 @@ class TestReconciler(unittest.TestCase):
|
||||
[('HEAD', '/v1/AUTH_bob/c/o1'), # 1
|
||||
('PUT', '/v1/AUTH_bob/c/o1')]) # 3
|
||||
put_headers = self.fake_swift.storage_policy[0].headers[1]
|
||||
# ...with q_ts + offset 2 (20-microseconds)
|
||||
# ...with q_ts + offset 3 (20-microseconds)
|
||||
self.assertEqual(put_headers.get('X-Timestamp'),
|
||||
Timestamp(36123.383925, offset=2))
|
||||
Timestamp(36123.383925, offset=3))
|
||||
# but it blows up hard
|
||||
self.assertEqual(self.reconciler.stats['unhandled_error'], 1)
|
||||
# so we don't cleanup
|
||||
@ -2029,7 +2045,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# probably been reaped, so we'll just give up
|
||||
self.assertEqual(
|
||||
deleted_container_entries,
|
||||
[('.misplaced_objects', container, '1:/AUTH_jeb/c/o1')])
|
||||
[('.misplaced_objects', container, '1:/AUTH_jeb/c/o1',
|
||||
Timestamp(queue_ts, offset=2).internal)])
|
||||
|
||||
def test_delete_old_empty_queue_containers(self):
|
||||
ts = time.time() - self.reconciler.reclaim_age * 1.1
|
||||
|
Loading…
x
Reference in New Issue
Block a user