py3: port account/container replicators

Change-Id: Ia2662d8f75883e1cc41b9277c65f8b771f56f902
Tim Burke 2018-11-01 00:43:41 +00:00
parent 3420921a33
commit bc4494f24d
8 changed files with 39 additions and 24 deletions

@@ -997,6 +997,7 @@ class DatabaseBroker(object):
timestamp will be removed.
:returns: True if conn.commit() should be called
"""
timestamp = Timestamp(timestamp)
try:
row = conn.execute('SELECT metadata FROM %s_stat' %
self.db_type).fetchone()
@@ -1008,7 +1009,7 @@ class DatabaseBroker(object):
md = json.loads(md)
keys_to_delete = []
for key, (value, value_timestamp) in md.items():
if value == '' and value_timestamp < timestamp:
if value == '' and Timestamp(value_timestamp) < timestamp:
keys_to_delete.append(key)
if keys_to_delete:
for key in keys_to_delete:
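
The Timestamp() conversions added in the two hunks above exist because metadata timestamps are stored as plain strings. A minimal illustration of why both sides are normalized before the < check, using a stand-in class rather than Swift's real swift.common.utils.Timestamp:

# Stand-in for illustration only; Swift's Timestamp is much richer than this.
class FakeTimestamp(object):
    def __init__(self, value):
        self.raw = float(value)

    def __lt__(self, other):
        if not isinstance(other, FakeTimestamp):
            other = FakeTimestamp(other)
        return self.raw < other.raw

reclaim_ts = FakeTimestamp('0000000002.00000')
value_ts = '0000000001.00000'

# Normalized comparison: numeric, and the same answer on py2 and py3.
assert FakeTimestamp(value_ts) < reclaim_ts

# With this stand-in, the bare-string version (value_ts < reclaim_ts) would
# fall back to py2's type-name ordering, ignoring the values entirely, and
# would raise TypeError on py3; normalizing both sides avoids depending on
# either behavior.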

@@ -476,7 +476,7 @@ class Replicator(Daemon):
elif response.status == HTTP_INSUFFICIENT_STORAGE:
raise DriveNotMounted()
elif 200 <= response.status < 300:
rinfo = json.loads(response.data)
rinfo = json.loads(response.data.decode('ascii'))
local_sync = broker.get_sync(rinfo['id'], incoming=False)
if rinfo.get('metadata', ''):
broker.update_metadata(json.loads(rinfo['metadata']))
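
The decode here is deliberate: the replication response body comes back as bytes, and while json.loads() accepts bytes on py2 and again on Python 3.6+, it raises TypeError on 3.0-3.5. Decoding first behaves the same everywhere, and the payload is JSON so ASCII suffices. A small sketch with an illustrative payload:

import json

payload = b'{"id": "abcdef0123456789", "point": -1, "max_row": 42}'

# json.loads(payload) alone would fail on Python 3.0-3.5; decoding first
# works on every interpreter.
rinfo = json.loads(payload.decode('ascii'))
assert rinfo['max_row'] == 42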

@@ -1266,7 +1266,7 @@ class ContainerBroker(DatabaseBroker):
:param source: if defined, update incoming_sync with the source
"""
for item in item_list:
if isinstance(item['name'], six.text_type):
if six.PY2 and isinstance(item['name'], six.text_type):
item['name'] = item['name'].encode('utf-8')
def _really_really_merge_items(conn):
@@ -1362,7 +1362,7 @@ class ContainerBroker(DatabaseBroker):
if isinstance(item, ShardRange):
item = dict(item)
for col in ('name', 'lower', 'upper'):
if isinstance(item[col], six.text_type):
if six.PY2 and isinstance(item[col], six.text_type):
item[col] = item[col].encode('utf-8')
item_list.append(item)
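
Both hunks above put the UTF-8 encode behind a six.PY2 guard: on py2 the database layer works with byte-string names, while on py3 the names are left as native (unicode) str so they round-trip through SQLite as text. The guard reduces to a pattern like this sketch (normalize_name is illustrative, not a Swift helper):

import six

def normalize_name(name):
    # py2: down-convert unicode to UTF-8 bytes; py3: leave native str alone.
    if six.PY2 and isinstance(name, six.text_type):
        return name.encode('utf-8')
    return name

name = normalize_name(u'\N{SNOWMAN}')
# py2 -> '\xe2\x98\x83' (UTF-8 bytes); py3 -> '\u2603' (unchanged)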

@@ -95,12 +95,12 @@ def incorrect_policy_index(info, remote_info):
"""
if 'storage_policy_index' not in remote_info:
return False
if remote_info['storage_policy_index'] == \
info['storage_policy_index']:
if remote_info['storage_policy_index'] == info['storage_policy_index']:
return False
return info['storage_policy_index'] != sorted(
[info, remote_info], cmp=cmp_policy_info)[0]['storage_policy_index']
# Only return True if remote_info has the better data;
# see the docstring for cmp_policy_info
return cmp_policy_info(info, remote_info) > 0
def translate_container_headers_to_info(headers):
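
The rewrite is forced by py3 dropping the cmp= keyword from sorted(). The general replacement is functools.cmp_to_key, but with exactly two candidates a single comparator call answers the question directly, which is what the new return statement does. A stand-alone illustration, with cmp_ints standing in for cmp_policy_info:

from functools import cmp_to_key

def cmp_ints(a, b):
    # py3 also dropped the cmp() builtin; this is the usual replacement.
    return (a > b) - (a < b)

local, remote = 3, 1
# Spelling 1: keep the sort, adapt the comparator.
best = sorted([local, remote], key=cmp_to_key(cmp_ints))[0]
# Spelling 2: for two items, ask the comparator whether remote is better.
remote_is_better = cmp_ints(local, remote) > 0
assert best == (remote if remote_is_better else local)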

@@ -66,7 +66,7 @@ class ContainerReplicator(db_replicator.Replicator):
def _handle_sync_response(self, node, response, info, broker, http,
different_region=False):
if is_success(response.status):
remote_info = json.loads(response.data)
remote_info = json.loads(response.data.decode('ascii'))
if incorrect_policy_index(info, remote_info):
status_changed_at = Timestamp.now()
broker.set_storage_policy_index(
@@ -129,7 +129,8 @@ class ContainerReplicator(db_replicator.Replicator):
def _fetch_and_merge_shard_ranges(self, http, broker):
response = http.replicate('get_shard_ranges')
if is_success(response.status):
broker.merge_shard_ranges(json.loads(response.data))
broker.merge_shard_ranges(json.loads(
response.data.decode('ascii')))
def find_local_handoff_for_part(self, part):
"""

@@ -29,6 +29,7 @@ import json
import mock
from mock import patch, call
import six
from six.moves import reload_module
from swift.container.backend import DATADIR
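
The new six.moves import is needed because the py2 spelling, the reload() builtin, does not exist on py3; six.moves.reload_module resolves to the builtin on py2 and to importlib.reload on py3:

from six.moves import reload_module
import json

# Re-executes the module body; the same call works on py2 and py3.
reload_module(json)
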
@@ -159,6 +160,8 @@ def _mock_process(*args):
class ReplHttp(object):
def __init__(self, response=None, set_status=200):
if isinstance(response, six.text_type):
response = response.encode('ascii')
self.response = response
self.set_status = set_status
replicated = False
@@ -689,7 +692,7 @@ class TestDBReplicator(unittest.TestCase):
class FakeResponse(object):
def __init__(self, status, rinfo):
self._status = status
self.data = json.dumps(rinfo)
self.data = json.dumps(rinfo).encode('ascii')
@property
def status(self):
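
Both fakes in this file change for the same reason: a real bufferedhttp response body is bytes, but json.dumps() returns text on py3, so the fakes encode before handing data to the code under test. Roughly:

import json
import six

rinfo = {'id': 'abcdef', 'point': -1}
body = json.dumps(rinfo)
# json.dumps gives a native str: bytes on py2, text on py3. Encode so the
# fake serves bytes, just like a real HTTP response body.
if isinstance(body, six.text_type):
    body = body.encode('ascii')
assert isinstance(body, bytes)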

@@ -33,11 +33,10 @@ from swift.common.storage_policy import POLICIES
from test.unit.common import test_db_replicator
from test.unit import patch_policies, make_timestamp_iter, mock_check_drive, \
debug_logger, EMPTY_ETAG, FakeLogger
debug_logger, EMPTY_ETAG, FakeLogger, attach_fake_replication_rpc, \
FakeHTTPResponse
from contextlib import contextmanager
from test.unit.common.test_db_replicator import attach_fake_replication_rpc
@patch_policies
class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
@@ -967,7 +966,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
most_recent_items = {}
for name, timestamp in all_items:
most_recent_items[name] = max(
timestamp, most_recent_items.get(name, -1))
timestamp, most_recent_items.get(name, ''))
self.assertEqual(2, len(most_recent_items))
for db in (broker, remote_broker):
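
The default changed from -1 to '' because the values being tracked are internal timestamp strings: max() between an int and a str raises TypeError on py3, whereas py2 quietly ordered them by type. An empty string sorts before any timestamp string, so it plays the same "nothing seen yet" role; with some made-up items:

most_recent_items = {}
all_items = [('o1', '0000000002.00000'), ('o1', '0000000001.00000'),
             ('o2', '0000000003.00000')]
for name, timestamp in all_items:
    # '' compares less than every timestamp string; max(timestamp, -1)
    # would raise TypeError on py3.
    most_recent_items[name] = max(timestamp, most_recent_items.get(name, ''))
assert most_recent_items['o1'] == '0000000002.00000'
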
@@ -1415,7 +1414,8 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
replicate_hook = mock.MagicMock()
fake_repl_connection = attach_fake_replication_rpc(
self.rpc, errors={'merge_shard_ranges': [HTTPServerError()]},
self.rpc, errors={'merge_shard_ranges': [
FakeHTTPResponse(HTTPServerError())]},
replicate_hook=replicate_hook)
db_replicator.ReplConnection = fake_repl_connection
part, node = self._get_broker_part_node(remote_broker)
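
Wrapping the error in FakeHTTPResponse appears to be needed because the fake replication RPC now hands the queued object straight back as the response, and the replicator reads .status and .data off it, which a bare swob HTTPServerError does not provide in that shape. An adapter in that spirit (illustrative only; the real FakeHTTPResponse is imported from the test helpers above):

class FakeHTTPResponseSketch(object):
    # Adapt a swob response to the .status/.data interface that real
    # replication responses expose.
    def __init__(self, resp):
        self._resp = resp

    @property
    def status(self):
        return self._resp.status_int

    @property
    def data(self):
        return self._resp.body
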
@@ -1518,7 +1518,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
check_replicate(shard_ranges + [own_sr])
def check_replicate(self, from_broker, remote_node_index, repl_conf=None,
expect_success=True, errors=None):
expect_success=True):
repl_conf = repl_conf or {}
repl_calls = []
rsync_calls = []
@@ -1527,7 +1527,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
repl_calls.append((op, sync_args))
fake_repl_connection = attach_fake_replication_rpc(
self.rpc, replicate_hook=repl_hook, errors=errors)
self.rpc, replicate_hook=repl_hook, errors=None)
db_replicator.ReplConnection = fake_repl_connection
daemon = replicator.ContainerReplicator(
repl_conf, logger=debug_logger())
@@ -2316,9 +2316,13 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
calls.append(args)
return orig_get_items_since(broker, *args)
with mock.patch(
'swift.container.backend.ContainerBroker.get_items_since',
fake_get_items_since):
to_patch = 'swift.container.backend.ContainerBroker.get_items_since'
with mock.patch(to_patch, fake_get_items_since), \
mock.patch('swift.common.db_replicator.sleep'), \
mock.patch('swift.container.backend.tpool.execute',
lambda func, *args: func(*args)):
# For some reason, on py3 we start popping Timeouts
# if we let eventlet trampoline...
daemon, repl_calls, rsync_calls = self.check_replicate(
local_broker, 1, expect_success=False,
repl_conf={'per_diff': 1})
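
The extra patches keep this test single-threaded: sleep is stubbed out, and tpool.execute is replaced with a lambda that runs the offloaded call inline instead of bouncing it through eventlet's thread pool, which per the comment above is what triggered the spurious Timeouts on py3. The stub amounts to:

# Equivalent of the lambda in the patch: run the "offloaded" call inline.
def execute_inline(func, *args):
    return func(*args)
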
@@ -2355,9 +2359,13 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
calls.append(args)
return result
with mock.patch(
'swift.container.backend.ContainerBroker.get_items_since',
fake_get_items_since):
to_patch = 'swift.container.backend.ContainerBroker.get_items_since'
with mock.patch(to_patch, fake_get_items_since), \
mock.patch('swift.common.db_replicator.sleep'), \
mock.patch('swift.container.backend.tpool.execute',
lambda func, *args: func(*args)):
# For some reason, on py3 we start popping Timeouts
# if we let eventlet trampoline...
daemon, repl_calls, rsync_calls = self.check_replicate(
local_broker, 1, expect_success=False,
repl_conf={'per_diff': 1})

@@ -32,6 +32,7 @@ commands =
test/unit/account/test_auditor.py \
test/unit/account/test_backend.py \
test/unit/account/test_reaper.py \
test/unit/account/test_replicator.py \
test/unit/account/test_utils.py \
test/unit/cli/test_dispersion_report.py \
test/unit/cli/test_form_signature.py \
@@ -69,6 +70,7 @@ commands =
test/unit/common/test_swob.py \
test/unit/common/test_utils.py \
test/unit/common/test_wsgi.py \
test/unit/container/test_replicator.py \
test/unit/proxy/controllers/test_info.py}
[testenv:py36]