swift/test/unit/container/test_replicator.py
Tim Burke bc4494f24d py3: port account/container replicators
Change-Id: Ia2662d8f75883e1cc41b9277c65f8b771f56f902
2018-11-06 16:54:20 -08:00

2383 lines
110 KiB
Python

# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import shutil
import itertools
import unittest
import mock
import random
import sqlite3
from swift.common import db_replicator
from swift.common.swob import HTTPServerError
from swift.container import replicator, backend, server, sync_store
from swift.container.reconciler import (
MISPLACED_OBJECTS_ACCOUNT, get_reconciler_container_name)
from swift.common.utils import Timestamp, encode_timestamps, ShardRange, \
get_db_files, make_db_file_path
from swift.common.storage_policy import POLICIES
from test.unit.common import test_db_replicator
from test.unit import patch_policies, make_timestamp_iter, mock_check_drive, \
debug_logger, EMPTY_ETAG, FakeLogger, attach_fake_replication_rpc, \
FakeHTTPResponse
from contextlib import contextmanager
@patch_policies
class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
# Container-specific settings for the inherited replicator-sync test case.
# NOTE(review): presumably read by the test_db_replicator.TestReplicatorSync
# helpers used below (_get_broker, _run_once, ...) -- confirm against the
# base class, which is not visible here.
# Inside the class body this rebinds ``backend`` to the broker class; the
# methods below still resolve the module-level ``backend`` import.
backend = backend.ContainerBroker
# data directory constant taken from the container server module
datadir = server.DATADIR
# replicator daemon class exercised by these tests
replicator_daemon = replicator.ContainerReplicator
# replicator RPC class from the same module
replicator_rpc = replicator.ContainerReplicatorRpc
def assertShardRangesEqual(self, x, y):
    """Assert that two iterables of shard ranges agree on every attribute.

    ``ShardRange.__eq__`` only compares lower and upper, so each item is
    expanded with ``dict()`` and the resulting lists are compared instead.

    :param x: iterable of shard ranges
    :param y: iterable of shard ranges
    """
    expanded_x = [dict(shard_range) for shard_range in x]
    expanded_y = [dict(shard_range) for shard_range in y]
    self.assertEqual(expanded_x, expanded_y)
def assertShardRangesNotEqual(self, x, y):
    """Assert that two iterables of shard ranges differ in some attribute.

    ``ShardRange.__eq__`` only compares lower and upper, so each item is
    expanded with ``dict()`` and the resulting lists are compared instead.

    :param x: iterable of shard ranges
    :param y: iterable of shard ranges
    """
    expanded_x = [dict(shard_range) for shard_range in x]
    expanded_y = [dict(shard_range) for shard_range in y]
    self.assertNotEqual(expanded_x, expanded_y)
def test_report_up_to_date(self):
    """Exercise ``report_up_to_date`` for each actual/reported stat pair.

    A freshly initialized broker reports its own stats, so the replication
    info starts out up to date. Each stat is then changed on its own
    (report becomes stale) and the matching ``reported_*`` value is brought
    in line again (report is up to date once more).
    """
    broker = self._get_broker('a', 'c', node_index=0)
    broker.initialize(Timestamp(1).internal, int(POLICIES.default))
    current = broker.get_info()
    broker.reported(current['put_timestamp'], current['delete_timestamp'],
                    current['object_count'], current['bytes_used'])
    full_info = broker.get_replication_info()

    created_at = Timestamp(1).internal
    expected_info = {
        'put_timestamp': created_at,
        'delete_timestamp': '0',
        'count': 0,
        'bytes_used': 0,
        'reported_put_timestamp': created_at,
        'reported_delete_timestamp': '0',
        'reported_object_count': 0,
        'reported_bytes_used': 0,
    }
    for key, value in expected_info.items():
        self.assertEqual(
            full_info[key], value,
            'expected value for %r, %r != %r' % (
                key, full_info[key], value))

    repl = replicator.ContainerReplicator({})
    self.assertTrue(repl.report_up_to_date(full_info))

    # (actual key, reported key, new value) -- checked in this order
    stat_pairs = (
        ('delete_timestamp', 'reported_delete_timestamp',
         Timestamp(2).internal),
        ('count', 'reported_object_count', 1),
        ('bytes_used', 'reported_bytes_used', 1),
        ('put_timestamp', 'reported_put_timestamp',
         Timestamp(3).internal),
    )
    for actual_key, reported_key, new_value in stat_pairs:
        # changing the actual stat makes the report stale ...
        full_info[actual_key] = new_value
        self.assertFalse(repl.report_up_to_date(full_info))
        # ... until the reported stat catches up
        full_info[reported_key] = new_value
        self.assertTrue(repl.report_up_to_date(full_info))
def test_sync_remote_in_sync(self):
    """Replicating a container to the node that holds it is a no-op.

    The replication target is the node looked up for the local broker
    itself, so the daemon is expected to succeed and count one
    ``no_change``.
    """
    # setup a local container
    broker = self._get_broker('a', 'c', node_index=0)
    put_timestamp = time.time()
    broker.initialize(put_timestamp, POLICIES.default.idx)
    daemon = replicator.ContainerReplicator({})
    # "replicate" to same database: the part and node are the ones that
    # belong to the local broker (no hand-built node dict is needed)
    part, node = self._get_broker_part_node(broker)
    info = broker.get_replication_info()
    success = daemon._repl_to_node(node, broker, part, info)
    # nothing to do
    self.assertTrue(success)
    self.assertEqual(1, daemon.stats['no_change'])
def test_sync_remote_with_timings(self):
    """Check the per-step debug timing lines logged by the sync RPC.

    With ``DEBUG_TIMINGS_THRESHOLD`` patched to -1, a replication between
    two containers holding the same object row is expected to log exactly
    one debug timing line for each of the expected sync steps.
    """
    timestamps = make_timestamp_iter()
    # local container, carrying one metadata item
    local_broker = self._get_broker('a', 'c', node_index=0)
    local_put = next(timestamps)
    local_broker.initialize(local_put.internal, POLICIES.default.idx)
    local_broker.update_metadata(
        {'x-container-meta-test': ('foo', local_put.internal)})
    # remote container
    remote_broker = self._get_broker('a', 'c', node_index=1)
    remote_broker.initialize(
        next(timestamps).internal, POLICIES.default.idx)
    # the same object row goes into both databases
    obj_timestamp = next(timestamps)
    for db in (local_broker, remote_broker):
        db.put_object(
            '/a/c/o', obj_timestamp.internal, 0, 'content-type', 'etag',
            storage_policy_index=db.storage_policy_index)
    # replicate local -> remote with the timing threshold forced below
    # zero
    daemon = replicator.ContainerReplicator({})
    part, node = self._get_broker_part_node(remote_broker)
    info = local_broker.get_replication_info()
    with mock.patch.object(db_replicator, 'DEBUG_TIMINGS_THRESHOLD', -1):
        success = daemon._repl_to_node(node, local_broker, part, info)
    # nothing to do
    self.assertTrue(success)
    self.assertEqual(1, daemon.stats['no_change'])

    expected_timings = ('info', 'update_metadata', 'merge_timestamps',
                        'get_sync', 'merge_syncs')
    debug_lines = self.rpc.logger.logger.get_lines_for_level('debug')
    count_msg = 'Expected %s debug lines but only got %s: %s' % (
        len(expected_timings), len(debug_lines), debug_lines)
    self.assertEqual(len(expected_timings), len(debug_lines), count_msg)
    for metric in expected_timings:
        expected = 'replicator-rpc-sync time for %s:' % metric
        found = any(expected in line for line in debug_lines)
        self.assertTrue(found, 'debug timing %r was not in %r' % (
            expected, debug_lines))
def test_sync_remote_missing(self):
    """A container missing on the other nodes is rsynced to both of them.

    After one replication pass the daemon should count two rsyncs, and the
    databases on nodes 1 and 2 should exist with the same info as the
    local one (apart from ``id``).
    """
    local_broker = self._get_broker('a', 'c', node_index=0)
    put_timestamp = time.time()
    local_broker.initialize(put_timestamp, POLICIES.default.idx)
    # "replicate"
    part, node = self._get_broker_part_node(local_broker)
    daemon = self._run_once(node)
    # complete rsync to all other nodes
    self.assertEqual(2, daemon.stats['rsync'])
    for node_index in range(1, 3):
        remote_broker = self._get_broker('a', 'c', node_index=node_index)
        self.assertTrue(os.path.exists(remote_broker.db_file))
        remote_info = remote_broker.get_info()
        local_info = self._get_broker('a', 'c', node_index=0).get_info()
        for key, local_value in local_info.items():
            # every database keeps its own id
            if key != 'id':
                self.assertEqual(
                    remote_info[key], local_value,
                    "mismatch remote %s %r != %r" % (
                        key, remote_info[key], local_value))
def test_rsync_failure(self):
    """Replication to another device fails when the rsync step fails.

    The daemon's ``_rsync_file`` is replaced by a stub that always returns
    False; replicating to a randomly chosen other ring device must then
    report failure.
    """
    local_broker = self._get_broker('a', 'c', node_index=0)
    put_timestamp = time.time()
    local_broker.initialize(put_timestamp, POLICIES.default.idx)
    # "replicate" to different device, with rsync always failing
    daemon = replicator.ContainerReplicator({})
    daemon._rsync_file = lambda *args, **kwargs: False
    # replicate
    part, local_node = self._get_broker_part_node(local_broker)
    other_nodes = [dev for dev in self._ring.devs
                   if dev['id'] != local_node['id']]
    node = random.choice(other_nodes)
    info = local_broker.get_replication_info()
    with mock_check_drive(ismount=True):
        success = daemon._repl_to_node(node, local_broker, part, info)
    self.assertFalse(success)
def test_sync_remote_missing_most_rows(self):
    """A remote missing most rows is brought in sync by a remote merge.

    The local db holds one row the (empty) remote lacks and the daemon
    runs with ``per_diff`` of 1. ``_rsync_file`` is stubbed to copy the
    db file under ``self.root``. The pass must succeed, count one
    ``remote_merge`` and leave both databases with the same info (apart
    from ``id``).
    """
    put_timestamp = time.time()
    # create "local" broker
    broker = self._get_broker('a', 'c', node_index=0)
    broker.initialize(put_timestamp, POLICIES.default.idx)
    # create "remote" broker
    remote_broker = self._get_broker('a', 'c', node_index=1)
    remote_broker.initialize(put_timestamp, POLICIES.default.idx)
    # add a row to "local" db
    broker.put_object('/a/c/o', time.time(), 0, 'content-type', 'etag',
                      storage_policy_index=broker.storage_policy_index)
    # replicate
    daemon = replicator.ContainerReplicator({'per_diff': 1})

    def _rsync_file(db_file, remote_file, **kwargs):
        # remote_file is "<server>/<path>"; only the path part is needed
        # to place the copy under the test root
        _, remote_path = remote_file.split('/', 1)
        dest_path = os.path.join(self.root, remote_path)
        shutil.copy(db_file, dest_path)
        return True

    daemon._rsync_file = _rsync_file
    part, node = self._get_broker_part_node(remote_broker)
    info = broker.get_replication_info()
    success = daemon._repl_to_node(node, broker, part, info)
    self.assertTrue(success)
    # row merge
    self.assertEqual(1, daemon.stats['remote_merge'])
    local_info = self._get_broker(
        'a', 'c', node_index=0).get_info()
    remote_info = self._get_broker(
        'a', 'c', node_index=1).get_info()
    for k, v in local_info.items():
        if k == 'id':
            continue
        self.assertEqual(remote_info[k], v,
                         "mismatch remote %s %r != %r" % (
                             k, remote_info[k], v))
def test_sync_remote_missing_one_rows(self):
    """A remote missing a single row is brought in sync by one diff.

    Both databases share ten rows; one more row exists only locally. The
    pass must succeed, count one ``diff`` and leave both databases with
    the same info (apart from ``id``).
    """
    created_at = time.time()
    # create "local" broker
    local_broker = self._get_broker('a', 'c', node_index=0)
    local_broker.initialize(created_at, POLICIES.default.idx)
    # create "remote" broker
    remote_broker = self._get_broker('a', 'c', node_index=1)
    remote_broker.initialize(created_at, POLICIES.default.idx)
    # add some rows to both db
    for row_number in range(10):
        row_timestamp = time.time()
        for db in (local_broker, remote_broker):
            db.put_object(
                '/a/c/o_%s' % row_number, row_timestamp, 0,
                'content-type', 'etag',
                storage_policy_index=db.storage_policy_index)
    # now a row to the "local" broker only
    local_broker.put_object(
        '/a/c/o_missing', time.time(), 0, 'content-type', 'etag',
        storage_policy_index=local_broker.storage_policy_index)
    # replicate
    daemon = replicator.ContainerReplicator({})
    part, node = self._get_broker_part_node(remote_broker)
    info = local_broker.get_replication_info()
    self.assertTrue(daemon._repl_to_node(node, local_broker, part, info))
    # row merge
    self.assertEqual(1, daemon.stats['diff'])
    local_info = self._get_broker('a', 'c', node_index=0).get_info()
    remote_info = self._get_broker('a', 'c', node_index=1).get_info()
    for key, local_value in local_info.items():
        # every database keeps its own id
        if key != 'id':
            self.assertEqual(
                remote_info[key], local_value,
                "mismatch remote %s %r != %r" % (
                    key, remote_info[key], local_value))
def test_sync_remote_can_not_keep_up(self):
    """Replication is capped while rows keep arriving, then catches up.

    A REPLICATE hook adds a new local row on every ``merge_items`` call,
    so with ``max_diffs`` of 10 the first pass is expected to fail. Once
    the hook is removed a second pass must succeed, leaving ``diff`` at 2,
    ``diff_capped`` at 1 and both databases with the same info (apart
    from ``id``).
    """
    put_timestamp = time.time()
    # create "local" broker
    broker = self._get_broker('a', 'c', node_index=0)
    broker.initialize(put_timestamp, POLICIES.default.idx)
    # create "remote" broker
    remote_broker = self._get_broker('a', 'c', node_index=1)
    remote_broker.initialize(put_timestamp, POLICIES.default.idx)
    # add some rows to both db's
    for i in range(10):
        put_timestamp = time.time()
        for db in (broker, remote_broker):
            obj_name = 'o_%s' % i
            db.put_object(obj_name, put_timestamp, 0,
                          'content-type', 'etag',
                          storage_policy_index=db.storage_policy_index)
    # setup REPLICATE callback to simulate adding rows during merge_items
    missing_counter = itertools.count()

    def put_more_objects(op, *args):
        if op != 'merge_items':
            return
        path = '/a/c/o_missing_%s' % next(missing_counter)
        # the row goes into the local broker, so use that broker's own
        # policy index rather than the ``db`` variable left over from the
        # loop above (both brokers were created with the same policy)
        broker.put_object(path, time.time(), 0, 'content-type', 'etag',
                          storage_policy_index=broker.storage_policy_index)

    # NOTE(review): these module attributes are replaced without being
    # restored here; presumably the base class setUp re-installs them for
    # the next test -- confirm against test_db_replicator.
    test_db_replicator.FakeReplConnection = \
        test_db_replicator.attach_fake_replication_rpc(
            self.rpc, replicate_hook=put_more_objects)
    db_replicator.ReplConnection = test_db_replicator.FakeReplConnection
    # and add one extra to local db to trigger merge_items
    put_more_objects('merge_items')
    # limit number of times we'll call merge_items
    daemon = replicator.ContainerReplicator({'max_diffs': 10})
    # replicate
    part, node = self._get_broker_part_node(remote_broker)
    info = broker.get_replication_info()
    success = daemon._repl_to_node(node, broker, part, info)
    self.assertFalse(success)
    # back off on the PUTs during replication...
    FakeReplConnection = test_db_replicator.attach_fake_replication_rpc(
        self.rpc, replicate_hook=None)
    db_replicator.ReplConnection = FakeReplConnection
    # retry replication
    info = broker.get_replication_info()
    success = daemon._repl_to_node(node, broker, part, info)
    self.assertTrue(success)
    # row merge
    self.assertEqual(2, daemon.stats['diff'])
    self.assertEqual(1, daemon.stats['diff_capped'])
    local_info = self._get_broker(
        'a', 'c', node_index=0).get_info()
    remote_info = self._get_broker(
        'a', 'c', node_index=1).get_info()
    for k, v in local_info.items():
        if k == 'id':
            continue
        self.assertEqual(remote_info[k], v,
                         "mismatch remote %s %r != %r" % (
                             k, remote_info[k], v))
def test_diff_capped_sync(self):
    """A capped diff still completes the sync on a second pass.

    The local db holds 50 old rows plus one row newer than anything on
    the remote, which holds 100 newer rows. With ``per_diff`` of 10 and
    ``max_diffs`` of 3 the first run must be counted as diff-capped; the
    second run must not be, and afterwards the remote must list 101
    objects, all with the new content type, including the local-only one.
    """
    ts = (Timestamp(t).internal for t in
          itertools.count(int(time.time())))
    put_timestamp = next(ts)
    # start off with a local db that is way behind
    broker = self._get_broker('a', 'c', node_index=0)
    broker.initialize(put_timestamp, POLICIES.default.idx)
    for i in range(50):
        broker.put_object(
            'o%s' % i, next(ts), 0, 'content-type-old', 'etag',
            storage_policy_index=broker.storage_policy_index)
    # remote primary db has all the new bits...
    remote_broker = self._get_broker('a', 'c', node_index=1)
    remote_broker.initialize(put_timestamp, POLICIES.default.idx)
    for i in range(100):
        remote_broker.put_object(
            'o%s' % i, next(ts), 0, 'content-type-new', 'etag',
            storage_policy_index=remote_broker.storage_policy_index)
    # except there's *one* tiny thing in our local broker that's newer
    broker.put_object(
        'o101', next(ts), 0, 'content-type-new', 'etag',
        storage_policy_index=broker.storage_policy_index)
    # setup daemon with smaller per_diff and max_diffs
    part, node = self._get_broker_part_node(broker)
    daemon = self._get_daemon(node, conf_updates={'per_diff': 10,
                                                  'max_diffs': 3})
    self.assertEqual(daemon.per_diff, 10)
    self.assertEqual(daemon.max_diffs, 3)
    # run once and verify diff capped
    self._run_once(node, daemon=daemon)
    self.assertEqual(1, daemon.stats['diff'])
    self.assertEqual(1, daemon.stats['diff_capped'])
    # run again and verify fully synced
    self._run_once(node, daemon=daemon)
    self.assertEqual(1, daemon.stats['diff'])
    self.assertEqual(0, daemon.stats['diff_capped'])
    # now that we're synced the new item should be in remote db
    remote_names = set()
    for item in remote_broker.list_objects_iter(500, '', '', '', ''):
        # only the name and content type are checked; the other fields
        # get throwaway names so the ``ts`` generator is not rebound
        name, _created, _size, content_type, _etag = item
        remote_names.add(name)
        self.assertEqual(content_type, 'content-type-new')
    self.assertIn('o101', remote_names)
    self.assertEqual(len(remote_names), 101)
    self.assertEqual(remote_broker.get_info()['object_count'], 101)
def test_sync_status_change(self):
    """A local delete is propagated to the remote container.

    After replicating a deleted local container, the remote must be
    deleted too, and its ``status_changed_at`` must be later than both its
    own ``put_timestamp`` and the local ``status_changed_at``.
    """
    # setup a local container
    local_broker = self._get_broker('a', 'c', node_index=0)
    created_at = time.time()
    local_broker.initialize(created_at, POLICIES.default.idx)
    # setup remote container
    remote_broker = self._get_broker('a', 'c', node_index=1)
    remote_broker.initialize(created_at, POLICIES.default.idx)
    # delete local container
    local_broker.delete_db(time.time())
    # replicate
    daemon = replicator.ContainerReplicator({})
    part, node = self._get_broker_part_node(remote_broker)
    repl_info = local_broker.get_replication_info()
    success = daemon._repl_to_node(node, local_broker, part, repl_info)
    # nothing to do
    self.assertTrue(success)
    self.assertEqual(1, daemon.stats['no_change'])
    # status in sync
    self.assertTrue(remote_broker.is_deleted())
    local_info = local_broker.get_info()
    remote_info = remote_broker.get_info()

    remote_changed = remote_info['status_changed_at']
    remote_put = remote_info['put_timestamp']
    local_changed = local_info['status_changed_at']
    self.assertTrue(
        Timestamp(remote_changed) > Timestamp(remote_put),
        'remote status_changed_at (%s) is not '
        'greater than put_timestamp (%s)' % (remote_changed, remote_put))
    self.assertTrue(
        Timestamp(remote_changed) > Timestamp(local_changed),
        'remote status_changed_at (%s) is not '
        'greater than local status_changed_at (%s)' % (
            remote_changed, local_changed))
@contextmanager
def _wrap_merge_timestamps(self, broker, calls):
    """Temporarily record calls made to ``broker.merge_timestamps``.

    While the context is active, ``broker.merge_timestamps`` is replaced
    by a wrapper that appends the first positional argument of every call
    to ``calls`` and then delegates to the original method, handing back
    its result. The original attribute is put back on exit, even if the
    body raises.

    :param broker: object whose ``merge_timestamps`` attribute is wrapped
    :param calls: list receiving the first positional argument of each
                  intercepted call
    :returns: a context manager that yields True
    """
    orig_merge_timestamps = broker.merge_timestamps

    def fake_merge_timestamps(*args, **kwargs):
        calls.append(args[0])
        # pass the real result through so the wrapper stays transparent
        # to the code under test
        return orig_merge_timestamps(*args, **kwargs)

    broker.merge_timestamps = fake_merge_timestamps
    try:
        yield True
    finally:
        broker.merge_timestamps = orig_merge_timestamps
def test_sync_merge_timestamps(self):
    """Replication merges put timestamps once, then has nothing to merge.

    The remote container is created after the local one. The first pass
    must record one ``merge_timestamps`` call in each list and leave both
    containers with the remote put timestamp; a second pass must record
    no calls at all.
    """
    ts = (Timestamp(t).internal for t in
          itertools.count(int(time.time())))
    # setup a local container
    broker = self._get_broker('a', 'c', node_index=0)
    local_put_timestamp = next(ts)
    broker.initialize(local_put_timestamp, POLICIES.default.idx)
    # setup remote container
    remote_broker = self._get_broker('a', 'c', node_index=1)
    remote_put_timestamp = next(ts)
    remote_broker.initialize(remote_put_timestamp, POLICIES.default.idx)
    daemon = replicator.ContainerReplicator({})
    part, node = self._get_broker_part_node(remote_broker)

    def replicate_recording_calls():
        # NOTE(review): both wrappers patch the *local* broker, so one
        # call to broker.merge_timestamps lands in both lists; patching
        # remote_broker here may not intercept the instance used on the
        # RPC side -- confirm before changing.
        info = broker.get_replication_info()
        local_calls, remote_calls = [], []
        with self._wrap_merge_timestamps(broker, local_calls), \
                self._wrap_merge_timestamps(broker, remote_calls):
            success = daemon._repl_to_node(node, broker, part, info)
        return success, local_calls, remote_calls

    def assert_put_timestamps_match_remote():
        self.assertEqual(remote_put_timestamp,
                         broker.get_info()['put_timestamp'])
        self.assertEqual(remote_put_timestamp,
                         remote_broker.get_info()['put_timestamp'])

    # replicate, expect call to merge_timestamps on remote and local
    success, local_calls, remote_calls = replicate_recording_calls()
    self.assertTrue(success)
    self.assertEqual(1, len(remote_calls))
    self.assertEqual(1, len(local_calls))
    assert_put_timestamps_match_remote()

    # replicate again, no changes so expect no calls to merge_timestamps
    success, local_calls, remote_calls = replicate_recording_calls()
    self.assertTrue(success)
    self.assertEqual(0, len(remote_calls))
    self.assertEqual(0, len(local_calls))
    assert_put_timestamps_match_remote()
def test_sync_bogus_db_quarantines(self):
    """A local db that fails like a corrupt one is replaced by a new file.

    ``ContainerBroker.get_info`` is patched to raise
    ``sqlite3.OperationalError`` for the local db file only. After the
    remote node replicates, the local db path must still exist but with a
    different inode, and the daemon must count no failures.
    """
    ts = (Timestamp(t).internal for t in
          itertools.count(int(time.time())))
    policy = random.choice(list(POLICIES))
    # create "local" broker
    local_broker = self._get_broker('a', 'c', node_index=0)
    local_broker.initialize(next(ts), policy.idx)
    # create "remote" broker
    remote_broker = self._get_broker('a', 'c', node_index=1)
    remote_broker.initialize(next(ts), policy.idx)

    db_path = local_broker.db_file
    self.assertTrue(os.path.exists(db_path))  # sanity check
    inode_before = os.stat(db_path).st_ino

    real_get_info = backend.ContainerBroker.get_info

    def fail_like_bad_db(broker):
        # any other db behaves normally
        if broker.db_file != local_broker.db_file:
            return real_get_info(broker)
        raise sqlite3.OperationalError("no such table: container_info")

    part, node = self._get_broker_part_node(remote_broker)
    with mock.patch('swift.container.backend.ContainerBroker.get_info',
                    fail_like_bad_db):
        # Have the remote node replicate to local; local should see its
        # corrupt DB, quarantine it, and act like the DB wasn't ever there
        # in the first place.
        daemon = self._run_once(node)
        self.assertTrue(os.path.exists(db_path))
        # Make sure we didn't just keep the old DB, but quarantined it and
        # made a fresh copy.
        inode_after = os.stat(db_path).st_ino
        self.assertNotEqual(inode_before, inode_after)
        self.assertEqual(daemon.stats['failure'], 0)
def _replication_scenarios(self, *scenarios, **kwargs):
    """Drive a policy-conflict test through several row scenarios.

    For each scenario this generator yields
    ``(ts, policy, remote_policy, broker, remote_broker)`` so the caller
    can set up the two brokers. When resumed it adds an object row to the
    brokers named by the scenario, runs one replication pass from the
    local node and asserts which storage policy both ends have settled
    on. The base test case is torn down and set up again between
    scenarios.

    :param scenarios: scenario names to run; defaults to 'no_row',
                      'local_row', 'remote_row' and 'both_rows'
    :param remote_wins: keyword only; when true the remote policy is
                        expected on both ends, otherwise the local one
                        (default False)
    """
    remote_wins = kwargs.get('remote_wins', False)
    # these tests are duplicated because of the differences in replication
    # when row counts cause full rsync vs. merge
    if not scenarios:
        scenarios = ('no_row', 'local_row', 'remote_row', 'both_rows')
    for scenario_name in scenarios:
        ts = itertools.count(int(time.time()))
        policy = random.choice(list(POLICIES))
        other_policies = [p for p in POLICIES if p is not policy]
        remote_policy = random.choice(other_policies)
        broker = self._get_broker('a', 'c', node_index=0)
        remote_broker = self._get_broker('a', 'c', node_index=1)
        # hand control to the test so it can initialize the brokers
        yield ts, policy, remote_policy, broker, remote_broker

        # which brokers receive an object row in each scenario
        brokers_with_row = {
            'no_row': (),
            'local_row': (broker,),
            'remote_row': (remote_broker,),
            'both_rows': (broker, remote_broker),
        }[scenario_name]
        obj_ts = next(ts)
        for db in brokers_with_row:
            db.put_object('/a/c/o', obj_ts, 0, 'content-type', 'etag',
                          storage_policy_index=db.storage_policy_index)
        # replicate
        part, node = self._get_broker_part_node(broker)
        daemon = self._run_once(node)
        self.assertEqual(0, daemon.stats['failure'])
        # in sync
        local_info = self._get_broker('a', 'c', node_index=0).get_info()
        remote_info = self._get_broker('a', 'c', node_index=1).get_info()
        if not remote_wins:
            expected = policy.idx
            err = ('local policy changed to match remote '
                   'for replication row scenario %s' % scenario_name)
        else:
            expected = remote_policy.idx
            err = ('local policy did not change to match remote '
                   'for replication row scenario %s' % scenario_name)
        self.assertEqual(local_info['storage_policy_index'], expected, err)
        self.assertEqual(remote_info['storage_policy_index'],
                         local_info['storage_policy_index'])
        # fresh fixtures for the next scenario
        test_db_replicator.TestReplicatorSync.tearDown(self)
        test_db_replicator.TestReplicatorSync.setUp(self)
def test_sync_local_create_policy_over_newer_remote_create(self):
    """Local create followed by a later remote create: local policy wins."""
    for ts, policy, remote_policy, broker, remote_broker in \
            self._replication_scenarios():
        # timeline: local create, then remote create
        local_created, remote_created = next(ts), next(ts)
        broker.initialize(local_created, policy.idx)
        remote_broker.initialize(remote_created, remote_policy.idx)
def test_sync_local_create_policy_over_newer_remote_delete(self):
    """Local create, later remote create and delete: local policy wins."""
    for ts, policy, remote_policy, broker, remote_broker in \
            self._replication_scenarios():
        # timeline: local create, remote create, remote delete
        local_created, remote_created, remote_deleted = (
            next(ts), next(ts), next(ts))
        broker.initialize(local_created, policy.idx)
        remote_broker.initialize(remote_created, remote_policy.idx)
        remote_broker.delete_db(remote_deleted)
def test_sync_local_create_policy_over_older_remote_delete(self):
    """Remote create and delete, then local create: local policy wins.

    Only the 'no_row' and 'local_row' scenarios run here; remote_row &
    both_rows cases are covered by
    "test_sync_remote_half_delete_policy_over_newer_local_create".
    """
    for ts, policy, remote_policy, broker, remote_broker in \
            self._replication_scenarios('no_row', 'local_row'):
        # timeline: remote create, remote delete, local create
        remote_created, remote_deleted, local_created = (
            next(ts), next(ts), next(ts))
        remote_broker.initialize(remote_created, remote_policy.idx)
        remote_broker.delete_db(remote_deleted)
        broker.initialize(local_created, policy.idx)
def test_sync_local_half_delete_policy_over_newer_remote_create(self):
    """Local create and delete, then remote create: local policy wins.

    Only the 'local_row' and 'both_rows' scenarios run here; no_row &
    remote_row cases are covered by
    "test_sync_remote_create_policy_over_older_local_delete".
    """
    for ts, policy, remote_policy, broker, remote_broker in \
            self._replication_scenarios('local_row', 'both_rows'):
        # timeline: local create, local (half) delete, remote create
        local_created, local_deleted, remote_created = (
            next(ts), next(ts), next(ts))
        broker.initialize(local_created, policy.idx)
        broker.delete_db(local_deleted)
        remote_broker.initialize(remote_created, remote_policy.idx)
def test_sync_local_recreate_policy_over_newer_remote_create(self):
    """Local create, delete and recreate, then remote create: local wins."""
    for ts, policy, remote_policy, broker, remote_broker in \
            self._replication_scenarios():
        # create "local" broker
        local_created = next(ts)
        broker.initialize(local_created, policy.idx)
        # delete and recreate "local" broker
        local_deleted, local_recreated = next(ts), next(ts)
        broker.delete_db(local_deleted)
        broker.update_put_timestamp(local_recreated)
        broker.update_status_changed_at(local_recreated)
        # create "remote" broker last
        remote_created = next(ts)
        remote_broker.initialize(remote_created, remote_policy.idx)
def test_sync_local_recreate_policy_over_older_remote_create(self):
    """Remote create, then local create, delete and recreate: local wins."""
    for ts, policy, remote_policy, broker, remote_broker in \
            self._replication_scenarios():
        # create "remote" broker first
        remote_created = next(ts)
        remote_broker.initialize(remote_created, remote_policy.idx)
        # create "local" broker
        local_created = next(ts)
        broker.initialize(local_created, policy.idx)
        # delete and recreate "local" broker
        local_deleted, local_recreated = next(ts), next(ts)
        broker.delete_db(local_deleted)
        broker.update_put_timestamp(local_recreated)
        broker.update_status_changed_at(local_recreated)
def test_sync_local_recreate_policy_over_newer_remote_delete(self):
    """Local recreate followed by a later remote delete: local wins."""
    for ts, policy, remote_policy, broker, remote_broker in \
            self._replication_scenarios():
        # create both brokers, local first
        local_created, remote_created = next(ts), next(ts)
        broker.initialize(local_created, policy.idx)
        remote_broker.initialize(remote_created, remote_policy.idx)
        # delete and recreate "local" broker
        local_deleted, local_recreated = next(ts), next(ts)
        broker.delete_db(local_deleted)
        broker.update_put_timestamp(local_recreated)
        broker.update_status_changed_at(local_recreated)
        # delete "remote" broker after the local recreate
        remote_deleted = next(ts)
        remote_broker.delete_db(remote_deleted)
def test_sync_local_recreate_policy_over_older_remote_delete(self):
    """Remote delete followed by a later local recreate: local wins."""
    for ts, policy, remote_policy, broker, remote_broker in \
            self._replication_scenarios():
        # create both brokers, local first
        local_created, remote_created = next(ts), next(ts)
        broker.initialize(local_created, policy.idx)
        remote_broker.initialize(remote_created, remote_policy.idx)
        # delete "remote" broker before the local recreate
        remote_deleted = next(ts)
        remote_broker.delete_db(remote_deleted)
        # delete and recreate "local" broker
        local_deleted, local_recreated = next(ts), next(ts)
        broker.delete_db(local_deleted)
        broker.update_put_timestamp(local_recreated)
        broker.update_status_changed_at(local_recreated)
def test_sync_local_recreate_policy_over_older_remote_recreate(self):
    """Remote recreate followed by a later local recreate: local wins."""
    for ts, policy, remote_policy, broker, remote_broker in \
            self._replication_scenarios():
        # create both brokers, remote first
        remote_created, local_created = next(ts), next(ts)
        remote_broker.initialize(remote_created, remote_policy.idx)
        broker.initialize(local_created, policy.idx)
        # delete and recreate "remote" broker
        remote_deleted, remote_recreated = next(ts), next(ts)
        remote_broker.delete_db(remote_deleted)
        remote_broker.update_put_timestamp(remote_recreated)
        remote_broker.update_status_changed_at(remote_recreated)
        # then delete and recreate "local" broker
        local_deleted, local_recreated = next(ts), next(ts)
        broker.delete_db(local_deleted)
        broker.update_put_timestamp(local_recreated)
        broker.update_status_changed_at(local_recreated)
def test_sync_remote_create_policy_over_newer_local_create(self):
    """Remote create followed by a later local create: remote policy wins."""
    for ts, policy, remote_policy, broker, remote_broker in \
            self._replication_scenarios(remote_wins=True):
        # timeline: remote create, then local create
        remote_created, local_created = next(ts), next(ts)
        remote_broker.initialize(remote_created, remote_policy.idx)
        broker.initialize(local_created, policy.idx)
def test_sync_remote_create_policy_over_newer_local_delete(self):
    """Remote create, later local create and delete: remote policy wins."""
    for ts, policy, remote_policy, broker, remote_broker in \
            self._replication_scenarios(remote_wins=True):
        # timeline: remote create, local create, local delete
        remote_created, local_created, local_deleted = (
            next(ts), next(ts), next(ts))
        remote_broker.initialize(remote_created, remote_policy.idx)
        broker.initialize(local_created, policy.idx)
        broker.delete_db(local_deleted)
def test_sync_remote_create_policy_over_older_local_delete(self):
    """Local create and delete, then remote create: remote policy wins.

    Only the 'no_row' and 'remote_row' scenarios run here; local_row &
    both_rows cases are covered by
    "test_sync_local_half_delete_policy_over_newer_remote_create".
    """
    for ts, policy, remote_policy, broker, remote_broker in \
            self._replication_scenarios(
                'no_row', 'remote_row', remote_wins=True):
        # timeline: local create, local delete, remote create
        local_created, local_deleted, remote_created = (
            next(ts), next(ts), next(ts))
        broker.initialize(local_created, policy.idx)
        broker.delete_db(local_deleted)
        remote_broker.initialize(remote_created, remote_policy.idx)
def test_sync_remote_half_delete_policy_over_newer_local_create(self):
    """Remote create and delete, then local create: remote policy wins.

    Only the 'remote_row' and 'both_rows' scenarios run here; no_row &
    both_rows cases are covered by
    "test_sync_local_create_policy_over_older_remote_delete".
    """
    for ts, policy, remote_policy, broker, remote_broker in \
            self._replication_scenarios(
                'remote_row', 'both_rows', remote_wins=True):
        # timeline: remote create, remote (half) delete, local create
        remote_created, remote_deleted, local_created = (
            next(ts), next(ts), next(ts))
        remote_broker.initialize(remote_created, remote_policy.idx)
        remote_broker.delete_db(remote_deleted)
        broker.initialize(local_created, policy.idx)
def test_sync_remote_recreate_policy_over_newer_local_create(self):
    """Remote create, delete, recreate, then local create: remote wins."""
    for ts, policy, remote_policy, broker, remote_broker in \
            self._replication_scenarios(remote_wins=True):
        # create "remote" broker
        remote_created = next(ts)
        remote_broker.initialize(remote_created, remote_policy.idx)
        # delete and recreate "remote" broker
        remote_deleted, remote_recreated = next(ts), next(ts)
        remote_broker.delete_db(remote_deleted)
        remote_broker.update_put_timestamp(remote_recreated)
        remote_broker.update_status_changed_at(remote_recreated)
        # create "local" broker last
        local_created = next(ts)
        broker.initialize(local_created, policy.idx)
def test_sync_remote_recreate_policy_over_older_local_create(self):
    """Local create, then remote create, delete, recreate: remote wins."""
    for ts, policy, remote_policy, broker, remote_broker in \
            self._replication_scenarios(remote_wins=True):
        # create both brokers, local first
        local_created, remote_created = next(ts), next(ts)
        broker.initialize(local_created, policy.idx)
        remote_broker.initialize(remote_created, remote_policy.idx)
        # delete and recreate "remote" broker
        remote_deleted, remote_recreated = next(ts), next(ts)
        remote_broker.delete_db(remote_deleted)
        remote_broker.update_put_timestamp(remote_recreated)
        remote_broker.update_status_changed_at(remote_recreated)
def test_sync_remote_recreate_policy_over_newer_local_delete(self):
    """Remote recreate followed by a later local delete: remote wins."""
    for ts, policy, remote_policy, broker, remote_broker in \
            self._replication_scenarios(remote_wins=True):
        # create both brokers, local first
        local_created, remote_created = next(ts), next(ts)
        broker.initialize(local_created, policy.idx)
        remote_broker.initialize(remote_created, remote_policy.idx)
        # delete and recreate "remote" broker
        remote_deleted, remote_recreated = next(ts), next(ts)
        remote_broker.delete_db(remote_deleted)
        remote_broker.update_put_timestamp(remote_recreated)
        remote_broker.update_status_changed_at(remote_recreated)
        # delete "local" broker after the remote recreate
        local_deleted = next(ts)
        broker.delete_db(local_deleted)
def test_sync_remote_recreate_policy_over_older_local_delete(self):
    """Local delete followed by a later remote recreate: remote wins."""
    for ts, policy, remote_policy, broker, remote_broker in \
            self._replication_scenarios(remote_wins=True):
        # create both brokers, local first
        local_created, remote_created = next(ts), next(ts)
        broker.initialize(local_created, policy.idx)
        remote_broker.initialize(remote_created, remote_policy.idx)
        # delete "local" broker before the remote recreate
        local_deleted = next(ts)
        broker.delete_db(local_deleted)
        # delete and recreate "remote" broker
        remote_deleted, remote_recreated = next(ts), next(ts)
        remote_broker.delete_db(remote_deleted)
        remote_broker.update_put_timestamp(remote_recreated)
        remote_broker.update_status_changed_at(remote_recreated)
def test_sync_remote_recreate_policy_over_older_local_recreate(self):
    """Local recreate followed by a later remote recreate: remote wins."""
    for ts, policy, remote_policy, broker, remote_broker in \
            self._replication_scenarios(remote_wins=True):
        # create both brokers, local first
        local_created, remote_created = next(ts), next(ts)
        broker.initialize(local_created, policy.idx)
        remote_broker.initialize(remote_created, remote_policy.idx)
        # delete and recreate "local" broker
        local_deleted, local_recreated = next(ts), next(ts)
        broker.delete_db(local_deleted)
        broker.update_put_timestamp(local_recreated)
        broker.update_status_changed_at(local_recreated)
        # then delete and recreate "remote" broker
        remote_deleted, remote_recreated = next(ts), next(ts)
        remote_broker.delete_db(remote_deleted)
        remote_broker.update_put_timestamp(remote_recreated)
        remote_broker.update_status_changed_at(remote_recreated)
def test_sync_to_remote_with_misplaced(self):
    """A remote row becomes misplaced once the remote adopts our policy.

    The remote container is created later with a different policy and
    holds one object stored under that policy. After the local node
    replicates, the remote must carry the local policy index, report an
    object count of 0 and list the row as misplaced; the third node must
    have been rsynced with the same policy index and object count.
    """
    ts = (Timestamp(t).internal for t in
          itertools.count(int(time.time())))
    # create "local" broker
    policy = random.choice(list(POLICIES))
    local_broker = self._get_broker('a', 'c', node_index=0)
    local_broker.initialize(next(ts), policy.idx)
    # create "remote" broker with some other policy
    other_policies = [p for p in POLICIES if p is not policy]
    remote_policy = random.choice(other_policies)
    remote_broker = self._get_broker('a', 'c', node_index=1)
    remote_broker.initialize(next(ts), remote_policy.idx)
    # add a row to remote_broker that will end up misplaced
    remote_broker.put_object(
        '/a/c/o', next(ts), 0, 'content-type', 'etag',
        storage_policy_index=remote_broker.storage_policy_index)
    # the row matches the remote's own policy index for now, so it is
    # counted and not reported as misplaced
    self.assertEqual(remote_broker.get_info()['object_count'], 1)
    self.assertEqual([], remote_broker.get_misplaced_since(-1, 1))
    # replicate
    part, node = self._get_broker_part_node(local_broker)
    daemon = self._run_once(node)
    # since our local broker has no rows to push it logs as no_change
    self.assertEqual(1, daemon.stats['no_change'])
    self.assertEqual(0, local_broker.get_info()['object_count'])

    expectations = {
        'object_count': 0,
        'storage_policy_index': policy.idx,
    }

    def assert_expected(info):
        for key, value in expectations.items():
            self.assertEqual(info[key], value)

    # remote broker updates its policy index; this makes the remote
    # broker's object count change
    assert_expected(remote_broker.get_info())
    # but it also knows those objects are misplaced now
    misplaced = remote_broker.get_misplaced_since(-1, 100)
    self.assertEqual(len(misplaced), 1)
    # we also pushed out to node 3 with rsync
    self.assertEqual(1, daemon.stats['rsync'])
    third_broker = self._get_broker('a', 'c', node_index=2)
    assert_expected(third_broker.get_info())
def test_misplaced_rows_replicate_and_enqueue(self):
    # Misplaced rows held by the local db are expected to reach the
    # remote db and to be enqueued in a reconciler container.
    # Start counting at the top of the current hour so that all the
    # timestamps fall in the same hour.
    hour_start = int(time.time()) // 3600 * 3600
    timestamps = (Timestamp(tick) for tick in itertools.count(hour_start))

    local_policy = random.choice(list(POLICIES))
    local_broker = self._get_broker('a', 'c', node_index=0)
    local_broker.initialize(next(timestamps).internal, local_policy.idx)
    other_policies = [p for p in POLICIES if p is not local_policy]
    remote_policy = random.choice(other_policies)
    remote_broker = self._get_broker('a', 'c', node_index=1)
    remote_broker.initialize(next(timestamps).internal, remote_policy.idx)

    # add a misplaced row to *local* broker
    first_put_time = next(timestamps).internal
    local_broker.put_object(
        'o', first_put_time, 0, 'content-type', 'etag',
        storage_policy_index=remote_policy.idx)
    local_misplaced = local_broker.get_misplaced_since(-1, 10)
    self.assertEqual(len(local_misplaced), 1)
    # since this row is misplaced it doesn't show up in count
    self.assertEqual(local_broker.get_info()['object_count'], 0)

    # add another misplaced row to *local* broker, this time with a
    # composite (data / content-type / metadata) timestamp
    data_time = next(timestamps)
    ctype_time = next(timestamps)
    meta_time = next(timestamps)
    local_broker.put_object(
        'o2', data_time.internal, 0, 'content-type', 'etag',
        storage_policy_index=remote_policy.idx,
        ctype_timestamp=ctype_time.internal,
        meta_timestamp=meta_time.internal)
    local_misplaced = local_broker.get_misplaced_since(-1, 10)
    self.assertEqual(len(local_misplaced), 2)
    # still nothing counted, both rows are misplaced
    self.assertEqual(local_broker.get_info()['object_count'], 0)

    # replicate
    _part, local_node = self._get_broker_part_node(local_broker)
    daemon = self._run_once(local_node)
    # push to remote, and third node was missing (also maybe reconciler)
    self.assertTrue(2 < daemon.stats['rsync'] <= 3, daemon.stats['rsync'])

    # grab the rsynced instance of remote_broker
    remote_broker = self._get_broker('a', 'c', node_index=1)
    # remote has misplaced rows too now
    remote_misplaced = remote_broker.get_misplaced_since(-1, 10)
    self.assertEqual(len(remote_misplaced), 2)
    # and the correct policy_index and object_count
    remote_info = remote_broker.get_info()
    expected_info = {
        'object_count': 0,
        'storage_policy_index': local_policy.idx,
    }
    for field, wanted in expected_info.items():
        self.assertEqual(remote_info[field], wanted)

    # and we should have also enqueued these rows in a single reconciler,
    # since we forced the object timestamps to be in the same hour.
    reconciler = daemon.get_reconciler_broker(
        remote_misplaced[0]['created_at'])
    # but it may not be on the same node as us anymore though...
    reconciler = self._get_broker(
        reconciler.account, reconciler.container, node_index=0)
    self.assertEqual(reconciler.get_info()['object_count'], 2)
    listing = reconciler.list_objects_iter(
        10, '', None, None, None, None, storage_policy_index=0)
    self.assertEqual(len(listing), 2)

    first_expected = (
        '%s:/a/c/o' % remote_policy.idx, first_put_time, 0,
        'application/x-put', first_put_time)
    self.assertEqual(listing[0], first_expected)
    # the second object's listing has ts_meta as its last modified time
    # but its full composite timestamp is in the hash field.
    second_expected = (
        '%s:/a/c/o2' % remote_policy.idx, meta_time.internal, 0,
        'application/x-put',
        encode_timestamps(data_time, ctype_time, meta_time))
    self.assertEqual(listing[1], second_expected)

    # having safely enqueued to the reconciler we can advance
    # our sync pointer
    self.assertEqual(local_broker.get_reconciler_sync(), 2)
def test_multiple_out_sync_reconciler_enqueue_normalize(self):
    # Two dbs with different policies each hold one row per policy under
    # the same two names. After replicating in both directions only the
    # newest row per name should remain, and exactly one of those rows
    # should have been enqueued for the reconciler.
    start = int(time.time())
    timestamps = (Timestamp(tick).internal
                  for tick in itertools.count(start))
    local_policy = random.choice(list(POLICIES))
    local_broker = self._get_broker('a', 'c', node_index=0)
    local_broker.initialize(next(timestamps), local_policy.idx)
    other_policies = [p for p in POLICIES if p is not local_policy]
    remote_policy = random.choice(other_policies)
    remote_broker = self._get_broker('a', 'c', node_index=1)
    remote_broker.initialize(next(timestamps), remote_policy.idx)
    both_brokers = (local_broker, remote_broker)

    # add some rows to brokers
    for db in both_brokers:
        for pol in (local_policy, remote_policy):
            db.put_object(
                'o-%s' % pol.name, next(timestamps), 0, 'content-type',
                'etag', storage_policy_index=pol.idx)
        db._commit_puts()

    expected_policy_stats = {
        local_policy.idx: {'object_count': 1, 'bytes_used': 0},
        remote_policy.idx: {'object_count': 1, 'bytes_used': 0},
    }
    for db in both_brokers:
        self.assertEqual(db.get_policy_stats(), expected_policy_stats)

    # each db has 2 rows, 4 total
    all_items = set()
    for db in both_brokers:
        for row in db.get_items_since(-1, 4):
            all_items.add((row['name'], row['created_at']))
    self.assertEqual(4, len(all_items))

    # replicate both ways
    _part, node = self._get_broker_part_node(local_broker)
    self._run_once(node)
    _part, node = self._get_broker_part_node(remote_broker)
    self._run_once(node)

    # only the latest timestamps should survive
    most_recent_items = {}
    for name, created_at in all_items:
        previous = most_recent_items.get(name, '')
        most_recent_items[name] = max(created_at, previous)
    self.assertEqual(2, len(most_recent_items))

    for db in both_brokers:
        rows = db.get_items_since(-1, 4)
        self.assertEqual(len(rows), len(most_recent_items))
        for row in rows:
            self.assertEqual(most_recent_items[row['name']],
                             row['created_at'])

    # and the reconciler also collapses updates
    reconciler_containers = set(
        get_reconciler_container_name(created_at)
        for _name, created_at in all_items)

    reconciler_items = set()
    for container, node_index in itertools.product(
            reconciler_containers, range(3)):
        reconciler = self._get_broker(
            MISPLACED_OBJECTS_ACCOUNT, container, node_index=node_index)
        for row in reconciler.get_items_since(-1, 4):
            reconciler_items.add((row['name'], row['created_at']))

    # they can't *both* be in the wrong policy ;)
    self.assertEqual(1, len(reconciler_items))
    for reconciler_name, created_at in reconciler_items:
        # reconciler row names look like "<policy_index>:/a/c/<name>"
        _policy_index, path = reconciler_name.split(':', 1)
        _account, _container, obj_name = path.lstrip('/').split('/')
        self.assertEqual(most_recent_items[obj_name], created_at)
@contextmanager
def _wrap_update_reconciler_sync(self, broker, calls):
def wrapper_function(*args, **kwargs):
calls.append(args)
orig_function(*args, **kwargs)
orig_function = broker.update_reconciler_sync
broker.update_reconciler_sync = wrapper_function
try:
yield True
finally:
broker.update_reconciler_sync = orig_function
def test_post_replicate_hook(self):
    # _post_replicate_hook should call update_reconciler_sync once for
    # new replication info and not again when the info is unchanged.
    start = int(time.time())
    timestamps = (Timestamp(tick).internal
                  for tick in itertools.count(start))
    broker = self._get_broker('a', 'c', node_index=0)
    broker.initialize(next(timestamps), 0)
    broker.put_object(
        'foo', next(timestamps), 0, 'text/plain', 'xyz', deleted=0,
        storage_policy_index=0)
    info = broker.get_replication_info()
    self.assertEqual(1, info['max_row'])
    self.assertEqual(-1, broker.get_reconciler_sync())
    daemon = replicator.ContainerReplicator({})

    def count_sync_updates():
        # run the hook once, returning how many times it called
        # broker.update_reconciler_sync
        recorded = []
        with self._wrap_update_reconciler_sync(broker, recorded):
            daemon._post_replicate_hook(broker, info, [])
        return len(recorded)

    self.assertEqual(1, count_sync_updates())
    # repeated call to _post_replicate_hook with no change to info
    # should not call update_reconciler_sync
    self.assertEqual(0, count_sync_updates())
def test_update_sync_store_exception(self):
    # An OSError raised by the sync store during the post-replicate hook
    # should not propagate; a single error line is expected instead.
    class RaisingSyncStore(object):
        def update_sync_store(self, broker):
            raise OSError(1, '1')

    daemon = replicator.ContainerReplicator({}, logger=self.logger)
    daemon.sync_store = RaisingSyncStore()
    ts_iter = make_timestamp_iter()
    broker = self._get_broker('a', 'c', node_index=0)
    created_at = next(ts_iter)
    broker.initialize(created_at.internal, POLICIES.default.idx)

    daemon._post_replicate_hook(
        broker, broker.get_replication_info(), [])

    error_lines = self.logger.get_lines_for_level('error')
    self.assertEqual(1, len(error_lines))
    self.assertIn('Failed to update sync_store', error_lines[0])
def test_update_sync_store(self):
    # Walk one container through each combination of sync-to / sync-key
    # metadata, and finally deletion, checking after every change which
    # ContainerSyncStore method the post-replicate hook invokes.
    klass = 'swift.container.sync_store.ContainerSyncStore'
    daemon = replicator.ContainerReplicator({})
    daemon.sync_store = sync_store.ContainerSyncStore(
        daemon.root, daemon.logger, daemon.mount_check)
    ts_iter = make_timestamp_iter()
    broker = self._get_broker('a', 'c', node_index=0)
    broker.initialize(next(ts_iter).internal, POLICIES.default.idx)
    info = broker.get_replication_info()

    def run_hook():
        # run the hook with both sync store mutators patched out and
        # hand back the two mocks for inspection
        with mock.patch(klass + '.remove_synced_container') as mock_remove:
            with mock.patch(klass + '.add_synced_container') as mock_add:
                daemon._post_replicate_hook(broker, info, [])
        return mock_add, mock_remove

    def assert_removed_from_store():
        mock_add, mock_remove = run_hook()
        self.assertEqual(0, mock_add.call_count)
        mock_remove.assert_called_once_with(broker)

    # no sync metadata at all - store untouched
    mock_add, mock_remove = run_hook()
    self.assertEqual(0, mock_remove.call_count)
    self.assertEqual(0, mock_add.call_count)

    # sync-to and sync-key empty - remove from store
    now = next(ts_iter).internal
    broker.update_metadata(
        {'X-Container-Sync-To': ('', now),
         'X-Container-Sync-Key': ('', now)})
    assert_removed_from_store()

    # sync-to is not empty sync-key is empty - remove from store
    now = next(ts_iter).internal
    broker.update_metadata(
        {'X-Container-Sync-To': ('a', now)})
    assert_removed_from_store()

    # sync-to is empty sync-key is not empty - remove from store
    now = next(ts_iter).internal
    broker.update_metadata(
        {'X-Container-Sync-To': ('', now),
         'X-Container-Sync-Key': ('secret', now)})
    assert_removed_from_store()

    # sync-to, sync-key both not empty - add to store
    now = next(ts_iter).internal
    broker.update_metadata(
        {'X-Container-Sync-To': ('a', now),
         'X-Container-Sync-Key': ('secret', now)})
    mock_add, mock_remove = run_hook()
    mock_add.assert_called_once_with(broker)
    self.assertEqual(0, mock_remove.call_count)

    # container is removed - need to remove from store
    now = next(ts_iter).internal
    broker.delete_db(now)
    broker.update_metadata(
        {'X-Container-Sync-To': ('a', now),
         'X-Container-Sync-Key': ('secret', now)})
    assert_removed_from_store()
def test_sync_triggers_sync_store_update(self):
    klass = 'swift.container.sync_store.ContainerSyncStore'
    ts_iter = make_timestamp_iter()
    # Create two containers as follows:
    # broker_1 which is not set for sync
    # broker_2 which is set for sync and then unset
    # test that while replicating both we see no activity
    # for broker_1, and the anticipated activity for broker_2
    broker_1 = self._get_broker('a', 'c', node_index=0)
    broker_1.initialize(next(ts_iter).internal, POLICIES.default.idx)
    broker_2 = self._get_broker('b', 'd', node_index=0)
    broker_2.initialize(next(ts_iter).internal, POLICIES.default.idx)
    sync_to_time = next(ts_iter).internal
    sync_key_time = next(ts_iter).internal
    broker_2.update_metadata(
        {'X-Container-Sync-To': ('a', sync_to_time),
         'X-Container-Sync-Key': ('secret', sync_key_time)})

    def replicate_node_of(broker):
        # run one replication pass on the node holding ``broker`` with
        # the sync store mutators patched out; returns the two mocks
        _part, node = self._get_broker_part_node(broker)
        with mock.patch(klass + '.remove_synced_container') as mock_remove:
            with mock.patch(klass + '.add_synced_container') as mock_add:
                self._run_once(node)
        return mock_add, mock_remove

    # replicate once according to broker_1
    # relying on the fact that FakeRing would place both
    # in the same partition.
    mock_add, mock_remove = replicate_node_of(broker_1)
    self.assertEqual(1, mock_add.call_count)
    added_broker = mock_add.call_args[0][0]
    self.assertEqual(broker_2.db_file, added_broker.db_file)
    self.assertEqual(0, mock_remove.call_count)

    # unset sync-to on broker_2
    broker_2.update_metadata(
        {'X-Container-Sync-To': ('', next(ts_iter).internal)})
    # replicate once this time according to broker_2
    # relying on the fact that FakeRing would place both
    # in the same partition.
    mock_add, mock_remove = replicate_node_of(broker_2)
    self.assertEqual(0, mock_add.call_count)
    self.assertEqual(1, mock_remove.call_count)
    removed_broker = mock_remove.call_args[0][0]
    self.assertEqual(broker_2.db_file, removed_broker.db_file)
def test_cleanup_post_replicate(self):
    # Drives ContainerReplicator.cleanup_post_replicate through a series
    # of broker states. Each stage checks the return value, whether the db
    # file still exists on disk and (for most stages) the exact debug log
    # line emitted. The logger is cleared between stages so that each
    # assertion only sees its own stage's output.
    broker = self._get_broker('a', 'c', node_index=0)
    put_timestamp = Timestamp.now()
    broker.initialize(put_timestamp.internal, POLICIES.default.idx)
    orig_info = broker.get_replication_info()
    daemon = replicator.ContainerReplicator({}, logger=self.logger)

    # db should not be here, replication ok, deleted
    res = daemon.cleanup_post_replicate(broker, orig_info, [True] * 3)
    self.assertTrue(res)
    self.assertFalse(os.path.exists(broker.db_file))
    self.assertEqual(['Successfully deleted db %s' % broker.db_file],
                     daemon.logger.get_lines_for_level('debug'))
    daemon.logger.clear()

    # failed replication, not deleted
    # (the db was removed by the previous stage, so re-create it first)
    broker.initialize(put_timestamp.internal, POLICIES.default.idx)
    orig_info = broker.get_replication_info()
    res = daemon.cleanup_post_replicate(broker, orig_info,
                                        [False, True, True])
    self.assertTrue(res)
    self.assertTrue(os.path.exists(broker.db_file))
    self.assertEqual(['Not deleting db %s (2/3 success)' % broker.db_file],
                     daemon.logger.get_lines_for_level('debug'))
    daemon.logger.clear()

    # db has shard ranges, not deleted
    broker.enable_sharding(Timestamp.now())
    broker.merge_shard_ranges(
        [ShardRange('.shards_a/c', Timestamp.now(), '', 'm')])
    self.assertTrue(broker.sharding_required())  # sanity check
    res = daemon.cleanup_post_replicate(broker, orig_info, [True] * 3)
    self.assertTrue(res)
    self.assertTrue(os.path.exists(broker.db_file))
    self.assertEqual(
        ['Not deleting db %s (requires sharding, state unsharded)' %
         broker.db_file],
        daemon.logger.get_lines_for_level('debug'))
    daemon.logger.clear()

    # db sharding, not deleted
    self._goto_sharding_state(broker, Timestamp.now())
    self.assertTrue(broker.sharding_required())  # sanity check
    orig_info = broker.get_replication_info()
    res = daemon.cleanup_post_replicate(broker, orig_info, [True] * 3)
    self.assertTrue(res)
    self.assertTrue(os.path.exists(broker.db_file))
    self.assertEqual(
        ['Not deleting db %s (requires sharding, state sharding)' %
         broker.db_file],
        daemon.logger.get_lines_for_level('debug'))
    daemon.logger.clear()

    # db sharded, should not be here, failed replication, not deleted
    # (orig_info is not refreshed here; the value captured in the
    # sharding stage above is reused)
    self._goto_sharded_state(broker)
    self.assertFalse(broker.sharding_required())  # sanity check
    res = daemon.cleanup_post_replicate(broker, orig_info,
                                        [True, False, True])
    self.assertTrue(res)
    self.assertTrue(os.path.exists(broker.db_file))
    self.assertEqual(['Not deleting db %s (2/3 success)' %
                      broker.db_file],
                     daemon.logger.get_lines_for_level('debug'))
    daemon.logger.clear()

    # db sharded, should not be here, new shard ranges (e.g. from reverse
    # replication), deleted
    # (this stage checks only the result and the file, not the log line)
    broker.merge_shard_ranges(
        [ShardRange('.shards_a/c', Timestamp.now(), '', 'm')])
    res = daemon.cleanup_post_replicate(broker, orig_info, [True] * 3)
    self.assertTrue(res)
    self.assertFalse(os.path.exists(broker.db_file))
    daemon.logger.clear()

    # db sharded, should not be here, replication ok, deleted
    broker.initialize(put_timestamp.internal, POLICIES.default.idx)
    self.assertTrue(os.path.exists(broker.db_file))
    orig_info = broker.get_replication_info()
    res = daemon.cleanup_post_replicate(broker, orig_info, [True] * 3)
    self.assertTrue(res)
    self.assertFalse(os.path.exists(broker.db_file))
    self.assertEqual(['Successfully deleted db %s' % broker.db_file],
                     daemon.logger.get_lines_for_level('debug'))
    daemon.logger.clear()
def test_sync_shard_ranges(self):
    # Replicates between a "local" (node 0) and a "remote" (node 1) db
    # through a sequence of shard range changes. After every step the
    # target's stored shard range data is compared with the expected data
    # and the daemon's stats are checked to see which replication path
    # was taken.
    put_timestamp = Timestamp.now().internal
    # create "local" broker
    broker = self._get_broker('a', 'c', node_index=0)
    broker.initialize(put_timestamp, POLICIES.default.idx)
    # create "remote" broker
    remote_broker = self._get_broker('a', 'c', node_index=1)
    remote_broker.initialize(put_timestamp, POLICIES.default.idx)

    def check_replicate(expected_shard_ranges, from_broker, to_broker):
        # Replicate from_broker to to_broker's node with a fresh daemon,
        # assert success, assert to_broker's shard range data equals
        # expected_shard_ranges and that node 0 and node 1 report the same
        # info (apart from 'id'). Returns the daemon so the caller can
        # inspect its stats and logger.
        daemon = replicator.ContainerReplicator({}, logger=FakeLogger())
        part, node = self._get_broker_part_node(to_broker)
        # NOTE(review): the replication info always comes from the
        # enclosing local ``broker``, even when from_broker is
        # remote_broker (the final reverse-direction call) - confirm this
        # is intended.
        info = broker.get_replication_info()
        success = daemon._repl_to_node(node, from_broker, part, info)
        self.assertTrue(success)
        self.assertEqual(
            expected_shard_ranges,
            to_broker.get_all_shard_range_data()
        )
        local_info = self._get_broker(
            'a', 'c', node_index=0).get_info()
        remote_info = self._get_broker(
            'a', 'c', node_index=1).get_info()
        for k, v in local_info.items():
            if k == 'id':
                continue
            self.assertEqual(remote_info[k], v,
                             "mismatch remote %s %r != %r" % (
                                 k, remote_info[k], v))
        return daemon

    bounds = (('', 'g'), ('g', 'r'), ('r', ''))
    shard_ranges = [
        ShardRange('.shards_a/sr-%s' % upper, Timestamp.now(), lower,
                   upper, i + 1, 10 * (i + 1))
        for i, (lower, upper) in enumerate(bounds)
    ]
    # add first two shard_ranges to both brokers
    for shard_range in shard_ranges[:2]:
        for db in (broker, remote_broker):
            db.merge_shard_ranges(shard_range)
    # now add a shard range and an object to the "local" broker only
    broker.merge_shard_ranges(shard_ranges[2])
    broker_ranges = broker.get_all_shard_range_data()
    self.assertShardRangesEqual(shard_ranges, broker_ranges)
    broker.put_object('obj', Timestamp.now().internal, 0, 'text/plain',
                      EMPTY_ETAG)
    # sharding not yet enabled so replication not deferred
    daemon = check_replicate(broker_ranges, broker, remote_broker)
    self.assertEqual(0, daemon.stats['deferred'])
    self.assertEqual(0, daemon.stats['no_change'])
    self.assertEqual(0, daemon.stats['rsync'])
    self.assertEqual(1, daemon.stats['diff'])
    self.assertEqual({'diffs': 1},
                     daemon.logger.get_increment_counts())

    # update one shard range
    # (only the in-memory ShardRange object changes here; it is not
    # merged into either broker before the next replication)
    shard_ranges[1].update_meta(50, 50)
    # sharding not yet enabled so replication not deferred, but the two
    # brokers' object tables are in sync so no rsync or usync either
    daemon = check_replicate(broker_ranges, broker, remote_broker)
    self.assertEqual(0, daemon.stats['deferred'])
    self.assertEqual(1, daemon.stats['no_change'])
    self.assertEqual(0, daemon.stats['rsync'])
    self.assertEqual(0, daemon.stats['diff'])
    self.assertEqual({'no_changes': 1},
                     daemon.logger.get_increment_counts())

    # now enable local broker for sharding
    own_sr = broker.enable_sharding(Timestamp.now())
    # update one shard range
    shard_ranges[1].update_meta(13, 123)
    broker.merge_shard_ranges(shard_ranges[1])
    broker_ranges = broker.get_all_shard_range_data()
    self.assertShardRangesEqual(shard_ranges + [own_sr], broker_ranges)

    def check_stats(daemon):
        # From here on every replication is expected to be counted as
        # deferred only, with no counters incremented on the logger.
        self.assertEqual(1, daemon.stats['deferred'])
        self.assertEqual(0, daemon.stats['no_change'])
        self.assertEqual(0, daemon.stats['rsync'])
        self.assertEqual(0, daemon.stats['diff'])
        self.assertFalse(daemon.logger.get_increments())

    daemon = check_replicate(broker_ranges, broker, remote_broker)
    check_stats(daemon)

    # update one shard range
    shard_ranges[1].update_meta(99, 0)
    broker.merge_shard_ranges(shard_ranges[1])
    # sanity check
    broker_ranges = broker.get_all_shard_range_data()
    self.assertShardRangesEqual(shard_ranges + [own_sr], broker_ranges)
    daemon = check_replicate(broker_ranges, broker, remote_broker)
    check_stats(daemon)

    # delete one shard range
    shard_ranges[0].deleted = 1
    shard_ranges[0].timestamp = Timestamp.now()
    broker.merge_shard_ranges(shard_ranges[0])
    # sanity check
    broker_ranges = broker.get_all_shard_range_data()
    self.assertShardRangesEqual(shard_ranges + [own_sr], broker_ranges)
    daemon = check_replicate(broker_ranges, broker, remote_broker)
    check_stats(daemon)

    # put a shard range again
    shard_ranges[2].timestamp = Timestamp.now()
    shard_ranges[2].object_count = 0
    broker.merge_shard_ranges(shard_ranges[2])
    # sanity check
    broker_ranges = broker.get_all_shard_range_data()
    self.assertShardRangesEqual(shard_ranges + [own_sr], broker_ranges)
    daemon = check_replicate(broker_ranges, broker, remote_broker)
    check_stats(daemon)

    # update same shard range on local and remote, remote later
    shard_ranges[-1].meta_timestamp = Timestamp.now()
    shard_ranges[-1].bytes_used += 1000
    broker.merge_shard_ranges(shard_ranges[-1])
    remote_shard_ranges = remote_broker.get_shard_ranges(
        include_deleted=True)
    remote_shard_ranges[-1].meta_timestamp = Timestamp.now()
    remote_shard_ranges[-1].bytes_used += 2000
    remote_broker.merge_shard_ranges(remote_shard_ranges[-1])
    # sanity check
    remote_broker_ranges = remote_broker.get_all_shard_range_data()
    self.assertShardRangesEqual(remote_shard_ranges + [own_sr],
                                remote_broker_ranges)
    self.assertShardRangesNotEqual(shard_ranges, remote_shard_ranges)
    # the expected result is the remote's own data
    daemon = check_replicate(remote_broker_ranges, broker, remote_broker)
    check_stats(daemon)

    # undelete shard range *on the remote*
    deleted_ranges = [sr for sr in remote_shard_ranges if sr.deleted]
    self.assertEqual([shard_ranges[0]], deleted_ranges)
    deleted_ranges[0].deleted = 0
    deleted_ranges[0].timestamp = Timestamp.now()
    remote_broker.merge_shard_ranges(deleted_ranges[0])
    # sanity check
    remote_broker_ranges = remote_broker.get_all_shard_range_data()
    self.assertShardRangesEqual(remote_shard_ranges + [own_sr],
                                remote_broker_ranges)
    self.assertShardRangesNotEqual(shard_ranges, remote_shard_ranges)
    daemon = check_replicate(remote_broker_ranges, broker, remote_broker)
    check_stats(daemon)

    # reverse replication direction and expect syncs to propagate
    daemon = check_replicate(remote_broker_ranges, remote_broker, broker)
    check_stats(daemon)
def test_sync_shard_ranges_error(self):
    # verify that replication is not considered successful if
    # merge_shard_ranges fails
    created_at = Timestamp.now().internal
    local_broker = self._get_broker('a', 'c', node_index=0)
    local_broker.initialize(created_at, POLICIES.default.idx)
    remote_broker = self._get_broker('a', 'c', node_index=1)
    remote_broker.initialize(created_at, POLICIES.default.idx)
    # put an object into local broker
    local_broker.put_object(
        'obj', Timestamp.now().internal, 0, 'text/plain', EMPTY_ETAG)
    # get an own shard range into local broker
    local_broker.enable_sharding(Timestamp.now())
    self.assertFalse(local_broker.sharding_initiated())

    # make the fake RPC answer merge_shard_ranges with a server error
    # and record every RPC op that is sent
    rpc_hook = mock.MagicMock()
    rpc_errors = {
        'merge_shard_ranges': [FakeHTTPResponse(HTTPServerError())]}
    db_replicator.ReplConnection = attach_fake_replication_rpc(
        self.rpc, errors=rpc_errors, replicate_hook=rpc_hook)

    part, remote_node = self._get_broker_part_node(remote_broker)
    local_info = local_broker.get_replication_info()
    daemon = replicator.ContainerReplicator({})
    daemon.logger = FakeLogger()
    self.assertFalse(
        daemon._repl_to_node(remote_node, local_broker, part, local_info))

    # broker only has its own shard range so expect objects to be sync'd
    sent_ops = [call[0][0] for call in rpc_hook.call_args_list]
    self.assertEqual(
        ['sync', 'get_shard_ranges', 'merge_shard_ranges', 'merge_items',
         'merge_syncs'],
        sent_ops)
    error_lines = daemon.logger.get_lines_for_level('error')
    self.assertIn('Bad response 500', error_lines[0])
    self.assertFalse(error_lines[1:])
    self.assertEqual(1, daemon.stats['diff'])
    self.assertEqual(1, daemon.logger.get_increment_counts()['diffs'])
def test_sync_shard_ranges_none_to_sync(self):
    # verify that merge_shard_ranges is not sent if there are no shard
    # ranges to sync
    created_at = Timestamp.now().internal
    local_broker = self._get_broker('a', 'c', node_index=0)
    local_broker.initialize(created_at, POLICIES.default.idx)
    remote_broker = self._get_broker('a', 'c', node_index=1)
    remote_broker.initialize(created_at, POLICIES.default.idx)
    # put an object into local broker
    local_broker.put_object(
        'obj', Timestamp.now().internal, 0, 'text/plain', EMPTY_ETAG)

    # record every RPC op that is sent
    rpc_hook = mock.MagicMock()
    db_replicator.ReplConnection = attach_fake_replication_rpc(
        self.rpc, replicate_hook=rpc_hook)

    part, remote_node = self._get_broker_part_node(remote_broker)
    local_info = local_broker.get_replication_info()
    daemon = replicator.ContainerReplicator({})
    self.assertTrue(
        daemon._repl_to_node(remote_node, local_broker, part, local_info))

    sent_ops = [call[0][0] for call in rpc_hook.call_args_list]
    self.assertEqual(
        ['sync', 'get_shard_ranges', 'merge_items', 'merge_syncs'],
        sent_ops)
def test_sync_shard_ranges_with_rsync(self):
    # The first pass pushes the whole db to the two other nodes via
    # rsync; a second pass, after more shard range changes, is expected
    # to be deferred while still delivering the shard ranges.
    local_broker = self._get_broker('a', 'c', node_index=0)
    # NOTE(review): unlike sibling tests this passes a raw float rather
    # than Timestamp.now().internal - kept as-is.
    put_timestamp = time.time()
    local_broker.initialize(put_timestamp, POLICIES.default.idx)

    shard_ranges = [
        ShardRange('.shards_a/sr-%s' % upper, Timestamp.now(), lower,
                   upper, position + 1, 10 * (position + 1))
        for position, (lower, upper) in enumerate(
            (('', 'g'), ('g', 'r'), ('r', '')))
    ]
    # add first shard range
    own_sr = local_broker.enable_sharding(Timestamp.now())
    local_broker.merge_shard_ranges(shard_ranges[:1])

    # "replicate"
    _part, local_node = self._get_broker_part_node(local_broker)
    daemon = self._run_once(local_node)
    self.assertEqual(2, daemon.stats['rsync'])

    # complete rsync to all other nodes
    def assert_remotes_match(expected_ranges):
        # nodes 1 and 2 must have a db file, hold exactly the expected
        # shard ranges and report the same info as node 0, except for
        # 'id' (not compared), 'hash' (all zeros) and 'object_count' (0)
        for remote_index in range(1, 3):
            remote_broker = self._get_broker(
                'a', 'c', node_index=remote_index)
            self.assertTrue(os.path.exists(remote_broker.db_file))
            self.assertShardRangesEqual(
                expected_ranges,
                remote_broker.get_shard_ranges(include_deleted=True,
                                               include_own=True)
            )
            remote_info = remote_broker.get_info()
            local_info = self._get_broker(
                'a', 'c', node_index=0).get_info()
            for key, local_value in local_info.items():
                if key == 'id':
                    continue
                remote_value = remote_info[key]
                if key == 'hash':
                    self.assertEqual(remote_value, '0' * 32)
                elif key == 'object_count':
                    self.assertEqual(remote_value, 0)
                else:
                    self.assertEqual(
                        remote_value, local_value,
                        "mismatch remote %s %r != %r" % (
                            key, remote_value, local_value))

    assert_remotes_match([shard_ranges[0], own_sr])

    # delete and add some more shard ranges
    shard_ranges[0].deleted = 1
    shard_ranges[0].timestamp = Timestamp.now()
    for shard_range in shard_ranges:
        local_broker.merge_shard_ranges(shard_range)
    daemon = self._run_once(local_node)
    self.assertEqual(2, daemon.stats['deferred'])
    assert_remotes_match(shard_ranges + [own_sr])
def check_replicate(self, from_broker, remote_node_index, repl_conf=None,
                    expect_success=True):
    """
    Replicate ``from_broker`` to a single remote node, recording the RPC
    and rsync activity.

    :param from_broker: the broker to replicate from
    :param remote_node_index: ring index of the node to replicate to
    :param repl_conf: optional conf dict for the ContainerReplicator
    :param expect_success: the result expected from ``_repl_to_node``
    :returns: tuple of (daemon, repl_calls, rsync_calls) where repl_calls
        is a list of (op, args) tuples, one per replication RPC, and
        rsync_calls is the list handed to ``_install_fake_rsync_file``
    """
    if not repl_conf:
        repl_conf = {}
    repl_calls = []
    rsync_calls = []

    def record_rpc(op, *sync_args):
        repl_calls.append((op, sync_args))

    db_replicator.ReplConnection = attach_fake_replication_rpc(
        self.rpc, replicate_hook=record_rpc, errors=None)
    daemon = replicator.ContainerReplicator(
        repl_conf, logger=debug_logger())
    self._install_fake_rsync_file(daemon, rsync_calls)
    part, nodes = self._ring.get_nodes(from_broker.account,
                                       from_broker.container)

    # pick the first ring node carrying the requested index
    remote_node = None
    for candidate in nodes:
        if candidate['index'] == remote_node_index:
            remote_node = candidate
            break
    else:
        self.fail('Failed to find node index %s' % remote_node_index)

    info = from_broker.get_replication_info()
    success = daemon._repl_to_node(remote_node, from_broker, part, info)
    self.assertEqual(expect_success, success)
    return daemon, repl_calls, rsync_calls
def assert_synced_shard_ranges(self, expected, synced_items):
    """
    Assert that the shard range items sent in a sync match ``expected``.

    ``expected`` is compared in (lower, upper) order, each range converted
    with ``dict()``; any 'record_type' key in the synced items is ignored.

    Neither argument is modified: the comparison works on a sorted copy of
    ``expected`` and on filtered copies of the synced items, so callers can
    keep using their list and their recorded RPC payloads afterwards.

    :param expected: iterable of shard ranges, in any order
    :param synced_items: iterable of dicts as sent to the remote
    """
    ordered = sorted(expected, key=lambda sr: (sr.lower, sr.upper))
    comparable_items = [
        dict((key, value) for key, value in item.items()
             if key != 'record_type')
        for item in synced_items]
    self.assertEqual([dict(sr) for sr in ordered], comparable_items)
def assert_info_synced(self, local, remote_node_index, mismatches=None):
    """
    Assert that the info of the remote copy of ``local`` matches the
    local info, key by key.

    Every key of the local info must have an equal value in the remote
    info, except keys listed in ``mismatches`` (and 'id', which is always
    added to that set): those must *differ*. All violations are collected
    and reported together in one failure.

    The caller's ``mismatches`` object is never modified, and a key that
    is absent from the remote info is reported as a mismatch (remote value
    shown as None) instead of raising KeyError while building the message.

    :param local: local broker; its account and container identify the
        remote broker to open
    :param remote_node_index: node index of the remote broker
    :param mismatches: optional iterable of info keys expected to differ
    """
    expected_mismatches = set(mismatches or ())
    expected_mismatches.add('id')
    remote = self._get_broker(local.account, local.container,
                              node_index=remote_node_index)
    local_info = local.get_info()
    remote_info = remote.get_info()
    errors = []
    for key, local_value in local_info.items():
        remote_value = remote_info.get(key)
        if remote_value == local_value:
            if key in expected_mismatches:
                errors.append(
                    "unexpected match remote %s %r == %r" % (
                        key, remote_value, local_value))
        elif key not in expected_mismatches:
            errors.append(
                "unexpected mismatch remote %s %r != %r" % (
                    key, remote_value, local_value))
    if errors:
        self.fail('Found sync errors:\n' + '\n'.join(errors))
def assert_shard_ranges_synced(self, local_broker, remote_broker):
    """
    Assert that both brokers hold equal shard ranges, counting deleted
    ranges and each broker's own shard range.
    """
    query = {'include_deleted': True, 'include_own': True}
    local_ranges = local_broker.get_shard_ranges(**query)
    remote_ranges = remote_broker.get_shard_ranges(**query)
    self.assertShardRangesEqual(local_ranges, remote_ranges)
def _setup_replication_test(self, node_index):
    """
    Create and initialize the 'a/c' broker on the given node and build
    fixture data for it.

    Neither the object rows nor the shard ranges are merged into the
    broker here; callers do that via ``_merge_object`` and
    ``_merge_shard_range``.

    :param node_index: index of the node to create the broker on
    :returns: dict with keys 'broker', 'objects' (20 object row dicts)
        and 'shard_ranges' (4 ShardRange instances)
    """
    ts_iter = make_timestamp_iter()
    policy_idx = POLICIES.default.idx
    put_timestamp = Timestamp.now().internal
    # create "local" broker
    broker = self._get_broker('a', 'c', node_index=node_index)
    broker.initialize(put_timestamp, policy_idx)

    objs = []
    for number in range(20):
        objs.append({
            'name': 'blah%03d' % number,
            'created_at': next(ts_iter).internal,
            'size': number,
            'content_type': 'text/plain',
            'etag': 'etag%s' % number,
            'deleted': 0,
            'storage_policy_index': policy_idx,
        })

    shard_ranges = []
    for lower, upper in (('', 'a'), ('a', 'b'), ('b', 'c'), ('c', '')):
        shard_ranges.append(ShardRange(
            '.sharded_a/sr-%s' % upper, Timestamp.now(), lower, upper))

    return dict(broker=broker, objects=objs, shard_ranges=shard_ranges)
def _merge_object(self, broker, objects, index, **kwargs):
if not isinstance(index, slice):
index = slice(index, index + 1)
objs = [dict(obj) for obj in objects[index]]
broker.merge_items(objs)
def _merge_shard_range(self, broker, shard_ranges, index, **kwargs):
broker.merge_shard_ranges(shard_ranges[index:index + 1])
def _goto_sharding_state(self, broker, epoch):
    """
    Enable sharding on ``broker`` with the given epoch, switch it to the
    sharding state and assert that its db state is then SHARDING.
    """
    broker.enable_sharding(epoch)
    state_changed = broker.set_sharding_state()
    self.assertTrue(state_changed)
    db_state = broker.get_db_state()
    self.assertEqual(backend.SHARDING, db_state)
def _goto_sharded_state(self, broker):
    """
    Switch ``broker`` to the sharded state and assert that its db state
    is then SHARDED.
    """
    state_changed = broker.set_sharded_state()
    self.assertTrue(state_changed)
    db_state = broker.get_db_state()
    self.assertEqual(backend.SHARDED, db_state)
def _assert_local_sharded_in_sync(self, local_broker, local_id):
daemon, repl_calls, rsync_calls = self.check_replicate(local_broker, 1)
self.assertEqual(['sync', 'get_shard_ranges', 'merge_shard_ranges'],
[call[0] for call in repl_calls])
self.assertEqual(1, daemon.stats['deferred'])
self.assertEqual(0, daemon.stats['rsync'])
self.assertEqual(0, daemon.stats['diff'])
self.assertFalse(rsync_calls)
# new db sync
self.assertEqual(local_id, repl_calls[0][1][2])
# ...but we still get a merge_shard_ranges for shard ranges
self.assert_synced_shard_ranges(
local_broker.get_shard_ranges(include_own=True),
repl_calls[2][1][0])
self.assertEqual(local_id, repl_calls[2][1][1])
def _check_only_shard_ranges_replicated(self, local_broker,
remote_node_index,
repl_conf,
expected_shard_ranges,
expect_success=True):
# expected_shard_ranges is expected final list of sync'd ranges
daemon, repl_calls, rsync_calls = self.check_replicate(
local_broker, remote_node_index, repl_conf,
expect_success=expect_success)
# we always expect only shard ranges to end in abort
self.assertEqual(1, daemon.stats['deferred'])
self.assertEqual(0, daemon.stats['diff'])
self.assertEqual(0, daemon.stats['rsync'])
self.assertEqual(['sync', 'get_shard_ranges', 'merge_shard_ranges'],
[call[0] for call in repl_calls])
self.assertFalse(rsync_calls)
# sync
local_id = local_broker.get_info()['id']
self.assertEqual(local_id, repl_calls[0][1][2])
# get_shard_ranges
self.assertEqual((), repl_calls[1][1])
# merge_shard_ranges for sending local shard ranges
self.assertShardRangesEqual(expected_shard_ranges, repl_calls[2][1][0])
self.assertEqual(local_id, repl_calls[2][1][1])
remote_broker = self._get_broker(
local_broker.account, local_broker.container, node_index=1)
self.assertNotEqual(local_id, remote_broker.get_info()['id'])
self.assert_shard_ranges_synced(remote_broker, local_broker)
def test_replication_local_unsharded_remote_missing(self):
    # With no db on the remote node, the local db is expected to be
    # rsync'd across in full and given a new id there.
    context = self._setup_replication_test(0)
    local_broker = context['broker']
    local_id = local_broker.get_info()['id']
    objs = context['objects']
    self._merge_object(index=0, **context)

    daemon, repl_calls, rsync_calls = self.check_replicate(local_broker, 1)

    self.assert_info_synced(local_broker, 1)
    self.assertEqual(1, daemon.stats['rsync'])
    sent_ops = [op for op, _args in repl_calls]
    self.assertEqual(['sync', 'complete_rsync'], sent_ops)
    # complete_rsync names the local db id and db file
    complete_rsync_args = repl_calls[1][1]
    self.assertEqual(local_id, complete_rsync_args[0])
    self.assertEqual(os.path.basename(local_broker.db_file),
                     complete_rsync_args[1])
    # exactly one rsync: local db file -> a target named after local id
    first_rsync = rsync_calls[0]
    self.assertEqual(local_broker.db_file, first_rsync[0])
    self.assertEqual(local_id, os.path.basename(first_rsync[1]))
    self.assertFalse(rsync_calls[1:])

    remote_broker = self._get_broker('a', 'c', node_index=1)
    self.assert_shard_ranges_synced(local_broker, remote_broker)
    self.assertTrue(os.path.exists(remote_broker._db_file))
    self.assertNotEqual(local_id, remote_broker.get_info()['id'])
    self.assertEqual(objs[:1], remote_broker.get_objects())
def _check_replication_local_unsharded_remote_sharded(self, repl_conf):
    # Local db stays unsharded while the remote db is driven through to the
    # sharded state; replication is then expected to exchange shard ranges
    # only, on the first pass and again on a repeat pass.
    local_ctx = self._setup_replication_test(0)
    local_broker = local_ctx['broker']
    local_id = local_broker.get_info()['id']
    self._merge_object(index=slice(0, 6), **local_ctx)

    remote_ctx = self._setup_replication_test(1)
    self._merge_object(index=4, **remote_ctx)
    remote_broker = remote_ctx['broker']
    sharding_epoch = Timestamp.now()
    self._goto_sharding_state(remote_broker, epoch=sharding_epoch)
    remote_range = remote_ctx['shard_ranges'][0]
    remote_range.object_count = 101
    remote_range.bytes_used = 1010
    remote_range.state = ShardRange.ACTIVE
    self._merge_shard_range(index=0, **remote_ctx)
    self._merge_object(index=5, **remote_ctx)
    self._goto_sharded_state(remote_broker)
    self.assertEqual(backend.SHARDED, remote_broker.get_db_state())

    expected_ranges = remote_broker.get_shard_ranges(include_own=True)
    self._check_only_shard_ranges_replicated(
        local_broker, 1, repl_conf, expected_ranges)

    # re-open the remote broker to inspect its state after replication
    remote_broker = self._get_broker(
        local_broker.account, local_broker.container, node_index=1)
    self.assertEqual(backend.SHARDED, remote_broker.get_db_state())
    self.assertFalse(os.path.exists(remote_broker._db_file))
    remote_id = remote_broker.get_info()['id']
    self.assertNotEqual(local_id, remote_id)
    self.assertEqual(remote_ctx['objects'][5:6],
                     remote_broker.get_objects())

    # Now that we have shard ranges, we're never considered in-sync :-/
    expected_ranges = remote_broker.get_shard_ranges(include_own=True)
    self._check_only_shard_ranges_replicated(
        local_broker, 1, repl_conf, expected_ranges)
def test_replication_local_unsharded_remote_sharded(self):
    # run the shared scenario with an empty replicator conf
    repl_conf = {}
    self._check_replication_local_unsharded_remote_sharded(repl_conf)
def test_replication_local_unsharded_remote_sharded_large_diff(self):
    # run the shared scenario again with per_diff set to 1
    repl_conf = {'per_diff': 1}
    self._check_replication_local_unsharded_remote_sharded(repl_conf)
def _check_replication_local_sharding_remote_missing(self, repl_conf):
    # Local db is in the sharding state and node 1 has no db. The first
    # replication pass is expected to rsync only the fresh db; a second
    # pass is expected to replicate shard ranges only.
    local_ctx = self._setup_replication_test(0)
    local_broker = local_ctx['broker']
    for obj_index in (0, 1):
        self._merge_object(index=obj_index, **local_ctx)
    sharding_epoch = Timestamp.now()
    self._goto_sharding_state(local_broker, sharding_epoch)
    self._merge_shard_range(index=0, **local_ctx)
    self._merge_object(index=slice(2, 8), **local_ctx)
    all_objects = local_ctx['objects']

    daemon, repl_calls, rsync_calls = self.check_replicate(
        local_broker, 1, repl_conf=repl_conf)
    rpc_ops = [rpc_call[0] for rpc_call in repl_calls]
    self.assertEqual(['sync', 'complete_rsync'], rpc_ops)
    self.assertEqual(1, daemon.stats['rsync'])
    self.assertEqual(0, daemon.stats['deferred'])
    self.assertEqual(0, daemon.stats['diff'])

    # fresh db is sync'd first...
    fresh_id = local_broker.get_info()['id']
    self.assertEqual(fresh_id, repl_calls[0][1][2])
    self.assertEqual(fresh_id, repl_calls[1][1][0])

    # retired db is not sync'd at all: its id must not appear in the args
    # of any rpc call
    retired_broker = self.backend(
        local_broker._db_file, account=local_broker.account,
        container=local_broker.container, force_db_file=True)
    retired_id = retired_broker.get_info()['id']
    offending = [
        'old db id %r in %r call args %r' % (
            retired_id, rpc_call[0], rpc_call[1])
        for rpc_call in repl_calls if retired_id in rpc_call[1]]
    if offending:
        self.fail('Found some bad calls:\n' + '\n'.join(offending))

    # complete_rsync
    local_db_name = os.path.basename(local_broker.db_file)
    self.assertEqual(local_db_name, repl_calls[1][1][1])
    first_rsync = rsync_calls[0]
    self.assertEqual(local_broker.db_file, first_rsync[0])
    self.assertEqual(fresh_id, os.path.basename(first_rsync[1]))
    self.assertFalse(rsync_calls[1:])

    # TODO: make these stats better; in sharding state local broker pulls
    # stats for 2 objects from old db, whereas remote thinks it's sharded
    # and has an empty shard range table
    tolerated = ['object_count', 'bytes_used', 'db_state']
    self.assert_info_synced(local_broker, 1, mismatches=tolerated)

    remote_broker = self._get_broker('a', 'c', node_index=1)
    remote_id = remote_broker.get_info()['id']
    self.assertNotEqual(retired_id, remote_id)
    self.assertNotEqual(fresh_id, remote_id)
    # the remote has a single db file, named like the local fresh db
    self.assertEqual(
        [remote_broker.db_file], get_db_files(remote_broker.db_file))
    self.assertEqual(os.path.basename(remote_broker.db_file),
                     os.path.basename(local_broker.db_file))
    self.assertEqual(sharding_epoch, remote_broker.db_epoch)
    # remote db has only the misplaced objects
    self.assertEqual(all_objects[2:8], remote_broker.get_objects())
    self.assert_shard_ranges_synced(local_broker, remote_broker)

    # replicate again, check asserts abort
    local_ranges = local_broker.get_shard_ranges(include_own=True)
    self._check_only_shard_ranges_replicated(
        local_broker, 1, repl_conf, local_ranges)

    # sanity: remote db files, objects and epoch are as before
    remote_broker = self._get_broker('a', 'c', node_index=1)
    self.assertEqual(
        [remote_broker.db_file], get_db_files(remote_broker.db_file))
    self.assertEqual(os.path.basename(remote_broker.db_file),
                     os.path.basename(local_broker.db_file))
    self.assertEqual(all_objects[2:8], remote_broker.get_objects())
    self.assertEqual(sharding_epoch, remote_broker.db_epoch)
def test_replication_local_sharding_remote_missing(self):
    # run the shared scenario with an empty replicator conf
    repl_conf = {}
    self._check_replication_local_sharding_remote_missing(repl_conf)
def test_replication_local_sharding_remote_missing_large_diff(self):
    # the local shard db has large diff with respect to the old db
    repl_conf = {'per_diff': 1}
    self._check_replication_local_sharding_remote_missing(repl_conf)
def _check_replication_local_sharding_remote_unsharded(self, repl_conf):
    # Local db is in the sharding state, remote db is unsharded with one
    # object of its own; only shard ranges are expected to be replicated
    # and the remote's db file and objects are expected to be untouched.
    local_ctx = self._setup_replication_test(0)
    self._merge_object(index=slice(0, 3), **local_ctx)
    local_broker = local_ctx['broker']
    sharding_epoch = Timestamp.now()
    self._goto_sharding_state(local_broker, sharding_epoch)
    self._merge_shard_range(index=0, **local_ctx)
    self._merge_object(index=slice(3, 11), **local_ctx)

    remote_ctx = self._setup_replication_test(1)
    self._merge_object(index=11, **remote_ctx)

    local_ranges = local_broker.get_shard_ranges(include_own=True)
    self._check_only_shard_ranges_replicated(
        local_broker, 1, repl_conf, local_ranges)

    remote_broker = self._get_broker('a', 'c', node_index=1)
    remote_db_files = get_db_files(remote_broker.db_file)
    self.assertEqual([remote_broker._db_file], remote_db_files)
    self.assertEqual(remote_ctx['objects'][11:12],
                     remote_broker.get_objects())

    tolerated = ['db_state', 'object_count', 'bytes_used',
                 'status_changed_at', 'hash']
    self.assert_info_synced(local_broker, 1, mismatches=tolerated)

    # a repeat pass behaves the same way
    local_ranges = local_broker.get_shard_ranges(include_own=True)
    self._check_only_shard_ranges_replicated(
        local_broker, 1, repl_conf, local_ranges)
def test_replication_local_sharding_remote_unsharded(self):
    # run the shared scenario with an empty replicator conf
    repl_conf = {}
    self._check_replication_local_sharding_remote_unsharded(repl_conf)
def test_replication_local_sharding_remote_unsharded_large_diff(self):
    # run the shared scenario again with per_diff set to 1
    repl_conf = {'per_diff': 1}
    self._check_replication_local_sharding_remote_unsharded(repl_conf)
def _check_only_sync(self, local_broker, remote_node_index, repl_conf):
    # Replicate ``local_broker`` to the node at ``remote_node_index`` using
    # ``repl_conf`` and assert that replication is deferred after a lone
    # 'sync' rpc call: no diff, no rsync, two warnings logged, and no shard
    # ranges present in the remote db afterwards.
    daemon, repl_calls, rsync_calls = self.check_replicate(
        local_broker, remote_node_index, repl_conf,
        expect_success=False)

    # When talking to an old (pre-2.18.0) container server, abort
    # replication when we're sharding or sharded. Wait for the
    # rolling upgrade that's presumably in-progress to finish instead.
    self.assertEqual(1, daemon.stats['deferred'])
    self.assertEqual(0, daemon.stats['diff'])
    self.assertEqual(0, daemon.stats['rsync'])
    self.assertEqual(['sync'],
                     [call[0] for call in repl_calls])
    self.assertFalse(rsync_calls)

    lines = daemon.logger.get_lines_for_level('warning')
    # check the count up front so that a missing warning is reported as an
    # assertion failure rather than an IndexError
    self.assertEqual(2, len(lines), lines)
    self.assertIn('unable to replicate shard ranges', lines[0])
    self.assertIn('refusing to replicate objects', lines[1])

    # sync
    local_id = local_broker.get_info()['id']
    self.assertEqual(local_id, repl_calls[0][1][2])

    # inspect the node that was actually replicated to, not a fixed index
    remote_broker = self._get_broker(
        local_broker.account, local_broker.container,
        node_index=remote_node_index)
    self.assertNotEqual(local_id, remote_broker.get_info()['id'])
    self.assertEqual([], remote_broker.get_shard_ranges())
def _check_replication_local_sharding_remote_presharding(self, repl_conf):
    # Local db is in the sharding state; the remote's replication info is
    # patched to lack 'shard_max_row'. Only the 'sync' call is expected,
    # on the first pass and on a repeat pass.
    local_ctx = self._setup_replication_test(0)
    self._merge_object(index=slice(0, 3), **local_ctx)
    local_broker = local_ctx['broker']
    sharding_epoch = Timestamp.now()
    self._goto_sharding_state(local_broker, sharding_epoch)
    self._merge_shard_range(index=0, **local_ctx)
    self._merge_object(index=slice(3, 11), **local_ctx)

    remote_ctx = self._setup_replication_test(1)
    self._merge_object(index=11, **remote_ctx)

    real_get_info = \
        replicator.ContainerReplicatorRpc._get_synced_replication_info

    def get_info_without_shard_max_row(*args):
        # strip the key from what the real method returns
        remote_info = real_get_info(*args)
        del remote_info['shard_max_row']
        return remote_info

    patch_target = ('swift.container.replicator.'
                    'ContainerReplicatorRpc._get_synced_replication_info')
    # everything below runs with the patch applied, including the repeat
    # replication pass at the end
    with mock.patch(patch_target, get_info_without_shard_max_row):
        self._check_only_sync(local_broker, 1, repl_conf)

        remote_broker = self._get_broker('a', 'c', node_index=1)
        remote_db_files = get_db_files(remote_broker.db_file)
        self.assertEqual([remote_broker._db_file], remote_db_files)
        self.assertEqual(remote_ctx['objects'][11:12],
                         remote_broker.get_objects())

        tolerated = ['db_state', 'object_count', 'bytes_used',
                     'status_changed_at', 'hash']
        self.assert_info_synced(local_broker, 1, mismatches=tolerated)

        self._check_only_sync(local_broker, 1, repl_conf)
def test_replication_local_sharding_remote_presharding(self):
    # run the shared scenario with an empty replicator conf
    repl_conf = {}
    self._check_replication_local_sharding_remote_presharding(repl_conf)
def test_replication_local_sharding_remote_presharding_large_diff(self):
    # run the shared scenario again with per_diff set to 1
    repl_conf = {'per_diff': 1}
    self._check_replication_local_sharding_remote_presharding(repl_conf)
def _check_replication_local_sharding_remote_sharding(self, repl_conf):
    # Both local and remote dbs are in the sharding state with the same
    # epoch. Only shard ranges are expected to be replicated; the remote's
    # old db and fresh db are expected to keep their own objects.
    local_context = self._setup_replication_test(0)
    self._merge_object(index=slice(0, 5), **local_context)
    local_broker = local_context['broker']
    epoch = Timestamp.now()
    self._goto_sharding_state(local_broker, epoch)
    self._merge_shard_range(index=0, **local_context)
    self._merge_object(index=slice(5, 10), **local_context)

    remote_context = self._setup_replication_test(1)
    self._merge_object(index=12, **remote_context)
    # take snapshot of info now before transition to sharding...
    orig_remote_info = remote_context['broker'].get_info()
    remote_broker = remote_context['broker']
    self._goto_sharding_state(remote_broker, epoch)
    self._merge_shard_range(index=0, **remote_context)
    self._merge_object(index=13, **remote_context)

    self._check_only_shard_ranges_replicated(
        local_broker, 1, repl_conf,
        remote_broker.get_shard_ranges(include_own=True))

    # in sharding state brokers only reports object stats from old db, and
    # they are different
    self.assert_info_synced(
        local_broker, 1, mismatches=['object_count', 'bytes_used',
                                     'status_changed_at', 'hash'])

    # remote still has both its old db and its fresh (epoch) db; this check
    # used to be written out twice, once is sufficient
    remote_broker = self._get_broker('a', 'c', node_index=1)
    shard_db = make_db_file_path(remote_broker._db_file, epoch)
    self.assertEqual([remote_broker._db_file, shard_db],
                     get_db_files(remote_broker.db_file))

    # no local objects have been sync'd to remote shard db
    self.assertEqual(remote_context['objects'][13:14],
                     remote_broker.get_objects())

    # remote *old db* is unchanged
    remote_old_broker = self.backend(
        remote_broker._db_file, account=remote_broker.account,
        container=remote_broker.container, force_db_file=True)
    self.assertEqual(remote_context['objects'][12:13],
                     remote_old_broker.get_objects())
    self.assertFalse(remote_old_broker.get_shard_ranges())
    remote_old_info = remote_old_broker.get_info()
    # db_state is dropped from both sides before comparing the snapshots
    orig_remote_info.pop('db_state')
    remote_old_info.pop('db_state')
    self.assertEqual(orig_remote_info, remote_old_info)

    # a repeat pass again replicates only shard ranges
    self._check_only_shard_ranges_replicated(
        local_broker, 1, repl_conf,
        local_broker.get_shard_ranges(include_own=True))
def test_replication_local_sharding_remote_sharding(self):
    # run the shared scenario with an empty replicator conf
    repl_conf = {}
    self._check_replication_local_sharding_remote_sharding(repl_conf)
def test_replication_local_sharding_remote_sharding_large_diff(self):
    # run the shared scenario again with per_diff set to 1
    repl_conf = {'per_diff': 1}
    self._check_replication_local_sharding_remote_sharding(repl_conf)
def test_replication_local_sharded_remote_missing(self):
    # Local db is sharded and node 1 has no db: the local db is expected to
    # be rsync'd across, after which the two are expected to be in sync.
    local_ctx = self._setup_replication_test(0)
    local_broker = local_ctx['broker']
    sharding_epoch = Timestamp.now()
    self._goto_sharding_state(local_broker, sharding_epoch)
    local_range = local_ctx['shard_ranges'][0]
    local_range.object_count = 99
    local_range.state = ShardRange.ACTIVE
    self._merge_shard_range(index=0, **local_ctx)
    self._merge_object(index=slice(0, 3), **local_ctx)
    self._goto_sharded_state(local_broker)
    all_objects = local_ctx['objects']

    daemon, repl_calls, rsync_calls = self.check_replicate(local_broker, 1)
    rpc_ops = [rpc_call[0] for rpc_call in repl_calls]
    self.assertEqual(['sync', 'complete_rsync'], rpc_ops)
    self.assertEqual(1, daemon.stats['rsync'])

    # sync
    local_id = local_broker.get_info()['id']
    self.assertEqual(local_id, repl_calls[0][1][2])

    # complete_rsync
    self.assertEqual(local_id, repl_calls[1][1][0])
    local_db_name = os.path.basename(local_broker.db_file)
    self.assertEqual(local_db_name, repl_calls[1][1][1])
    first_rsync = rsync_calls[0]
    self.assertEqual(local_broker.db_file, first_rsync[0])
    self.assertEqual(local_id, os.path.basename(first_rsync[1]))
    self.assertFalse(rsync_calls[1:])

    self.assert_info_synced(local_broker, 1)

    # the remote ends up with only the epoch db, holding the same objects
    # and shard ranges as the local db
    remote_broker = self._get_broker('a', 'c', node_index=1)
    remote_id = remote_broker.get_info()['id']
    self.assertNotEqual(local_id, remote_id)
    expected_db = make_db_file_path(remote_broker._db_file, sharding_epoch)
    self.assertEqual([expected_db], get_db_files(remote_broker.db_file))
    self.assertEqual(all_objects[:3], remote_broker.get_objects())
    self.assertEqual(local_broker.get_shard_ranges(),
                     remote_broker.get_shard_ranges())

    # sanity check - in sync
    self._assert_local_sharded_in_sync(local_broker, local_id)

    remote_broker = self._get_broker('a', 'c', node_index=1)
    expected_db = make_db_file_path(remote_broker._db_file, sharding_epoch)
    self.assertEqual([expected_db], get_db_files(remote_broker.db_file))
    # the remote broker object_count comes from replicated shard range...
    self.assertEqual(99, remote_broker.get_info()['object_count'])
    # these are replicated misplaced objects...
    self.assertEqual(all_objects[:3], remote_broker.get_objects())
    self.assertEqual(local_broker.get_shard_ranges(),
                     remote_broker.get_shard_ranges())
def _check_replication_local_sharded_remote_unsharded(self, repl_conf):
    # Local db is sharded, remote db is unsharded with one object of its
    # own; only shard ranges are expected to be replicated (successfully)
    # and the remote's db file and objects are expected to be untouched.
    local_ctx = self._setup_replication_test(0)
    local_broker = local_ctx['broker']
    sharding_epoch = Timestamp.now()
    self._goto_sharding_state(local_broker, sharding_epoch)
    local_range = local_ctx['shard_ranges'][0]
    local_range.object_count = 99
    local_range.state = ShardRange.ACTIVE
    self._merge_shard_range(index=0, **local_ctx)
    self._merge_object(index=slice(0, 3), **local_ctx)
    self._goto_sharded_state(local_broker)

    remote_ctx = self._setup_replication_test(1)
    self._merge_object(index=4, **remote_ctx)

    local_ranges = local_broker.get_shard_ranges(include_own=True)
    self._check_only_shard_ranges_replicated(
        local_broker, 1, repl_conf, local_ranges, expect_success=True)

    # sharded broker takes object count from shard range whereas remote
    # unsharded broker takes it from object table
    tolerated = ['db_state', 'object_count', 'bytes_used',
                 'status_changed_at', 'hash']
    self.assert_info_synced(local_broker, 1, mismatches=tolerated)

    remote_broker = self._get_broker('a', 'c', node_index=1)
    remote_db_files = get_db_files(remote_broker.db_file)
    self.assertEqual([remote_broker._db_file], remote_db_files)
    self.assertEqual(remote_ctx['objects'][4:5],
                     remote_broker.get_objects())

    # a repeat pass behaves the same way and leaves the remote as it was
    local_ranges = local_broker.get_shard_ranges(include_own=True)
    self._check_only_shard_ranges_replicated(
        local_broker, 1, repl_conf, local_ranges, expect_success=True)

    remote_broker = self._get_broker('a', 'c', node_index=1)
    remote_db_files = get_db_files(remote_broker.db_file)
    self.assertEqual([remote_broker._db_file], remote_db_files)
    self.assertEqual(remote_ctx['objects'][4:5],
                     remote_broker.get_objects())
def test_replication_local_sharded_remote_unsharded(self):
    # run the shared scenario with an empty replicator conf
    repl_conf = {}
    self._check_replication_local_sharded_remote_unsharded(repl_conf)
def test_replication_local_sharded_remote_unsharded_large_diff(self):
    # run the shared scenario again with per_diff set to 1
    repl_conf = {'per_diff': 1}
    self._check_replication_local_sharded_remote_unsharded(repl_conf)
def _check_replication_local_sharded_remote_sharding(self, repl_conf):
    # Local db is sharded, remote db is in the sharding state with the same
    # epoch. Only shard ranges are expected to be replicated; the remote's
    # old db and fresh db are expected to keep their own objects.
    local_ctx = self._setup_replication_test(0)
    local_broker = local_ctx['broker']
    sharding_epoch = Timestamp.now()
    self._goto_sharding_state(local_broker, epoch=sharding_epoch)
    local_range = local_ctx['shard_ranges'][0]
    local_range.object_count = 99
    local_range.bytes_used = 999
    local_range.state = ShardRange.ACTIVE
    self._merge_shard_range(index=0, **local_ctx)
    self._merge_object(index=slice(0, 5), **local_ctx)
    self._goto_sharded_state(local_broker)

    remote_ctx = self._setup_replication_test(1)
    self._merge_object(index=6, **remote_ctx)
    remote_broker = remote_ctx['broker']
    # snapshot of the remote info taken before it starts sharding
    info_before_sharding = remote_broker.get_info()
    self._goto_sharding_state(remote_broker, epoch=sharding_epoch)
    self._merge_shard_range(index=0, **remote_ctx)
    self._merge_object(index=7, **remote_ctx)

    # remote has newer timestamp for shard range
    expected_ranges = remote_broker.get_shard_ranges(include_own=True)
    self._check_only_shard_ranges_replicated(
        local_broker, 1, repl_conf, expected_ranges, expect_success=True)

    # sharded broker takes object count from shard range whereas remote
    # sharding broker takes it from object table
    tolerated = ['db_state', 'object_count', 'bytes_used',
                 'status_changed_at', 'hash']
    self.assert_info_synced(local_broker, 1, mismatches=tolerated)

    remote_broker = self._get_broker('a', 'c', node_index=1)
    fresh_db = make_db_file_path(remote_broker._db_file, sharding_epoch)
    self.assertEqual([remote_broker._db_file, fresh_db],
                     get_db_files(remote_broker.db_file))
    # remote fresh db objects are unchanged
    self.assertEqual(remote_ctx['objects'][7:8],
                     remote_broker.get_objects())

    # remote old hash.db objects are unchanged
    remote_old_broker = self.backend(
        remote_broker._db_file, account=remote_broker.account,
        container=remote_broker.container, force_db_file=True)
    self.assertEqual(remote_ctx['objects'][6:7],
                     remote_old_broker.get_objects())
    info_after = remote_old_broker.get_info()
    # db_state is dropped from both sides before comparing the snapshots
    info_before_sharding.pop('db_state')
    info_after.pop('db_state')
    self.assertEqual(info_before_sharding, info_after)

    self.assertEqual(local_broker.get_shard_ranges(),
                     remote_broker.get_shard_ranges())

    # a repeat pass again replicates only shard ranges
    expected_ranges = remote_broker.get_shard_ranges(include_own=True)
    self._check_only_shard_ranges_replicated(
        local_broker, 1, repl_conf, expected_ranges, expect_success=True)
def test_replication_local_sharded_remote_sharding(self):
    # run the shared scenario with an empty replicator conf
    repl_conf = {}
    self._check_replication_local_sharded_remote_sharding(repl_conf)
def test_replication_local_sharded_remote_sharding_large_diff(self):
    # run the shared scenario again with per_diff set to 1
    repl_conf = {'per_diff': 1}
    self._check_replication_local_sharded_remote_sharding(repl_conf)
def _check_replication_local_sharded_remote_sharded(self, repl_conf):
    # Both local and remote dbs are sharded with the same epoch but carry
    # different stats on their shard range. Only shard ranges are expected
    # to be replicated and the remote's shard range stats are expected to
    # be retained.
    local_ctx = self._setup_replication_test(0)
    local_broker = local_ctx['broker']
    sharding_epoch = Timestamp.now()
    self._goto_sharding_state(local_broker, sharding_epoch)
    local_range = local_ctx['shard_ranges'][0]
    local_range.object_count = 99
    local_range.bytes_used = 999
    local_range.state = ShardRange.ACTIVE
    self._merge_shard_range(index=0, **local_ctx)
    self._merge_object(index=slice(0, 6), **local_ctx)
    self._goto_sharded_state(local_broker)

    remote_ctx = self._setup_replication_test(1)
    self._merge_object(index=6, **remote_ctx)
    remote_broker = remote_ctx['broker']
    self._goto_sharding_state(remote_broker, sharding_epoch)
    remote_range = remote_ctx['shard_ranges'][0]
    remote_range.object_count = 101
    remote_range.bytes_used = 1010
    remote_range.state = ShardRange.ACTIVE
    self._merge_shard_range(index=0, **remote_ctx)
    self._merge_object(index=7, **remote_ctx)
    self._goto_sharded_state(remote_broker)

    # remote has newer timestamp for shard range
    expected_ranges = remote_broker.get_shard_ranges(include_own=True)
    self._check_only_shard_ranges_replicated(
        local_broker, 1, repl_conf, expected_ranges, expect_success=True)

    tolerated = ['status_changed_at', 'hash']
    self.assert_info_synced(local_broker, 1, mismatches=tolerated)

    remote_broker = self._get_broker('a', 'c', node_index=1)
    expected_db = make_db_file_path(remote_broker._db_file, sharding_epoch)
    self.assertEqual([expected_db], get_db_files(remote_broker.db_file))
    self.assertEqual(remote_ctx['objects'][7:8],
                     remote_broker.get_objects())
    # remote shard range was newer than local so object count is not
    # updated by sync'd shard range
    first_remote_range = remote_broker.get_shard_ranges()[0]
    self.assertEqual(101, first_remote_range.object_count)

    # a repeat pass again replicates only shard ranges; remote has newer
    # timestamp for shard range
    expected_ranges = remote_broker.get_shard_ranges(include_own=True)
    self._check_only_shard_ranges_replicated(
        local_broker, 1, repl_conf, expected_ranges, expect_success=True)
def test_replication_local_sharded_remote_sharded(self):
    # run the shared scenario with an empty replicator conf
    repl_conf = {}
    self._check_replication_local_sharded_remote_sharded(repl_conf)
def test_replication_local_sharded_remote_sharded_large_diff(self):
    # run the shared scenario again with per_diff set to 1
    repl_conf = {'per_diff': 1}
    self._check_replication_local_sharded_remote_sharded(repl_conf)
def test_replication_rsync_then_merge_aborts_before_merge_sharding(self):
    # verify that rsync_then_merge aborts if remote starts sharding during
    # the rsync
    local_ctx = self._setup_replication_test(0)
    local_broker = local_ctx['broker']
    self._merge_object(index=slice(0, 3), **local_ctx)
    remote_ctx = self._setup_replication_test(1)
    remote_broker = remote_ctx['broker']
    remote_broker.logger = debug_logger()
    self._merge_object(index=5, **remote_ctx)
    real_rsync_then_merge = \
        replicator.ContainerReplicatorRpc.rsync_then_merge

    def sharding_rsync_then_merge(*args):
        # move the remote db to the sharding state just before the real
        # rpc method runs
        new_range = ShardRange('.shards_a/cc', Timestamp.now())
        remote_broker.merge_shard_ranges(new_range)
        self._goto_sharding_state(remote_broker, Timestamp.now())
        return real_rsync_then_merge(*args)

    rpc_target = ('swift.container.replicator.ContainerReplicatorRpc.'
                  'rsync_then_merge')
    broker_target = ('swift.container.backend.ContainerBroker.'
                     'get_items_since')
    with mock.patch(rpc_target, sharding_rsync_then_merge), \
            mock.patch(broker_target) as mock_get_items_since:
        daemon, repl_calls, rsync_calls = self.check_replicate(
            local_broker, 1, expect_success=False,
            repl_conf={'per_diff': 1})

    # no rows were read for merging
    mock_get_items_since.assert_not_called()

    rpc_ops = [rpc_call[0] for rpc_call in repl_calls]
    self.assertEqual(['sync', 'get_shard_ranges', 'rsync_then_merge'],
                     rpc_ops)
    # exactly one rsync: local db file -> a path named after the local id
    first_rsync = rsync_calls[0]
    self.assertEqual(local_broker.db_file, first_rsync[0])
    self.assertEqual(local_broker.get_info()['id'],
                     os.path.basename(first_rsync[1]))
    self.assertFalse(rsync_calls[1:])
def test_replication_rsync_then_merge_aborts_before_merge_sharded(self):
    # verify that rsync_then_merge aborts if remote completes sharding
    # during the rsync
    local_ctx = self._setup_replication_test(0)
    local_broker = local_ctx['broker']
    self._merge_object(index=slice(0, 3), **local_ctx)
    remote_ctx = self._setup_replication_test(1)
    remote_broker = remote_ctx['broker']
    remote_broker.logger = debug_logger()
    self._merge_object(index=5, **remote_ctx)
    real_rsync_then_merge = \
        replicator.ContainerReplicatorRpc.rsync_then_merge

    def sharded_rsync_then_merge(*args):
        # move the remote db all the way to the sharded state just before
        # the real rpc method runs
        new_range = ShardRange('.shards_a/cc', Timestamp.now())
        remote_broker.merge_shard_ranges(new_range)
        self._goto_sharding_state(remote_broker, Timestamp.now())
        self._goto_sharded_state(remote_broker)
        return real_rsync_then_merge(*args)

    rpc_target = ('swift.container.replicator.ContainerReplicatorRpc.'
                  'rsync_then_merge')
    broker_target = ('swift.container.backend.ContainerBroker.'
                     'get_items_since')
    with mock.patch(rpc_target, sharded_rsync_then_merge), \
            mock.patch(broker_target) as mock_get_items_since:
        daemon, repl_calls, rsync_calls = self.check_replicate(
            local_broker, 1, expect_success=False,
            repl_conf={'per_diff': 1})

    # no rows were read for merging
    mock_get_items_since.assert_not_called()

    rpc_ops = [rpc_call[0] for rpc_call in repl_calls]
    self.assertEqual(['sync', 'get_shard_ranges', 'rsync_then_merge'],
                     rpc_ops)
    # exactly one rsync: local db file -> a path named after the local id
    first_rsync = rsync_calls[0]
    self.assertEqual(local_broker.db_file, first_rsync[0])
    self.assertEqual(local_broker.get_info()['id'],
                     os.path.basename(first_rsync[1]))
    self.assertFalse(rsync_calls[1:])
def test_replication_rsync_then_merge_aborts_after_merge_sharding(self):
    # verify that rsync_then_merge aborts if remote starts sharding during
    # the merge
    local_ctx = self._setup_replication_test(0)
    local_broker = local_ctx['broker']
    self._merge_object(index=slice(0, 3), **local_ctx)
    remote_ctx = self._setup_replication_test(1)
    remote_broker = remote_ctx['broker']
    remote_broker.logger = debug_logger()
    self._merge_object(index=5, **remote_ctx)
    real_get_items_since = backend.ContainerBroker.get_items_since
    seen_calls = []

    def sharding_get_items_since(broker, *args):
        # on the first call only, the remote starts sharding before the
        # real method is invoked
        first_call = not seen_calls
        if first_call:
            new_range = ShardRange('.shards_a/cc', Timestamp.now())
            remote_broker.merge_shard_ranges(new_range)
            self._goto_sharding_state(remote_broker, Timestamp.now())
        seen_calls.append(args)
        return real_get_items_since(broker, *args)

    def run_inline(func, *args):
        # stand-in for tpool.execute that calls func directly
        return func(*args)

    to_patch = 'swift.container.backend.ContainerBroker.get_items_since'
    with mock.patch(to_patch, sharding_get_items_since), \
            mock.patch('swift.common.db_replicator.sleep'), \
            mock.patch('swift.container.backend.tpool.execute',
                       run_inline):
        # For some reason, on py3 we start popping Timeouts
        # if we let eventlet trampoline...
        daemon, repl_calls, rsync_calls = self.check_replicate(
            local_broker, 1, expect_success=False,
            repl_conf={'per_diff': 1})

    rpc_ops = [rpc_call[0] for rpc_call in repl_calls]
    self.assertEqual(['sync', 'get_shard_ranges', 'rsync_then_merge'],
                     rpc_ops)
    # exactly one rsync: local db file -> a path named after the local id
    first_rsync = rsync_calls[0]
    self.assertEqual(local_broker.db_file, first_rsync[0])
    self.assertEqual(local_broker.get_info()['id'],
                     os.path.basename(first_rsync[1]))
    self.assertFalse(rsync_calls[1:])
def test_replication_rsync_then_merge_aborts_after_merge_sharded(self):
    # verify that rsync_then_merge aborts if remote completes sharding
    # during the merge
    local_ctx = self._setup_replication_test(0)
    local_broker = local_ctx['broker']
    self._merge_object(index=slice(0, 3), **local_ctx)
    remote_ctx = self._setup_replication_test(1)
    remote_broker = remote_ctx['broker']
    remote_broker.logger = debug_logger()
    self._merge_object(index=5, **remote_ctx)
    real_get_items_since = backend.ContainerBroker.get_items_since
    seen_calls = []

    def sharded_get_items_since(broker, *args):
        # the real method always runs first; from the second call onwards
        # the remote is then taken through sharding to the sharded state
        items = real_get_items_since(broker, *args)
        if seen_calls:
            new_range = ShardRange('.shards_a/cc', Timestamp.now())
            remote_broker.merge_shard_ranges(new_range)
            self._goto_sharding_state(remote_broker, Timestamp.now())
            self._goto_sharded_state(remote_broker)
        seen_calls.append(args)
        return items

    def run_inline(func, *args):
        # stand-in for tpool.execute that calls func directly
        return func(*args)

    to_patch = 'swift.container.backend.ContainerBroker.get_items_since'
    with mock.patch(to_patch, sharded_get_items_since), \
            mock.patch('swift.common.db_replicator.sleep'), \
            mock.patch('swift.container.backend.tpool.execute',
                       run_inline):
        # For some reason, on py3 we start popping Timeouts
        # if we let eventlet trampoline...
        daemon, repl_calls, rsync_calls = self.check_replicate(
            local_broker, 1, expect_success=False,
            repl_conf={'per_diff': 1})

    rpc_ops = [rpc_call[0] for rpc_call in repl_calls]
    self.assertEqual(['sync', 'get_shard_ranges', 'rsync_then_merge'],
                     rpc_ops)
    # exactly one rsync: local db file -> a path named after the local id
    first_rsync = rsync_calls[0]
    self.assertEqual(local_broker.db_file, first_rsync[0])
    self.assertEqual(local_broker.get_info()['id'],
                     os.path.basename(first_rsync[1]))
    self.assertFalse(rsync_calls[1:])
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()