Merge "py3: port account/container replicators"
This commit is contained in:
commit
168dc91bd9
@ -997,6 +997,7 @@ class DatabaseBroker(object):
|
||||
timestamp will be removed.
|
||||
:returns: True if conn.commit() should be called
|
||||
"""
|
||||
timestamp = Timestamp(timestamp)
|
||||
try:
|
||||
row = conn.execute('SELECT metadata FROM %s_stat' %
|
||||
self.db_type).fetchone()
|
||||
@ -1008,7 +1009,7 @@ class DatabaseBroker(object):
|
||||
md = json.loads(md)
|
||||
keys_to_delete = []
|
||||
for key, (value, value_timestamp) in md.items():
|
||||
if value == '' and value_timestamp < timestamp:
|
||||
if value == '' and Timestamp(value_timestamp) < timestamp:
|
||||
keys_to_delete.append(key)
|
||||
if keys_to_delete:
|
||||
for key in keys_to_delete:
|
||||
|
@ -476,7 +476,7 @@ class Replicator(Daemon):
|
||||
elif response.status == HTTP_INSUFFICIENT_STORAGE:
|
||||
raise DriveNotMounted()
|
||||
elif 200 <= response.status < 300:
|
||||
rinfo = json.loads(response.data)
|
||||
rinfo = json.loads(response.data.decode('ascii'))
|
||||
local_sync = broker.get_sync(rinfo['id'], incoming=False)
|
||||
if rinfo.get('metadata', ''):
|
||||
broker.update_metadata(json.loads(rinfo['metadata']))
|
||||
|
@ -1266,7 +1266,7 @@ class ContainerBroker(DatabaseBroker):
|
||||
:param source: if defined, update incoming_sync with the source
|
||||
"""
|
||||
for item in item_list:
|
||||
if isinstance(item['name'], six.text_type):
|
||||
if six.PY2 and isinstance(item['name'], six.text_type):
|
||||
item['name'] = item['name'].encode('utf-8')
|
||||
|
||||
def _really_really_merge_items(conn):
|
||||
@ -1362,7 +1362,7 @@ class ContainerBroker(DatabaseBroker):
|
||||
if isinstance(item, ShardRange):
|
||||
item = dict(item)
|
||||
for col in ('name', 'lower', 'upper'):
|
||||
if isinstance(item[col], six.text_type):
|
||||
if six.PY2 and isinstance(item[col], six.text_type):
|
||||
item[col] = item[col].encode('utf-8')
|
||||
item_list.append(item)
|
||||
|
||||
|
@ -95,12 +95,12 @@ def incorrect_policy_index(info, remote_info):
|
||||
"""
|
||||
if 'storage_policy_index' not in remote_info:
|
||||
return False
|
||||
if remote_info['storage_policy_index'] == \
|
||||
info['storage_policy_index']:
|
||||
if remote_info['storage_policy_index'] == info['storage_policy_index']:
|
||||
return False
|
||||
|
||||
return info['storage_policy_index'] != sorted(
|
||||
[info, remote_info], cmp=cmp_policy_info)[0]['storage_policy_index']
|
||||
# Only return True if remote_info has the better data;
|
||||
# see the docstring for cmp_policy_info
|
||||
return cmp_policy_info(info, remote_info) > 0
|
||||
|
||||
|
||||
def translate_container_headers_to_info(headers):
|
||||
|
@ -66,7 +66,7 @@ class ContainerReplicator(db_replicator.Replicator):
|
||||
def _handle_sync_response(self, node, response, info, broker, http,
|
||||
different_region=False):
|
||||
if is_success(response.status):
|
||||
remote_info = json.loads(response.data)
|
||||
remote_info = json.loads(response.data.decode('ascii'))
|
||||
if incorrect_policy_index(info, remote_info):
|
||||
status_changed_at = Timestamp.now()
|
||||
broker.set_storage_policy_index(
|
||||
@ -129,7 +129,8 @@ class ContainerReplicator(db_replicator.Replicator):
|
||||
def _fetch_and_merge_shard_ranges(self, http, broker):
|
||||
response = http.replicate('get_shard_ranges')
|
||||
if is_success(response.status):
|
||||
broker.merge_shard_ranges(json.loads(response.data))
|
||||
broker.merge_shard_ranges(json.loads(
|
||||
response.data.decode('ascii')))
|
||||
|
||||
def find_local_handoff_for_part(self, part):
|
||||
"""
|
||||
|
@ -29,6 +29,7 @@ import json
|
||||
|
||||
import mock
|
||||
from mock import patch, call
|
||||
import six
|
||||
from six.moves import reload_module
|
||||
|
||||
from swift.container.backend import DATADIR
|
||||
@ -159,6 +160,8 @@ def _mock_process(*args):
|
||||
|
||||
class ReplHttp(object):
|
||||
def __init__(self, response=None, set_status=200):
|
||||
if isinstance(response, six.text_type):
|
||||
response = response.encode('ascii')
|
||||
self.response = response
|
||||
self.set_status = set_status
|
||||
replicated = False
|
||||
@ -689,7 +692,7 @@ class TestDBReplicator(unittest.TestCase):
|
||||
class FakeResponse(object):
|
||||
def __init__(self, status, rinfo):
|
||||
self._status = status
|
||||
self.data = json.dumps(rinfo)
|
||||
self.data = json.dumps(rinfo).encode('ascii')
|
||||
|
||||
@property
|
||||
def status(self):
|
||||
|
@ -33,11 +33,10 @@ from swift.common.storage_policy import POLICIES
|
||||
|
||||
from test.unit.common import test_db_replicator
|
||||
from test.unit import patch_policies, make_timestamp_iter, mock_check_drive, \
|
||||
debug_logger, EMPTY_ETAG, FakeLogger
|
||||
debug_logger, EMPTY_ETAG, FakeLogger, attach_fake_replication_rpc, \
|
||||
FakeHTTPResponse
|
||||
from contextlib import contextmanager
|
||||
|
||||
from test.unit.common.test_db_replicator import attach_fake_replication_rpc
|
||||
|
||||
|
||||
@patch_policies
|
||||
class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
||||
@ -967,7 +966,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
||||
most_recent_items = {}
|
||||
for name, timestamp in all_items:
|
||||
most_recent_items[name] = max(
|
||||
timestamp, most_recent_items.get(name, -1))
|
||||
timestamp, most_recent_items.get(name, ''))
|
||||
self.assertEqual(2, len(most_recent_items))
|
||||
|
||||
for db in (broker, remote_broker):
|
||||
@ -1415,7 +1414,8 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
||||
|
||||
replicate_hook = mock.MagicMock()
|
||||
fake_repl_connection = attach_fake_replication_rpc(
|
||||
self.rpc, errors={'merge_shard_ranges': [HTTPServerError()]},
|
||||
self.rpc, errors={'merge_shard_ranges': [
|
||||
FakeHTTPResponse(HTTPServerError())]},
|
||||
replicate_hook=replicate_hook)
|
||||
db_replicator.ReplConnection = fake_repl_connection
|
||||
part, node = self._get_broker_part_node(remote_broker)
|
||||
@ -1518,7 +1518,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
||||
check_replicate(shard_ranges + [own_sr])
|
||||
|
||||
def check_replicate(self, from_broker, remote_node_index, repl_conf=None,
|
||||
expect_success=True, errors=None):
|
||||
expect_success=True):
|
||||
repl_conf = repl_conf or {}
|
||||
repl_calls = []
|
||||
rsync_calls = []
|
||||
@ -1527,7 +1527,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
||||
repl_calls.append((op, sync_args))
|
||||
|
||||
fake_repl_connection = attach_fake_replication_rpc(
|
||||
self.rpc, replicate_hook=repl_hook, errors=errors)
|
||||
self.rpc, replicate_hook=repl_hook, errors=None)
|
||||
db_replicator.ReplConnection = fake_repl_connection
|
||||
daemon = replicator.ContainerReplicator(
|
||||
repl_conf, logger=debug_logger())
|
||||
@ -2316,9 +2316,13 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
||||
calls.append(args)
|
||||
return orig_get_items_since(broker, *args)
|
||||
|
||||
with mock.patch(
|
||||
'swift.container.backend.ContainerBroker.get_items_since',
|
||||
fake_get_items_since):
|
||||
to_patch = 'swift.container.backend.ContainerBroker.get_items_since'
|
||||
with mock.patch(to_patch, fake_get_items_since), \
|
||||
mock.patch('swift.common.db_replicator.sleep'), \
|
||||
mock.patch('swift.container.backend.tpool.execute',
|
||||
lambda func, *args: func(*args)):
|
||||
# For some reason, on py3 we start popping Timeouts
|
||||
# if we let eventlet trampoline...
|
||||
daemon, repl_calls, rsync_calls = self.check_replicate(
|
||||
local_broker, 1, expect_success=False,
|
||||
repl_conf={'per_diff': 1})
|
||||
@ -2355,9 +2359,13 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
||||
calls.append(args)
|
||||
return result
|
||||
|
||||
with mock.patch(
|
||||
'swift.container.backend.ContainerBroker.get_items_since',
|
||||
fake_get_items_since):
|
||||
to_patch = 'swift.container.backend.ContainerBroker.get_items_since'
|
||||
with mock.patch(to_patch, fake_get_items_since), \
|
||||
mock.patch('swift.common.db_replicator.sleep'), \
|
||||
mock.patch('swift.container.backend.tpool.execute',
|
||||
lambda func, *args: func(*args)):
|
||||
# For some reason, on py3 we start popping Timeouts
|
||||
# if we let eventlet trampoline...
|
||||
daemon, repl_calls, rsync_calls = self.check_replicate(
|
||||
local_broker, 1, expect_success=False,
|
||||
repl_conf={'per_diff': 1})
|
||||
|
2
tox.ini
2
tox.ini
@ -32,6 +32,7 @@ commands =
|
||||
test/unit/account/test_auditor.py \
|
||||
test/unit/account/test_backend.py \
|
||||
test/unit/account/test_reaper.py \
|
||||
test/unit/account/test_replicator.py \
|
||||
test/unit/account/test_utils.py \
|
||||
test/unit/cli/test_dispersion_report.py \
|
||||
test/unit/cli/test_form_signature.py \
|
||||
@ -69,6 +70,7 @@ commands =
|
||||
test/unit/common/test_swob.py \
|
||||
test/unit/common/test_utils.py \
|
||||
test/unit/common/test_wsgi.py \
|
||||
test/unit/container/test_replicator.py \
|
||||
test/unit/proxy/controllers/test_info.py}
|
||||
|
||||
[testenv:py36]
|
||||
|
Loading…
Reference in New Issue
Block a user