Quarantine stale EC fragments after checking handoffs

If the reconstructor finds a fragment that appears to be stale then it
will now quarantine the fragment.  Fragments are considered stale if
insufficient fragments at the same timestamp can be found to rebuild
missing fragments, and the number found is less than or equal to a new
reconstructor 'quarantine_threshold' config option.

Before quarantining a fragment the reconstructor will attempt to fetch
fragments from handoff nodes in addition to the usual primary nodes.
The handoff requests are limited by a new 'request_node_count'
config option.

'quarantine_threshold' defaults to zero i.e. no fragments will be
quarantined. 'request node count' defaults to '2 * replicas'.

Closes-Bug: 1655608

Change-Id: I08e1200291833dea3deba32cdb364baa99dc2816
This commit is contained in:
Alistair Coles 2021-04-29 20:48:51 +01:00
parent eeaac713fd
commit 46ea3aeae8
13 changed files with 1239 additions and 142 deletions

View File

@ -390,6 +390,39 @@ use = egg:swift#recon
# the environment (default). For more information, see # the environment (default). For more information, see
# https://bugs.launchpad.net/liberasurecode/+bug/1886088 # https://bugs.launchpad.net/liberasurecode/+bug/1886088
# write_legacy_ec_crc = # write_legacy_ec_crc =
#
# When attempting to reconstruct a missing fragment on another node from a
# fragment on the local node, the reconstructor may fail to fetch sufficient
# fragments to reconstruct the missing fragment. This may be because most or
# all of the remote fragments have been deleted, and the local fragment is
# stale, in which case the reconstructor will never succeed in reconstructing
# the apparently missing fragment and will log errors. If the object's
# tombstones have been reclaimed then the stale fragment will never be deleted
# (see https://bugs.launchpad.net/swift/+bug/1655608). If an operator suspects
# that stale fragments have been re-introduced to the cluster and is seeing
# error logs similar to those in the bug report, then the quarantine_threshold
# option may be set to a value greater than zero. This enables the
# reconstructor to quarantine the stale fragments when it fails to fetch more
# than the quarantine_threshold number of fragments (including the stale
# fragment) during an attempt to reconstruct. For example, setting the
# quarantine_threshold to 1 would cause a fragment to be quarantined if no
# other fragments can be fetched. The value may be reset to zero after the
# reconstructor has run on all affected nodes and the error logs are no longer
# seen.
# Note: the quarantine_threshold applies equally to all policies, but for each
# policy it is effectively capped at (ec_ndata - 1) so that a fragment is never
# quarantined when sufficient fragments exist to reconstruct the object.
# Fragments are not quarantined until they are older than the reclaim_age.
# quarantine_threshold = 0
#
# Sets the maximum number of nodes to which requests will be made before
# quarantining a fragment. You can use '* replicas' at the end to have it use
# the number given times the number of replicas for the ring being used for the
# requests. The minimum number of nodes to which requests are made is the
# number of replicas for the policy minus 1 (the node on which the fragment is
# to be rebuilt). The minimum is only exceeded if request_node_count is
# greater, and only for the purposes of quarantining.
# request_node_count = 2 * replicas
[object-updater] [object-updater]
# You can override the default log routing for this app here (don't use set!): # You can override the default log routing for this app here (don't use set!):

View File

@ -220,8 +220,8 @@ use = egg:swift#proxy
# to use up to replica count threads when waiting on a response. In # to use up to replica count threads when waiting on a response. In
# conjunction with the concurrency_timeout option this will allow swift to send # conjunction with the concurrency_timeout option this will allow swift to send
# out GET/HEAD requests to the storage nodes concurrently and answer as soon as # out GET/HEAD requests to the storage nodes concurrently and answer as soon as
# the minimum number of backend responses are availabe - in replicated contexts # the minimum number of backend responses are available - in replicated
# this will be the first backend replica to respond. # contexts this will be the first backend replica to respond.
# concurrent_gets = off # concurrent_gets = off
# #
# This parameter controls how long to wait before firing off the next # This parameter controls how long to wait before firing off the next

View File

@ -515,6 +515,23 @@ def config_percent_value(value):
raise ValueError("%s: %s" % (str(err), value)) raise ValueError("%s: %s" % (str(err), value))
def config_request_node_count_value(value):
try:
value_parts = value.lower().split()
rnc_value = int(value_parts[0])
except (ValueError, AttributeError):
pass
else:
if len(value_parts) == 1:
return lambda replicas: rnc_value
elif (len(value_parts) == 3 and
value_parts[1] == '*' and
value_parts[2] == 'replicas'):
return lambda replicas: rnc_value * replicas
raise ValueError(
'Invalid request_node_count value: %r' % value)
def append_underscore(prefix): def append_underscore(prefix):
if prefix and not prefix.endswith('_'): if prefix and not prefix.endswith('_'):
prefix += '_' prefix += '_'

View File

@ -12,7 +12,7 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import itertools
import json import json
import errno import errno
import os import os
@ -33,7 +33,8 @@ from swift.common.utils import (
dump_recon_cache, mkdirs, config_true_value, dump_recon_cache, mkdirs, config_true_value,
GreenAsyncPile, Timestamp, remove_file, GreenAsyncPile, Timestamp, remove_file,
load_recon_cache, parse_override_options, distribute_evenly, load_recon_cache, parse_override_options, distribute_evenly,
PrefixLoggerAdapter, remove_directory) PrefixLoggerAdapter, remove_directory, config_request_node_count_value,
non_negative_int)
from swift.common.header_key_dict import HeaderKeyDict from swift.common.header_key_dict import HeaderKeyDict
from swift.common.bufferedhttp import http_connect from swift.common.bufferedhttp import http_connect
from swift.common.daemon import Daemon from swift.common.daemon import Daemon
@ -102,8 +103,8 @@ class ResponseBucket(object):
def __init__(self): def __init__(self):
# count of all responses associated with this Bucket # count of all responses associated with this Bucket
self.num_responses = 0 self.num_responses = 0
# map {frag_index: response} for subset of responses that # map {frag_index: response} for subset of responses that could be used
# could be used to rebuild the missing fragment # to rebuild the missing fragment
self.useful_responses = {} self.useful_responses = {}
# set if a durable timestamp was seen in responses # set if a durable timestamp was seen in responses
self.durable = False self.durable = False
@ -232,6 +233,10 @@ class ObjectReconstructor(Daemon):
'of handoffs_only.') 'of handoffs_only.')
self.rebuild_handoff_node_count = int(conf.get( self.rebuild_handoff_node_count = int(conf.get(
'rebuild_handoff_node_count', 2)) 'rebuild_handoff_node_count', 2))
self.quarantine_threshold = non_negative_int(
conf.get('quarantine_threshold', 0))
self.request_node_count = config_request_node_count_value(
conf.get('request_node_count', '2 * replicas'))
# When upgrading from liberasurecode<=1.5.0, you may want to continue # When upgrading from liberasurecode<=1.5.0, you may want to continue
# writing legacy CRCs until all nodes are upgraded and capabale of # writing legacy CRCs until all nodes are upgraded and capabale of
@ -368,23 +373,24 @@ class ObjectReconstructor(Daemon):
return False return False
return True return True
def _get_response(self, node, part, path, headers, full_path): def _get_response(self, node, policy, partition, path, headers):
""" """
Helper method for reconstruction that GETs a single EC fragment Helper method for reconstruction that GETs a single EC fragment
archive archive
:param node: the node to GET from :param node: the node to GET from
:param part: the partition :param policy: the job policy
:param partition: the partition
:param path: path of the desired EC archive relative to partition dir :param path: path of the desired EC archive relative to partition dir
:param headers: the headers to send :param headers: the headers to send
:param full_path: full path to desired EC archive
:returns: response :returns: response
""" """
full_path = _full_path(node, partition, path, policy)
resp = None resp = None
try: try:
with ConnectionTimeout(self.conn_timeout): with ConnectionTimeout(self.conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], conn = http_connect(node['ip'], node['port'], node['device'],
part, 'GET', path, headers=headers) partition, 'GET', path, headers=headers)
with Timeout(self.node_timeout): with Timeout(self.node_timeout):
resp = conn.getresponse() resp = conn.getresponse()
resp.full_path = full_path resp.full_path = full_path
@ -462,7 +468,11 @@ class ObjectReconstructor(Daemon):
fi_to_rebuild) fi_to_rebuild)
return None return None
if fi_to_rebuild == resp_frag_index: durable_timestamp = resp.headers.get('X-Backend-Durable-Timestamp')
if durable_timestamp:
buckets[Timestamp(durable_timestamp)].durable = True
if resp_frag_index == fi_to_rebuild:
# TODO: With duplicated EC frags it's not unreasonable to find the # TODO: With duplicated EC frags it's not unreasonable to find the
# very fragment we're trying to rebuild exists on another primary # very fragment we're trying to rebuild exists on another primary
# node. In this case we should stream it directly from the remote # node. In this case we should stream it directly from the remote
@ -471,19 +481,43 @@ class ObjectReconstructor(Daemon):
'Found existing frag #%s at %s while rebuilding to %s', 'Found existing frag #%s at %s while rebuilding to %s',
fi_to_rebuild, resp.full_path, fi_to_rebuild, resp.full_path,
_full_path(node, partition, path, policy)) _full_path(node, partition, path, policy))
return None elif resp_frag_index not in bucket.useful_responses:
durable_timestamp = resp.headers.get('X-Backend-Durable-Timestamp')
if durable_timestamp:
buckets[Timestamp(durable_timestamp)].durable = True
if resp_frag_index not in bucket.useful_responses:
bucket.useful_responses[resp_frag_index] = resp bucket.useful_responses[resp_frag_index] = resp
return bucket # else: duplicate frag_index isn't useful for rebuilding
return None
def _make_fragment_requests(self, job, node, datafile_metadata, buckets, return bucket
error_responses, nodes):
def _is_quarantine_candidate(self, policy, buckets, error_responses, df):
# This condition is deliberately strict because it determines if
# more requests will be issued and ultimately if the fragment
# will be quarantined.
if list(error_responses.keys()) != [404]:
# only quarantine if all other responses are 404 so we are
# confident there are no other frags on queried nodes
return False
local_timestamp = Timestamp(df.get_datafile_metadata()['X-Timestamp'])
if list(buckets.keys()) != [local_timestamp]:
# don't quarantine if there's insufficient other timestamp
# frags, or no response for the local frag timestamp: we
# possibly could quarantine, but this unexpected case may be
# worth more investigation
return False
if time.time() - float(local_timestamp) <= df.manager.reclaim_age:
# If the fragment has not yet passed reclaim age then it is
# likely that a tombstone will be reverted to this node, or
# neighbor frags will get reverted from handoffs to *other* nodes
# and we'll discover we *do* have enough to reconstruct. Don't
# quarantine it yet: better that it is cleaned up 'normally'.
return False
bucket = buckets[local_timestamp]
return (bucket.num_responses <= self.quarantine_threshold and
bucket.num_responses < policy.ec_ndata and
df._frag_index in bucket.useful_responses)
def _make_fragment_requests(self, job, node, df, buckets, error_responses):
""" """
Issue requests for fragments to the list of ``nodes`` and sort the Issue requests for fragments to the list of ``nodes`` and sort the
responses into per-timestamp ``buckets`` or per-status responses into per-timestamp ``buckets`` or per-status
@ -492,16 +526,15 @@ class ObjectReconstructor(Daemon):
:param job: job from ssync_sender. :param job: job from ssync_sender.
:param node: node to which we're rebuilding. :param node: node to which we're rebuilding.
:param datafile_metadata: the datafile metadata to attach to :param df: an instance of :class:`~swift.obj.diskfile.BaseDiskFile`.
the rebuilt fragment archive
:param buckets: dict of per-timestamp buckets for ok responses. :param buckets: dict of per-timestamp buckets for ok responses.
:param error_responses: dict of per-status lists of error responses. :param error_responses: dict of per-status lists of error responses.
:param nodes: A list of nodes.
:return: A per-timestamp with sufficient responses, or None if :return: A per-timestamp with sufficient responses, or None if
there is no such bucket. there is no such bucket.
""" """
policy = job['policy'] policy = job['policy']
partition = job['partition'] partition = job['partition']
datafile_metadata = df.get_datafile_metadata()
# the fragment index we need to reconstruct is the position index # the fragment index we need to reconstruct is the position index
# of the node we're rebuilding to within the primary part list # of the node we're rebuilding to within the primary part list
@ -521,24 +554,80 @@ class ObjectReconstructor(Daemon):
headers['X-Backend-Fragment-Preferences'] = json.dumps(frag_prefs) headers['X-Backend-Fragment-Preferences'] = json.dumps(frag_prefs)
path = datafile_metadata['name'] path = datafile_metadata['name']
pile = GreenAsyncPile(len(nodes)) ring = policy.object_ring
for _node in nodes: primary_nodes = ring.get_part_nodes(partition)
full_get_path = _full_path(_node, partition, path, policy) # primary_node_count is the maximum number of nodes to consume in a
pile.spawn(self._get_response, _node, partition, # normal rebuild attempt when there is no quarantine candidate,
path, headers, full_get_path) # including the node to which we are rebuilding
primary_node_count = len(primary_nodes)
# don't try and fetch a fragment from the node we're rebuilding to
filtered_primary_nodes = [n for n in primary_nodes
if n['id'] != node['id']]
# concurrency is the number of requests fired off in initial batch
concurrency = len(filtered_primary_nodes)
# max_node_count is the maximum number of nodes to consume when
# verifying a quarantine candidate and is at least primary_node_count
max_node_count = max(primary_node_count,
self.request_node_count(primary_node_count))
pile = GreenAsyncPile(concurrency)
for primary_node in filtered_primary_nodes:
pile.spawn(self._get_response, primary_node, policy, partition,
path, headers)
useful_bucket = None
for resp in pile: for resp in pile:
bucket = self._handle_fragment_response( bucket = self._handle_fragment_response(
node, policy, partition, fi_to_rebuild, path, buckets, node, policy, partition, fi_to_rebuild, path, buckets,
error_responses, resp) error_responses, resp)
if bucket and len(bucket.useful_responses) >= policy.ec_ndata: if bucket and len(bucket.useful_responses) >= policy.ec_ndata:
frag_indexes = list(bucket.useful_responses.keys()) useful_bucket = bucket
self.logger.debug('Reconstruct frag #%s with frag indexes %s' break
% (fi_to_rebuild, frag_indexes))
return bucket
return None
def reconstruct_fa(self, job, node, datafile_metadata): # Once all rebuild nodes have responded, if we have a quarantine
# candidate, go beyond primary_node_count and on to handoffs. The
# first non-404 response will prevent quarantine, but the expected
# common case is all 404 responses so we use some concurrency to get an
# outcome faster at the risk of some unnecessary requests in the
# uncommon case.
if (not useful_bucket and
self._is_quarantine_candidate(
policy, buckets, error_responses, df)):
node_count = primary_node_count
handoff_iter = itertools.islice(ring.get_more_nodes(partition),
max_node_count - node_count)
for handoff_node in itertools.islice(handoff_iter, concurrency):
node_count += 1
pile.spawn(self._get_response, handoff_node, policy, partition,
path, headers)
for resp in pile:
bucket = self._handle_fragment_response(
node, policy, partition, fi_to_rebuild, path, buckets,
error_responses, resp)
if bucket and len(bucket.useful_responses) >= policy.ec_ndata:
useful_bucket = bucket
self.logger.debug(
'Reconstructing frag from handoffs, node_count=%d'
% node_count)
break
elif self._is_quarantine_candidate(
policy, buckets, error_responses, df):
try:
handoff_node = next(handoff_iter)
node_count += 1
pile.spawn(self._get_response, handoff_node, policy,
partition, path, headers)
except StopIteration:
pass
# else: this frag is no longer a quarantine candidate, so we
# could break right here and ignore any remaining responses,
# but given that we may have actually found another frag we'll
# optimistically wait for any remaining responses in case a
# useful bucket is assembled.
return useful_bucket
def reconstruct_fa(self, job, node, df):
""" """
Reconstructs a fragment archive - this method is called from ssync Reconstructs a fragment archive - this method is called from ssync
after a remote node responds that is missing this object - the local after a remote node responds that is missing this object - the local
@ -547,8 +636,7 @@ class ObjectReconstructor(Daemon):
:param job: job from ssync_sender. :param job: job from ssync_sender.
:param node: node to which we're rebuilding. :param node: node to which we're rebuilding.
:param datafile_metadata: the datafile metadata to attach to :param df: an instance of :class:`~swift.obj.diskfile.BaseDiskFile`.
the rebuilt fragment archive
:returns: a DiskFile like class for use by ssync. :returns: a DiskFile like class for use by ssync.
:raises DiskFileQuarantined: if the fragment archive cannot be :raises DiskFileQuarantined: if the fragment archive cannot be
reconstructed and has as a result been quarantined. reconstructed and has as a result been quarantined.
@ -559,6 +647,7 @@ class ObjectReconstructor(Daemon):
# the fragment index we need to reconstruct is the position index # the fragment index we need to reconstruct is the position index
# of the node we're rebuilding to within the primary part list # of the node we're rebuilding to within the primary part list
fi_to_rebuild = node['backend_index'] fi_to_rebuild = node['backend_index']
datafile_metadata = df.get_datafile_metadata()
local_timestamp = Timestamp(datafile_metadata['X-Timestamp']) local_timestamp = Timestamp(datafile_metadata['X-Timestamp'])
path = datafile_metadata['name'] path = datafile_metadata['name']
@ -566,12 +655,13 @@ class ObjectReconstructor(Daemon):
error_responses = defaultdict(list) # map status code -> response list error_responses = defaultdict(list) # map status code -> response list
# don't try and fetch a fragment from the node we're rebuilding to # don't try and fetch a fragment from the node we're rebuilding to
part_nodes = [n for n in policy.object_ring.get_part_nodes(partition)
if n['id'] != node['id']]
useful_bucket = self._make_fragment_requests( useful_bucket = self._make_fragment_requests(
job, node, datafile_metadata, buckets, error_responses, part_nodes) job, node, df, buckets, error_responses)
if useful_bucket: if useful_bucket:
frag_indexes = list(useful_bucket.useful_responses.keys())
self.logger.debug('Reconstruct frag #%s with frag indexes %s'
% (fi_to_rebuild, frag_indexes))
responses = list(useful_bucket.useful_responses.values()) responses = list(useful_bucket.useful_responses.values())
rebuilt_fragment_iter = self.make_rebuilt_fragment_iter( rebuilt_fragment_iter = self.make_rebuilt_fragment_iter(
responses[:policy.ec_ndata], path, policy, fi_to_rebuild) responses[:policy.ec_ndata], path, policy, fi_to_rebuild)
@ -601,6 +691,10 @@ class ObjectReconstructor(Daemon):
errors, 'durable' if durable else 'non-durable', errors, 'durable' if durable else 'non-durable',
full_path, fi_to_rebuild)) full_path, fi_to_rebuild))
if self._is_quarantine_candidate(policy, buckets, error_responses, df):
raise df._quarantine(
df._data_file, "Solitary fragment #%s" % df._frag_index)
raise DiskFileError('Unable to reconstruct EC archive') raise DiskFileError('Unable to reconstruct EC archive')
def _reconstruct(self, policy, fragment_payload, frag_index): def _reconstruct(self, policy, fragment_payload, frag_index):

View File

@ -374,7 +374,7 @@ class Sender(object):
# from the data file only. # from the data file only.
df_alt = self.job.get( df_alt = self.job.get(
'sync_diskfile_builder', lambda *args: df)( 'sync_diskfile_builder', lambda *args: df)(
self.job, self.node, df.get_datafile_metadata()) self.job, self.node, df)
self.send_put(connection, url_path, df_alt, self.send_put(connection, url_path, df_alt,
durable=is_durable) durable=is_durable)
if want.get('meta') and df.data_timestamp != df.timestamp: if want.get('meta') and df.data_timestamp != df.timestamp:

View File

@ -35,7 +35,8 @@ from swift.common.ring import Ring
from swift.common.utils import Watchdog, get_logger, \ from swift.common.utils import Watchdog, get_logger, \
get_remote_client, split_path, config_true_value, generate_trans_id, \ get_remote_client, split_path, config_true_value, generate_trans_id, \
affinity_key_function, affinity_locality_predicate, list_from_csv, \ affinity_key_function, affinity_locality_predicate, list_from_csv, \
register_swift_info, parse_prefixed_conf, config_auto_int_value register_swift_info, parse_prefixed_conf, config_auto_int_value, \
config_request_node_count_value
from swift.common.constraints import check_utf8, valid_api_version from swift.common.constraints import check_utf8, valid_api_version
from swift.proxy.controllers import AccountController, ContainerController, \ from swift.proxy.controllers import AccountController, ContainerController, \
ObjectControllerRouter, InfoController ObjectControllerRouter, InfoController
@ -279,16 +280,8 @@ class Application(object):
conf.get('strict_cors_mode', 't')) conf.get('strict_cors_mode', 't'))
self.node_timings = {} self.node_timings = {}
self.timing_expiry = int(conf.get('timing_expiry', 300)) self.timing_expiry = int(conf.get('timing_expiry', 300))
value = conf.get('request_node_count', '2 * replicas').lower().split() value = conf.get('request_node_count', '2 * replicas')
if len(value) == 1: self.request_node_count = config_request_node_count_value(value)
rnc_value = int(value[0])
self.request_node_count = lambda replicas: rnc_value
elif len(value) == 3 and value[1] == '*' and value[2] == 'replicas':
rnc_value = int(value[0])
self.request_node_count = lambda replicas: rnc_value * replicas
else:
raise ValueError(
'Invalid request_node_count value: %r' % ''.join(value))
# swift_owner_headers are stripped by the account and container # swift_owner_headers are stripped by the account and container
# controllers; we should extend header stripping to object controller # controllers; we should extend header stripping to object controller
# when a privileged object header is implemented. # when a privileged object header is implemented.

View File

@ -33,7 +33,7 @@ from six.moves.http_client import HTTPConnection
from six.moves.urllib.parse import urlparse from six.moves.urllib.parse import urlparse
from swiftclient import get_auth, head_account, client from swiftclient import get_auth, head_account, client
from swift.common import internal_client, direct_client from swift.common import internal_client, direct_client, utils
from swift.common.direct_client import DirectClientException from swift.common.direct_client import DirectClientException
from swift.common.ring import Ring from swift.common.ring import Ring
from swift.common.utils import readconf, renamer, \ from swift.common.utils import readconf, renamer, \
@ -41,6 +41,7 @@ from swift.common.utils import readconf, renamer, \
from swift.common.manager import Manager from swift.common.manager import Manager
from swift.common.storage_policy import POLICIES, EC_POLICY, REPL_POLICY from swift.common.storage_policy import POLICIES, EC_POLICY, REPL_POLICY
from swift.obj.diskfile import get_data_dir from swift.obj.diskfile import get_data_dir
from test.debug_logger import debug_logger
from test.probe import CHECK_SERVER_TIMEOUT, VALIDATE_RSYNC, PROXY_BASE_URL from test.probe import CHECK_SERVER_TIMEOUT, VALIDATE_RSYNC, PROXY_BASE_URL
@ -186,6 +187,8 @@ def store_config_paths(name, configs):
server_names = [name, '%s-replicator' % name] server_names = [name, '%s-replicator' % name]
if name == 'container': if name == 'container':
server_names.append('container-sharder') server_names.append('container-sharder')
elif name == 'object':
server_names.append('object-reconstructor')
for server_name in server_names: for server_name in server_names:
for server in Manager([server_name]): for server in Manager([server_name]):
for i, conf in enumerate(server.conf_files(), 1): for i, conf in enumerate(server.conf_files(), 1):
@ -563,6 +566,15 @@ class ProbeTest(unittest.TestCase):
for ent in os.listdir(ap_dir_fullpath)]) for ent in os.listdir(ap_dir_fullpath)])
return async_pendings return async_pendings
def run_custom_daemon(self, klass, conf_section, conf_index,
custom_conf, **kwargs):
conf_file = self.configs[conf_section][conf_index]
conf = utils.readconf(conf_file, conf_section)
conf.update(custom_conf)
daemon = klass(conf, debug_logger('probe'))
daemon.run_once(**kwargs)
return daemon
class ReplProbeTest(ProbeTest): class ReplProbeTest(ProbeTest):
@ -679,6 +691,7 @@ class ECProbeTest(ProbeTest):
self.direct_get(onode, opart, require_durable=require_durable) self.direct_get(onode, opart, require_durable=require_durable)
except direct_client.DirectClientException as err: except direct_client.DirectClientException as err:
self.assertEqual(err.http_status, status) self.assertEqual(err.http_status, status)
return err
else: else:
self.fail('Node data on %r was not fully destroyed!' % (onode,)) self.fail('Node data on %r was not fully destroyed!' % (onode,))

View File

@ -13,7 +13,7 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import itertools
from contextlib import contextmanager from contextlib import contextmanager
import unittest import unittest
import uuid import uuid
@ -22,7 +22,9 @@ import time
import six import six
from swift.common.direct_client import DirectClientException from swift.common.direct_client import DirectClientException
from swift.common.manager import Manager
from swift.common.utils import md5 from swift.common.utils import md5
from swift.obj.reconstructor import ObjectReconstructor
from test.probe.common import ECProbeTest from test.probe.common import ECProbeTest
from swift.common import direct_client from swift.common import direct_client
@ -369,6 +371,160 @@ class TestReconstructorRebuild(ECProbeTest):
'X-Backend-Storage-Policy-Index': int(self.policy)}) 'X-Backend-Storage-Policy-Index': int(self.policy)})
self.assertNotIn('X-Delete-At', headers) self.assertNotIn('X-Delete-At', headers)
def test_rebuild_quarantines_lonely_frag(self):
# fail one device while the object is deleted so we are left with one
# fragment and some tombstones
failed_node = self.onodes[0]
device_path = self.device_dir(failed_node)
self.kill_drive(device_path)
self.assert_direct_get_fails(failed_node, self.opart, 507) # sanity
# delete object
client.delete_object(self.url, self.token, self.container_name,
self.object_name)
# check we have tombstones
for node in self.onodes[1:]:
err = self.assert_direct_get_fails(node, self.opart, 404)
self.assertIn('X-Backend-Timestamp', err.http_headers)
# run the reconstructor with zero reclaim age to clean up tombstones
for conf_index in self.configs['object-reconstructor'].keys():
self.run_custom_daemon(
ObjectReconstructor, 'object-reconstructor', conf_index,
{'reclaim_age': '0'})
# check we no longer have tombstones
for node in self.onodes[1:]:
err = self.assert_direct_get_fails(node, self.opart, 404)
self.assertNotIn('X-Timestamp', err.http_headers)
# revive the failed device and check it has a fragment
self.revive_drive(device_path)
self.assert_direct_get_succeeds(failed_node, self.opart)
# restart proxy to clear error-limiting so that the revived drive
# participates again
Manager(['proxy-server']).restart()
# client GET will fail with 503 ...
with self.assertRaises(ClientException) as cm:
client.get_object(self.url, self.token, self.container_name,
self.object_name)
self.assertEqual(503, cm.exception.http_status)
# ... but client GET succeeds
headers = client.head_object(self.url, self.token, self.container_name,
self.object_name)
for key in self.headers_post:
self.assertIn(key, headers)
self.assertEqual(self.headers_post[key], headers[key])
# run the reconstructor without quarantine_threshold set
error_lines = []
warning_lines = []
for conf_index in self.configs['object-reconstructor'].keys():
reconstructor = self.run_custom_daemon(
ObjectReconstructor, 'object-reconstructor', conf_index,
{'reclaim_age': '0'})
logger = reconstructor.logger.logger
error_lines.append(logger.get_lines_for_level('error'))
warning_lines.append(logger.get_lines_for_level('warning'))
# check logs for errors
found_lines = False
for lines in error_lines:
if not lines:
continue
self.assertFalse(found_lines, error_lines)
found_lines = True
for line in itertools.islice(lines, 0, 6, 2):
self.assertIn(
'Unable to get enough responses (1/4 from 1 ok '
'responses)', line, lines)
for line in itertools.islice(lines, 1, 7, 2):
self.assertIn(
'Unable to get enough responses (4 x 404 error '
'responses)', line, lines)
self.assertTrue(found_lines, 'error lines not found')
for lines in warning_lines:
self.assertEqual([], lines)
# check we have still have a single fragment and no tombstones
self.assert_direct_get_succeeds(failed_node, self.opart)
for node in self.onodes[1:]:
err = self.assert_direct_get_fails(node, self.opart, 404)
self.assertNotIn('X-Timestamp', err.http_headers)
# run the reconstructor to quarantine the lonely frag
error_lines = []
warning_lines = []
for conf_index in self.configs['object-reconstructor'].keys():
reconstructor = self.run_custom_daemon(
ObjectReconstructor, 'object-reconstructor', conf_index,
{'reclaim_age': '0', 'quarantine_threshold': '1'})
logger = reconstructor.logger.logger
error_lines.append(logger.get_lines_for_level('error'))
warning_lines.append(logger.get_lines_for_level('warning'))
# check logs for errors
found_lines = False
for index, lines in enumerate(error_lines):
if not lines:
continue
self.assertFalse(found_lines, error_lines)
found_lines = True
for line in itertools.islice(lines, 0, 6, 2):
self.assertIn(
'Unable to get enough responses (1/4 from 1 ok '
'responses)', line, lines)
for line in itertools.islice(lines, 1, 7, 2):
self.assertIn(
'Unable to get enough responses (6 x 404 error '
'responses)', line, lines)
self.assertTrue(found_lines, 'error lines not found')
# check logs for quarantine warning
found_lines = False
for lines in warning_lines:
if not lines:
continue
self.assertFalse(found_lines, warning_lines)
found_lines = True
self.assertEqual(1, len(lines), lines)
self.assertIn('Quarantined object', lines[0])
self.assertTrue(found_lines, 'warning lines not found')
# check we have nothing
for node in self.onodes:
err = self.assert_direct_get_fails(node, self.opart, 404)
self.assertNotIn('X-Backend-Timestamp', err.http_headers)
# client HEAD and GET now both 404
with self.assertRaises(ClientException) as cm:
client.get_object(self.url, self.token, self.container_name,
self.object_name)
self.assertEqual(404, cm.exception.http_status)
with self.assertRaises(ClientException) as cm:
client.head_object(self.url, self.token, self.container_name,
self.object_name)
self.assertEqual(404, cm.exception.http_status)
# run the reconstructor once more - should see no errors in logs!
error_lines = []
warning_lines = []
for conf_index in self.configs['object-reconstructor'].keys():
reconstructor = self.run_custom_daemon(
ObjectReconstructor, 'object-reconstructor', conf_index,
{'reclaim_age': '0', 'quarantine_threshold': '1'})
logger = reconstructor.logger.logger
error_lines.append(logger.get_lines_for_level('error'))
warning_lines.append(logger.get_lines_for_level('warning'))
for lines in error_lines:
self.assertEqual([], lines)
for lines in warning_lines:
self.assertEqual([], lines)
if six.PY2: if six.PY2:
class TestReconstructorRebuildUTF8(TestReconstructorRebuild): class TestReconstructorRebuildUTF8(TestReconstructorRebuild):

View File

@ -40,7 +40,6 @@ from test.probe import PROXY_BASE_URL
from test.probe.brain import BrainSplitter from test.probe.brain import BrainSplitter
from test.probe.common import ReplProbeTest, get_server_number, \ from test.probe.common import ReplProbeTest, get_server_number, \
wait_for_server_to_hangup wait_for_server_to_hangup
from test.debug_logger import debug_logger
import mock import mock
@ -415,12 +414,8 @@ class BaseTestContainerSharding(ReplProbeTest):
additional_args='--partitions=%s' % part) additional_args='--partitions=%s' % part)
def run_custom_sharder(self, conf_index, custom_conf, **kwargs): def run_custom_sharder(self, conf_index, custom_conf, **kwargs):
conf_file = self.configs['container-sharder'][conf_index] return self.run_custom_daemon(ContainerSharder, 'container-sharder',
conf = utils.readconf(conf_file, 'container-sharder') conf_index, custom_conf, **kwargs)
conf.update(custom_conf)
sharder = ContainerSharder(conf, logger=debug_logger('probe'))
sharder.run_once(**kwargs)
return sharder
class TestContainerShardingNonUTF8(BaseTestContainerSharding): class TestContainerShardingNonUTF8(BaseTestContainerSharding):

View File

@ -18,6 +18,7 @@ from __future__ import print_function
import hashlib import hashlib
from test import annotate_failure
from test.debug_logger import debug_logger from test.debug_logger import debug_logger
from test.unit import temptree, make_timestamp_iter, with_tempdir, \ from test.unit import temptree, make_timestamp_iter, with_tempdir, \
mock_timestamp_now, FakeIterable mock_timestamp_now, FakeIterable
@ -3156,6 +3157,27 @@ cluster_dfw1 = http://dfw1.host/v1/
'less than 100, not "{}"'.format(val), 'less than 100, not "{}"'.format(val),
cm.exception.args[0]) cm.exception.args[0])
def test_config_request_node_count_value(self):
def do_test(value, replicas, expected):
self.assertEqual(
expected,
utils.config_request_node_count_value(value)(replicas))
do_test('0', 10, 0)
do_test('1 * replicas', 3, 3)
do_test('1 * replicas', 11, 11)
do_test('2 * replicas', 3, 6)
do_test('2 * replicas', 11, 22)
do_test('11', 11, 11)
do_test('10', 11, 10)
do_test('12', 11, 12)
for bad in ('1.1', 1.1, 'auto', 'bad',
'2.5 * replicas', 'two * replicas'):
with annotate_failure(bad):
with self.assertRaises(ValueError):
utils.config_request_node_count_value(bad)
def test_config_auto_int_value(self): def test_config_auto_int_value(self):
expectations = { expectations = {
# (value, default) : expected, # (value, default) : expected,

View File

@ -34,7 +34,7 @@ from gzip import GzipFile
from shutil import rmtree from shutil import rmtree
from six.moves.urllib.parse import unquote from six.moves.urllib.parse import unquote
from swift.common import utils from swift.common import utils
from swift.common.exceptions import DiskFileError from swift.common.exceptions import DiskFileError, DiskFileQuarantined
from swift.common.header_key_dict import HeaderKeyDict from swift.common.header_key_dict import HeaderKeyDict
from swift.common.utils import dump_recon_cache, md5 from swift.common.utils import dump_recon_cache, md5
from swift.obj import diskfile, reconstructor as object_reconstructor from swift.obj import diskfile, reconstructor as object_reconstructor
@ -42,6 +42,7 @@ from swift.common import ring
from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy, from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy,
POLICIES, EC_POLICY) POLICIES, EC_POLICY)
from swift.obj.reconstructor import SYNC, REVERT from swift.obj.reconstructor import SYNC, REVERT
from test import annotate_failure
from test.debug_logger import debug_logger from test.debug_logger import debug_logger
from test.unit import (patch_policies, mocked_http_conn, FabricatedRing, from test.unit import (patch_policies, mocked_http_conn, FabricatedRing,
@ -785,10 +786,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
def do_test(stat_code): def do_test(stat_code):
with mocked_http_conn(stat_code): with mocked_http_conn(stat_code):
resp = self.reconstructor._get_response(node, part, resp = self.reconstructor._get_response(
path='nada', node, self.policy, part, path='nada', headers={})
headers={},
full_path='nada/nada')
return resp return resp
for status in (200, 400, 404, 503): for status in (200, 400, 404, 503):
@ -4528,17 +4527,27 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
class TestReconstructFragmentArchive(BaseTestObjectReconstructor): class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
obj_path = b'/a/c/o' # subclass overrides this obj_name = b'o' # subclass overrides this
def setUp(self): def setUp(self):
super(TestReconstructFragmentArchive, self).setUp() super(TestReconstructFragmentArchive, self).setUp()
self.obj_path = b'/a/c/' + self.obj_name
self.obj_timestamp = self.ts() self.obj_timestamp = self.ts()
self.obj_metadata = {
'name': self.obj_path, def _create_fragment(self, frag_index, body=b'test data'):
'Content-Length': '0', utils.mkdirs(os.path.join(self.devices, 'sda1'))
'ETag': 'etag', df_mgr = self.reconstructor._df_router[self.policy]
'X-Timestamp': self.obj_timestamp.normal if six.PY2:
} obj_name = self.obj_name
else:
obj_name = self.obj_name.decode('utf8')
self.df = df_mgr.get_diskfile('sda1', 9, 'a', 'c', obj_name,
policy=self.policy)
write_diskfile(self.df, self.obj_timestamp, data=body,
frag_index=frag_index)
self.df.open()
self.logger.clear()
return self.df
def test_reconstruct_fa_no_errors(self): def test_reconstruct_fa_no_errors(self):
job = { job = {
@ -4565,9 +4574,9 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
called_headers = [] called_headers = []
orig_func = object_reconstructor.ObjectReconstructor._get_response orig_func = object_reconstructor.ObjectReconstructor._get_response
def _get_response_hook(self, node, part, path, headers, policy): def _get_response_hook(self, node, policy, part, path, headers):
called_headers.append(headers) called_headers.append(headers)
return orig_func(self, node, part, path, headers, policy) return orig_func(self, node, policy, part, path, headers)
codes, body_iter, headers = zip(*responses) codes, body_iter, headers = zip(*responses)
get_response_path = \ get_response_path = \
@ -4576,7 +4585,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
with mocked_http_conn( with mocked_http_conn(
*codes, body_iter=body_iter, headers=headers): *codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, self.obj_metadata) job, node, self._create_fragment(2, body=b''))
self.assertEqual(0, df.content_length) self.assertEqual(0, df.content_length)
fixed_body = b''.join(df.reader()) fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual(len(fixed_body), len(broken_body))
@ -4635,7 +4644,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
with mocked_http_conn(*codes, body_iter=body_iter, with mocked_http_conn(*codes, body_iter=body_iter,
headers=headers_iter): headers=headers_iter):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, dict(self.obj_metadata)) job, node, self._create_fragment(2))
fixed_body = b''.join(df.reader()) fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual( self.assertEqual(
@ -4676,7 +4685,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
with mocked_http_conn(*codes, body_iter=body_iter, with mocked_http_conn(*codes, body_iter=body_iter,
headers=headers_iter): headers=headers_iter):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, dict(self.obj_metadata)) job, node, self._create_fragment(2))
fixed_body = b''.join(df.reader()) fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual( self.assertEqual(
@ -4722,7 +4731,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
with mocked_http_conn(*codes, body_iter=body_iter, with mocked_http_conn(*codes, body_iter=body_iter,
headers=headers_iter): headers=headers_iter):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, dict(self.obj_metadata)) job, node, self._create_fragment(2))
fixed_body = b''.join(df.reader()) fixed_body = b''.join(df.reader())
# ... this bad response should be ignored like any other failure # ... this bad response should be ignored like any other failure
self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual(len(fixed_body), len(broken_body))
@ -4761,7 +4770,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
with mocked_http_conn(*codes, body_iter=body_iter, with mocked_http_conn(*codes, body_iter=body_iter,
headers=headers_iter): headers=headers_iter):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, dict(self.obj_metadata)) job, node, self._create_fragment(2))
fixed_body = b''.join(df.reader()) fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual( self.assertEqual(
@ -4783,7 +4792,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
range(policy.object_ring.replicas - 1)] range(policy.object_ring.replicas - 1)]
with mocked_http_conn(*codes): with mocked_http_conn(*codes):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa, self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, self.obj_metadata) job, node, self._create_fragment(2))
error_lines = self.logger.get_lines_for_level('error') error_lines = self.logger.get_lines_for_level('error')
# # of replicas failed and one more error log to report not enough # # of replicas failed and one more error log to report not enough
# responses to reconstruct. # responses to reconstruct.
@ -4799,6 +4808,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
self.assertFalse(self.logger.get_lines_for_level('warning')) self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_all_404s_fails(self): def test_reconstruct_fa_all_404s_fails(self):
self._create_fragment(2)
job = { job = {
'partition': 0, 'partition': 0,
'policy': self.policy, 'policy': self.policy,
@ -4811,7 +4821,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
codes = [404 for i in range(policy.object_ring.replicas - 1)] codes = [404 for i in range(policy.object_ring.replicas - 1)]
with mocked_http_conn(*codes): with mocked_http_conn(*codes):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa, self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, self.obj_metadata) job, node, self.df)
error_lines = self.logger.get_lines_for_level('error') error_lines = self.logger.get_lines_for_level('error')
# only 1 log to report not enough responses # only 1 log to report not enough responses
self.assertEqual(1, len(error_lines)) self.assertEqual(1, len(error_lines))
@ -4823,7 +4833,51 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
# no warning # no warning
self.assertFalse(self.logger.get_lines_for_level('warning')) self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_all_404s_fails_custom_request_node_count(self):
# verify that when quarantine_threshold is not set the number of
# requests is capped at replicas - 1 regardless of request_node_count
self._create_fragment(2)
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
ring = self.policy.object_ring
# sanity check: number of handoffs available == replicas
self.assertEqual(ring.max_more_nodes, ring.replicas)
for request_node_count in (0,
self.policy.ec_ndata - 1,
ring.replicas + 1,
2 * ring.replicas - 1,
2 * ring.replicas,
3 * ring.replicas,
99 * ring.replicas):
with annotate_failure(request_node_count):
self.logger.clear()
self.reconstructor.request_node_count = \
lambda replicas: request_node_count
# request count capped at num primaries - 1
exp_requests = ring.replicas - 1
codes = [404 for i in range(exp_requests)]
with mocked_http_conn(*codes):
self.assertRaises(DiskFileError,
self.reconstructor.reconstruct_fa,
job, node, self.df)
error_lines = self.logger.get_lines_for_level('error')
# only 1 log to report not enough responses
self.assertEqual(1, len(error_lines))
self.assertIn(
'Unable to get enough responses (%s x 404 error responses)'
% exp_requests,
error_lines[0],
"Unexpected error line found: %s" % error_lines[0])
# no warning
self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_mixture_of_errors_fails(self): def test_reconstruct_fa_mixture_of_errors_fails(self):
self._create_fragment(2)
job = { job = {
'partition': 0, 'partition': 0,
'policy': self.policy, 'policy': self.policy,
@ -4839,7 +4893,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
range(policy.object_ring.replicas - 4)] range(policy.object_ring.replicas - 4)]
with mocked_http_conn(*codes): with mocked_http_conn(*codes):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa, self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, self.obj_metadata) job, node, self.df)
exp_timeouts = len([c for c in codes if isinstance(c, Timeout)]) exp_timeouts = len([c for c in codes if isinstance(c, Timeout)])
exp_404s = len([c for c in codes if c == 404]) exp_404s = len([c for c in codes if c == 404])
exp_507s = len([c for c in codes if c == 507]) exp_507s = len([c for c in codes if c == 507])
@ -4901,7 +4955,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
codes, body_iter, headers = zip(*responses) codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, self.obj_metadata) job, node, self._create_fragment(2))
fixed_body = b''.join(df.reader()) fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual( self.assertEqual(
@ -4941,7 +4995,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
codes, body_iter, headers = zip(*responses) codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, dict(self.obj_metadata)) job, node, self._create_fragment(2))
fixed_body = b''.join(df.reader()) fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual( self.assertEqual(
@ -4958,7 +5012,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
codes, body_iter, headers = zip(*responses) codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, dict(self.obj_metadata)) job, node, self._create_fragment(2))
fixed_body = b''.join(df.reader()) fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual( self.assertEqual(
@ -4995,7 +5049,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
codes, body_iter, headers = zip(*responses) codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, dict(self.obj_metadata)) job, node, self._create_fragment(2))
fixed_body = b''.join(df.reader()) fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual( self.assertEqual(
@ -5016,7 +5070,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
codes, body_iter, headers = zip(*responses) codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, dict(self.obj_metadata)) job, node, self._create_fragment(2))
fixed_body = b''.join(df.reader()) fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual( self.assertEqual(
@ -5080,7 +5134,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
codes, body_iter, headers = zip(*responses) codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa, self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, self.obj_metadata) job, node, self._create_fragment(2))
error_lines = self.logger.get_lines_for_level('error') error_lines = self.logger.get_lines_for_level('error')
# 1 error log per etag to report not enough responses # 1 error log per etag to report not enough responses
@ -5111,6 +5165,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
self.assertFalse(self.logger.get_lines_for_level('warning')) self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_with_mixed_etags_same_timestamp_fail(self): def test_reconstruct_fa_with_mixed_etags_same_timestamp_fail(self):
self._create_fragment(2)
job = { job = {
'partition': 0, 'partition': 0,
'policy': self.policy, 'policy': self.policy,
@ -5157,7 +5212,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
codes, body_iter, headers = zip(*responses) codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa, self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, self.obj_metadata) job, node, self.df)
error_lines = self.logger.get_lines_for_level('error') error_lines = self.logger.get_lines_for_level('error')
self.assertGreater(len(error_lines), 1) self.assertGreater(len(error_lines), 1)
@ -5219,7 +5274,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
codes, body_iter, headers = zip(*responses) codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, self.obj_metadata) job, node, self._create_fragment(2))
fixed_body = b''.join(df.reader()) fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual( self.assertEqual(
@ -5234,7 +5289,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
# redundant frag found once in first ec_ndata responses # redundant frag found once in first ec_ndata responses
self.assertIn( self.assertIn(
'Found existing frag #%s at' % broken_index, 'Found existing frag #%s at' % broken_index,
debug_log_lines[0]) debug_log_lines[0], debug_log_lines)
# N.B. in the future, we could avoid those check because # N.B. in the future, we could avoid those check because
# definitely sending the copy rather than reconstruct will # definitely sending the copy rather than reconstruct will
@ -5251,6 +5306,537 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
debug_log_lines[1][len(log_prefix):]) debug_log_lines[1][len(log_prefix):])
self.assertNotIn(broken_index, got_frag_index_list) self.assertNotIn(broken_index, got_frag_index_list)
def test_quarantine_threshold_conf(self):
reconstructor = object_reconstructor.ObjectReconstructor({})
self.assertEqual(0, reconstructor.quarantine_threshold)
reconstructor = object_reconstructor.ObjectReconstructor(
{'quarantine_threshold': '0'})
self.assertEqual(0, reconstructor.quarantine_threshold)
reconstructor = object_reconstructor.ObjectReconstructor(
{'quarantine_threshold': '1'})
self.assertEqual(1, reconstructor.quarantine_threshold)
reconstructor = object_reconstructor.ObjectReconstructor(
{'quarantine_threshold': 2.0})
self.assertEqual(2, reconstructor.quarantine_threshold)
for bad in ('1.1', 1.1, '-1', -1, 'auto', 'bad'):
with annotate_failure(bad):
with self.assertRaises(ValueError):
object_reconstructor.ObjectReconstructor(
{'quarantine_threshold': bad})
def test_request_node_count_conf(self):
# default is 1 * replicas
reconstructor = object_reconstructor.ObjectReconstructor({})
self.assertEqual(6, reconstructor.request_node_count(3))
self.assertEqual(22, reconstructor.request_node_count(11))
def do_test(value, replicas, expected):
reconstructor = object_reconstructor.ObjectReconstructor(
{'request_node_count': value})
self.assertEqual(expected,
reconstructor.request_node_count(replicas))
do_test('0', 10, 0)
do_test('1 * replicas', 3, 3)
do_test('1 * replicas', 11, 11)
do_test('2 * replicas', 3, 6)
do_test('2 * replicas', 11, 22)
do_test('11', 11, 11)
do_test('10', 11, 10)
do_test('12', 11, 12)
for bad in ('1.1', 1.1, 'auto', 'bad',
'2.5 * replicas', 'two * replicas'):
with annotate_failure(bad):
with self.assertRaises(ValueError):
object_reconstructor.ObjectReconstructor(
{'request_node_count': bad})
def _do_test_reconstruct_insufficient_frags(
self, extra_conf, num_frags, other_responses,
local_frag_index=2, frag_index_to_rebuild=1,
resp_timestamps=None, resp_etags=None):
# num_frags is number of ok responses, other_responses is bad responses
# By default frag_index_to_rebuild is less than local_frag_index and
# all frag responses have indexes >= local_frag_index
self.assertGreater(num_frags, 0)
self.logger.clear()
self._configure_reconstructor(**extra_conf)
self._create_fragment(local_frag_index)
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[frag_index_to_rebuild]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data, usedforsecurity=False).hexdigest()
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
frags = ec_archive_bodies[
local_frag_index:local_frag_index + num_frags]
if resp_etags:
self.assertEqual(len(frags), len(resp_etags))
etags = []
for other_etag in resp_etags:
# use default etag where other_etag is None
etags.append(other_etag if other_etag else etag)
else:
etags = [etag] * len(frags)
def make_header(body):
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etags.pop(0)})
return headers
responses = [(200, frag, make_header(frag)) for frag in frags]
codes, body_iter, headers = zip(*(responses + other_responses))
resp_timestamps = (resp_timestamps if resp_timestamps
else [self.obj_timestamp] * len(codes))
resp_timestamps = [ts.internal for ts in resp_timestamps]
with mocked_http_conn(*codes, body_iter=body_iter,
headers=headers,
timestamps=resp_timestamps):
with self.assertRaises(DiskFileError) as cm:
self.reconstructor.reconstruct_fa(
job, node, self._create_fragment(2))
return cm.exception
def _verify_error_lines(self, num_frags, other_responses,
exp_useful_responses):
error_lines = self.logger.get_lines_for_level('error')
self.assertEqual(2, len(error_lines), error_lines)
self.assertIn(
'Unable to get enough responses (%d/%d from %d ok responses)'
% (exp_useful_responses, self.policy.ec_ndata, num_frags),
error_lines[0])
bad_codes = collections.Counter(
status for status, _, _ in other_responses)
errors = ', '.join('%s x %s' % (num, code)
for code, num in sorted(bad_codes.items()))
self.assertIn('Unable to get enough responses (%s error responses)'
% errors, error_lines[1])
def _assert_diskfile_quarantined(self):
warning_lines = self.logger.get_lines_for_level('warning')
self.assertEqual(1, len(warning_lines), warning_lines)
self.assertIn('Quarantined object', warning_lines[0])
# Check the diskfile has moved to quarantine dir
data_filename = os.path.basename(self.df._data_file)
df_hash = os.path.basename(self.df._datadir)
quarantine_dir = os.path.join(
self.df._device_path, 'quarantined',
diskfile.get_data_dir(self.policy), df_hash)
self.assertTrue(os.path.isdir(quarantine_dir))
quarantine_file = os.path.join(quarantine_dir, data_filename)
self.assertTrue(os.path.isfile(quarantine_file))
with open(quarantine_file, 'r') as fd:
self.assertEqual('test data', fd.read())
self.assertFalse(os.path.exists(self.df._data_file))
def _assert_diskfile_not_quarantined(self):
# Check the diskfile has not moved to quarantine dir
quarantine_dir = os.path.join(
self.df._device_path, 'quarantined')
self.assertFalse(os.path.isdir(quarantine_dir))
self.assertTrue(os.path.exists(self.df._data_file))
with open(self.df._data_file, 'r') as fd:
self.assertEqual('test data', fd.read())
def test_reconstruct_fa_quarantine_threshold_one_rnc_two_replicas(self):
# use default request_node_count == 2 * replicas
num_other_resps = 2 * self.policy.object_ring.replicas - 2
other_responses = [(404, None, None)] * num_other_resps
conf = {'quarantine_threshold': 1, 'reclaim_age': 0}
exc = self._do_test_reconstruct_insufficient_frags(
conf, 1, other_responses)
self.assertIsInstance(exc, DiskFileQuarantined)
self._assert_diskfile_quarantined()
self._verify_error_lines(1, other_responses, 1)
def test_reconstruct_fa_quarantine_threshold_one_rnc_three_replicas(self):
num_other_resps = 3 * self.policy.object_ring.replicas - 2
other_responses = [(404, None, None)] * num_other_resps
conf = {'quarantine_threshold': 1, 'reclaim_age': 0,
'request_node_count': '3 * replicas'}
# set ring get_more_nodes to yield enough handoffs
self.policy.object_ring.max_more_nodes = (
2 * self.policy.object_ring.replicas)
exc = self._do_test_reconstruct_insufficient_frags(
conf, 1, other_responses)
self.assertIsInstance(exc, DiskFileQuarantined)
self._assert_diskfile_quarantined()
self._verify_error_lines(1, other_responses, 1)
def test_reconstruct_fa_quarantine_threshold_one_rnc_four_replicas(self):
# verify handoff search exhausting handoff node iter
num_other_resps = 3 * self.policy.object_ring.replicas - 2
other_responses = [(404, None, None)] * num_other_resps
conf = {'quarantine_threshold': 1, 'reclaim_age': 0,
'request_node_count': '4 * replicas'}
# limit ring get_more_nodes to yield less than
# (request_node_count - 1 * replicas) nodes
self.policy.object_ring.max_more_nodes = (
2 * self.policy.object_ring.replicas)
exc = self._do_test_reconstruct_insufficient_frags(
conf, 1, other_responses)
self.assertIsInstance(exc, DiskFileQuarantined)
self._assert_diskfile_quarantined()
self._verify_error_lines(1, other_responses, 1)
def test_reconstruct_fa_quarantine_threshold_one_rnc_absolute_number(self):
def do_test(rnc_num):
if rnc_num < self.policy.object_ring.replicas:
num_other_resps = self.policy.object_ring.replicas - 2
else:
num_other_resps = rnc_num - 2
other_responses = [(404, None, None)] * num_other_resps
conf = {'quarantine_threshold': 1, 'reclaim_age': 0,
'request_node_count': str(rnc_num)}
# set ring get_more_nodes to yield enough handoffs
self.policy.object_ring.max_more_nodes = (
2 * self.policy.object_ring.replicas)
exc = self._do_test_reconstruct_insufficient_frags(
conf, 1, other_responses)
self.assertIsInstance(exc, DiskFileQuarantined)
self._assert_diskfile_quarantined()
self._verify_error_lines(1, other_responses, 1)
for rnc_num in range(0, 3 * self.policy.object_ring.replicas):
do_test(rnc_num)
def test_reconstruct_fa_quarantine_threshold_two(self):
num_other_resps = 2 * self.policy.object_ring.replicas - 3
other_responses = [(404, None, None)] * num_other_resps
conf = {'quarantine_threshold': 2, 'reclaim_age': 0}
exc = self._do_test_reconstruct_insufficient_frags(
conf, 2, other_responses)
self.assertIsInstance(exc, DiskFileQuarantined)
self._assert_diskfile_quarantined()
self._verify_error_lines(2, other_responses, 2)
def test_reconstruct_fa_no_quarantine_more_than_threshold_frags(self):
# default config
num_other_resps = self.policy.object_ring.replicas - 2
other_responses = [(404, None, None)] * num_other_resps
exc = self._do_test_reconstruct_insufficient_frags(
{'reclaim_age': 0}, 1, other_responses)
self.assertIsInstance(exc, DiskFileError)
self._assert_diskfile_not_quarantined()
# configured quarantine_threshold
for quarantine_threshold in range(self.policy.ec_ndata):
for num_frags in range(quarantine_threshold + 1,
self.policy.ec_ndata):
num_other_resps = (self.policy.object_ring.replicas -
num_frags - 1)
other_responses = [(404, None, None)] * num_other_resps
exc = self._do_test_reconstruct_insufficient_frags(
{'quarantine_threshold': quarantine_threshold,
'reclaim_age': 0},
num_frags, other_responses)
self.assertIsInstance(exc, DiskFileError)
self._assert_diskfile_not_quarantined()
self._verify_error_lines(num_frags, other_responses, num_frags)
warning_lines = self.logger.get_lines_for_level('warning')
self.assertEqual([], warning_lines)
# responses include the frag_index_to_rebuild - verify that response is
# counted against the threshold
num_other_resps = self.policy.object_ring.replicas - 3
other_responses = [(404, None, None)] * num_other_resps
exc = self._do_test_reconstruct_insufficient_frags(
{'quarantine_threshold': 1, 'reclaim_age': 0}, 2, other_responses,
local_frag_index=2, frag_index_to_rebuild=3)
self.assertIsInstance(exc, DiskFileError)
self._assert_diskfile_not_quarantined()
self._verify_error_lines(2, other_responses, 1)
def test_reconstruct_fa_no_quarantine_non_404_response(self):
num_frags = 1
ring = self.policy.object_ring
for bad_status in (400, 503, 507):
# a non-404 in primary responses will prevent quarantine
num_other_resps = ring.replicas - num_frags - 1
other_responses = [(404, None, None)] * (num_other_resps - 1)
other_responses.append((bad_status, None, None))
exc = self._do_test_reconstruct_insufficient_frags(
{'quarantine_threshold': 1, 'reclaim_age': 0},
num_frags, other_responses)
self.assertIsInstance(exc, DiskFileError)
self._assert_diskfile_not_quarantined()
self._verify_error_lines(num_frags, other_responses, num_frags)
warning_lines = self.logger.get_lines_for_level('warning')
self.assertEqual(1, len(warning_lines), warning_lines)
self.assertIn('Invalid response %s' % bad_status, warning_lines[0])
# a non-404 in handoff responses will prevent quarantine; non-404
# is the *final* handoff response...
ring.max_more_nodes = (13 * ring.replicas)
for request_node_count in (2, 3, 13):
num_other_resps = (request_node_count * ring.replicas
- num_frags - 1)
other_responses = [(404, None, None)] * (num_other_resps - 1)
other_responses.append((bad_status, None, None))
with annotate_failure(
'request_node_count=%d' % request_node_count):
exc = self._do_test_reconstruct_insufficient_frags(
{'quarantine_threshold': 1,
'reclaim_age': 0,
'request_node_count': '%s * replicas'
% request_node_count},
num_frags, other_responses)
self.assertIsInstance(exc, DiskFileError)
self._assert_diskfile_not_quarantined()
self._verify_error_lines(num_frags, other_responses, num_frags)
warning_lines = self.logger.get_lines_for_level('warning')
self.assertEqual(1, len(warning_lines), warning_lines)
self.assertIn('Invalid response %s' % bad_status,
warning_lines[0])
# a non-404 in handoff responses will prevent quarantine; non-404
# is part way through all handoffs so not all handoffs are used
# regardless of how big request_node_count is
non_404_handoff = 3
for request_node_count in (2, 3, 13):
# replicas - 1 - num_frags other_responses from primaries,
# plus a batch of replicas - 1 during which non-404 shows up,
# plus some that trickle out before the non-404 shows up, but
# limited to (request_node_count * replicas - num_frags - 1)
# e.g. for 10+4 policy with request_node_count > 2
# - batch of 13 requests go to primaries,
# - 12 other_responses are consumed,
# - then a batch of 13 handoff requests is sent,
# - the non-404 is the 4th response in that batch,
# - so 3 more requests will have been trickled out
batch_size = ring.replicas - 1
num_other_resps = min(
2 * batch_size - num_frags + non_404_handoff,
request_node_count * ring.replicas - 1 - num_frags)
other_responses = [(404, None, None)] * (num_other_resps - 1)
other_responses.insert(
batch_size - num_frags + non_404_handoff,
(bad_status, None, None))
exc = self._do_test_reconstruct_insufficient_frags(
{'quarantine_threshold': 1, 'reclaim_age': 0,
'request_node_count': '%s * replicas'
% request_node_count},
num_frags, other_responses)
self.assertIsInstance(exc, DiskFileError)
self._assert_diskfile_not_quarantined()
self._verify_error_lines(num_frags, other_responses, num_frags)
warning_lines = self.logger.get_lines_for_level('warning')
self.assertEqual(1, len(warning_lines), warning_lines)
self.assertIn('Invalid response %s' % bad_status,
warning_lines[0])
def test_reconstruct_fa_no_quarantine_frag_not_old_enough(self):
# verify that solitary fragment is not quarantined if it has not
# reached reclaim_age
num_other_resps = self.policy.object_ring.replicas - 2
other_responses = [(404, None, None)] * num_other_resps
exc = self._do_test_reconstruct_insufficient_frags(
{'quarantine_threshold': 1, 'reclaim_age': 10000},
1, other_responses)
self.assertIsInstance(exc, DiskFileError)
self._assert_diskfile_not_quarantined()
self._verify_error_lines(1, other_responses, 1)
exc = self._do_test_reconstruct_insufficient_frags(
{'quarantine_threshold': 1}, # default reclaim_age
1, other_responses)
self.assertIsInstance(exc, DiskFileError)
self._assert_diskfile_not_quarantined()
self._verify_error_lines(1, other_responses, 1)
def test_reconstruct_fa_no_quarantine_frag_resp_different_timestamp(self):
# verify that solitary fragment is not quarantined if the only frag
# response is for a different timestamp than the local frag
resp_timestamp = utils.Timestamp(float(self.obj_timestamp) + 1)
num_other_resps = self.policy.object_ring.replicas - 2
other_responses = [(404, None, None)] * num_other_resps
resp_timestamps = [resp_timestamp] * (num_other_resps + 1)
exc = self._do_test_reconstruct_insufficient_frags(
{'quarantine_threshold': 1, 'reclaim_age': 0},
1, other_responses, resp_timestamps=resp_timestamps)
self.assertIsInstance(exc, DiskFileError)
self._assert_diskfile_not_quarantined()
self._verify_error_lines(1, other_responses, 1)
def test_reconstruct_fa_no_quarantine_frag_resp_mixed_timestamps(self):
# verify that solitary fragment is not quarantined if there is a
# response for a frag at different timestamp in addition to the
# response for the solitary local frag
resp_timestamp = utils.Timestamp(float(self.obj_timestamp) + 1)
num_other_resps = self.policy.object_ring.replicas - 3
other_responses = [(404, None, None)] * num_other_resps
resp_timestamps = ([self.obj_timestamp] +
[resp_timestamp] * (num_other_resps + 1))
exc = self._do_test_reconstruct_insufficient_frags(
{'quarantine_threshold': 1, 'reclaim_age': 0},
2, other_responses, resp_timestamps=resp_timestamps)
self.assertIsInstance(exc, DiskFileError)
self._assert_diskfile_not_quarantined()
error_lines = self.logger.get_lines_for_level('error')
self.assertEqual(3, len(error_lines), error_lines)
self.assertIn(
'Unable to get enough responses (1/%d from 1 ok responses)'
% (self.policy.ec_ndata,), error_lines[0])
self.assertIn(
'Unable to get enough responses (1/%d from 1 ok responses)'
% (self.policy.ec_ndata,), error_lines[1])
self.assertIn(
'Unable to get enough responses (%d x 404 error responses)'
% num_other_resps, error_lines[2])
def test_reconstruct_fa_no_quarantine_frag_resp_mixed_etags(self):
# verify that solitary fragment is not quarantined if there is a
# response for a frag with different etag in addition to the
# response for the solitary local frag
etags = [None, 'unexpected_etag']
num_other_resps = self.policy.object_ring.replicas - 3
other_responses = [(404, None, None)] * num_other_resps
exc = self._do_test_reconstruct_insufficient_frags(
{'quarantine_threshold': 1, 'reclaim_age': 0},
2, other_responses, resp_etags=etags)
self.assertIsInstance(exc, DiskFileError)
self._assert_diskfile_not_quarantined()
error_lines = self.logger.get_lines_for_level('error')
self.assertEqual(3, len(error_lines), error_lines)
self.assertIn(
'Mixed Etag', error_lines[0])
self.assertIn(
'Unable to get enough responses (1/%d from 2 ok responses)'
% (self.policy.ec_ndata,), error_lines[1])
self.assertIn(
'Unable to get enough responses (%d x 404 error responses)'
% num_other_resps, error_lines[2])
def _do_test_reconstruct_fa_no_quarantine_bad_headers(self, bad_headers):
# verify that responses with invalid headers count against the
# quarantine_threshold
self._configure_reconstructor(reclaim_age=0, quarantine_threshold=1)
local_frag_index = 2
self._create_fragment(local_frag_index)
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[0]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data, usedforsecurity=False).hexdigest()
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
def make_header(body):
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
return headers
responses = []
body = ec_archive_bodies[2]
headers = make_header(body)
responses.append((200, body, headers))
body = ec_archive_bodies[3]
headers = make_header(body)
headers.update(bad_headers)
responses.append((200, body, headers))
other_responses = ([(404, None, None)] *
(self.policy.object_ring.replicas - 3))
codes, body_iter, headers = zip(*(responses + other_responses))
resp_timestamps = [self.obj_timestamp] * len(codes)
resp_timestamps = [ts.internal for ts in resp_timestamps]
with mocked_http_conn(*codes, body_iter=body_iter,
headers=headers,
timestamps=resp_timestamps):
with self.assertRaises(DiskFileError) as cm:
self.reconstructor.reconstruct_fa(
job, node, self._create_fragment(2))
self.assertIsInstance(cm.exception, DiskFileError)
self._assert_diskfile_not_quarantined()
error_lines = self.logger.get_lines_for_level('error')
self.assertEqual(2, len(error_lines), error_lines)
self.assertIn(
'Unable to get enough responses (1/%d from 1 ok responses)'
% (self.policy.ec_ndata,), error_lines[0])
self.assertIn(
'Unable to get enough responses '
'(1 x unknown, %d x 404 error responses)'
% len(other_responses), error_lines[1])
def test_reconstruct_fa_no_quarantine_invalid_frag_index_header(self):
self._do_test_reconstruct_fa_no_quarantine_bad_headers(
{'X-Object-Sysmeta-Ec-Frag-Index': 'two'})
def test_reconstruct_fa_no_quarantine_missing_frag_index_header(self):
self._do_test_reconstruct_fa_no_quarantine_bad_headers(
{'X-Object-Sysmeta-Ec-Frag-Index': ''})
def test_reconstruct_fa_no_quarantine_missing_timestamp_header(self):
self._do_test_reconstruct_fa_no_quarantine_bad_headers(
{'X-Backend-Data-Timestamp': ''})
def test_reconstruct_fa_no_quarantine_missing_etag_header(self):
self._do_test_reconstruct_fa_no_quarantine_bad_headers(
{'X-Object-Sysmeta-Ec-Etag': ''})
def test_reconstruct_fa_frags_on_handoffs(self):
# just a lonely old frag on primaries: this appears to be a quarantine
# candidate, but unexpectedly the other frags are found on handoffs so
# expect rebuild
# set reclaim_age to 0 to make lonely frag old enugh for quarantine
self._configure_reconstructor(quarantine_threshold=1, reclaim_age=0)
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data, usedforsecurity=False).hexdigest()
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1)
# arrange for just one 200 to come from a primary, then 404s, then 200s
# from handoffs
responses = list()
for i, body in enumerate(ec_archive_bodies):
if i == 1:
# skip: this is the frag index we're rebuilding; insert 404s
responses.extend(
((404, None, None),) * self.policy.object_ring.replicas)
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
responses.append((200, body, headers))
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(
*codes, body_iter=body_iter, headers=headers,
timestamps=[self.obj_timestamp.internal] * len(codes)):
df = self.reconstructor.reconstruct_fa(
job, node, self._create_fragment(0, body=b''))
self.assertEqual(0, df.content_length)
fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body, usedforsecurity=False).hexdigest(),
md5(broken_body, usedforsecurity=False).hexdigest())
# no error and warning
self.assertFalse(self.logger.get_lines_for_level('error'))
self.assertFalse(self.logger.get_lines_for_level('warning'))
debug_lines = self.logger.get_lines_for_level('debug')
self.assertIn('Reconstructing frag from handoffs, node_count=%d'
% (self.policy.object_ring.replicas * 2), debug_lines)
def test_reconstruct_fa_finds_duplicate_does_not_fail(self): def test_reconstruct_fa_finds_duplicate_does_not_fail(self):
job = { job = {
'partition': 0, 'partition': 0,
@ -5280,7 +5866,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
codes, body_iter, headers = zip(*responses) codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, self.obj_metadata) job, node, self._create_fragment(2))
fixed_body = b''.join(df.reader()) fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual( self.assertEqual(
@ -5339,7 +5925,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
with mocked_http_conn( with mocked_http_conn(
*codes, body_iter=body_iter, headers=headers) as mock_conn: *codes, body_iter=body_iter, headers=headers) as mock_conn:
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, self.obj_metadata) job, node, self._create_fragment(2))
fixed_body = b''.join(df.reader()) fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual( self.assertEqual(
@ -5408,7 +5994,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
with mocked_http_conn( with mocked_http_conn(
*codes, body_iter=body_iter, headers=headers) as mock_conn: *codes, body_iter=body_iter, headers=headers) as mock_conn:
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, self.obj_metadata) job, node, self._create_fragment(2))
fixed_body = b''.join(df.reader()) fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual( self.assertEqual(
@ -5439,7 +6025,60 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
@patch_policies(with_ec_default=True) @patch_policies(with_ec_default=True)
class TestReconstructFragmentArchiveUTF8(TestReconstructFragmentArchive): class TestReconstructFragmentArchiveUTF8(TestReconstructFragmentArchive):
# repeat superclass tests with an object path that contains non-ascii chars # repeat superclass tests with an object path that contains non-ascii chars
obj_path = b'/a/c/o\xc3\xa8' obj_name = b'o\xc3\xa8'
@patch_policies([ECStoragePolicy(0, name='ec', is_default=True,
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=4,
ec_segment_size=4096,
ec_duplication_factor=2),
StoragePolicy(1, name='other')],
fake_ring_args=[{'replicas': 28}, {'replicas': 3}])
class TestReconstructFragmentArchiveECDuplicationFactor(
TestReconstructFragmentArchive):
def test_reconstruct_fa_no_quarantine_duplicate_frags(self):
# verify that quarantine does not happen if the only other response in
# addition to the lonely frag's own response is for the same
# (duplicate) frag index
self._configure_reconstructor(quarantine_threshold=1, reclaim_age=0)
local_frag_index = 2
self._create_fragment(local_frag_index)
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[0]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data, usedforsecurity=False).hexdigest()
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
frags = [
ec_archive_bodies[local_frag_index],
ec_archive_bodies[local_frag_index +
self.policy.ec_n_unique_fragments]]
def make_header(body):
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
return headers
responses = [(200, frag, make_header(frag)) for frag in frags]
other_responses = ([(404, None, None)] *
(self.policy.ec_n_unique_fragments * 2 - 3))
codes, body_iter, headers = zip(*(responses + other_responses))
resp_timestamps = [self.obj_timestamp.internal] * len(codes)
with mocked_http_conn(*codes, body_iter=body_iter,
headers=headers,
timestamps=resp_timestamps):
with self.assertRaises(DiskFileError) as cm:
self.reconstructor.reconstruct_fa(
job, node, self._create_fragment(2))
self.assertIsInstance(cm.exception, DiskFileError)
self._assert_diskfile_not_quarantined()
self._verify_error_lines(2, other_responses, 1)
@patch_policies([ECStoragePolicy(0, name='ec', is_default=True, @patch_policies([ECStoragePolicy(0, name='ec', is_default=True,
@ -5455,6 +6094,13 @@ class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor):
self.fabricated_ring = FabricatedRing(replicas=28, devices=56) self.fabricated_ring = FabricatedRing(replicas=28, devices=56)
def _test_reconstruct_with_duplicate_frags_no_errors(self, index): def _test_reconstruct_with_duplicate_frags_no_errors(self, index):
utils.mkdirs(os.path.join(self.devices, 'sda1'))
df_mgr = self.reconstructor._df_router[self.policy]
df = df_mgr.get_diskfile('sda1', 9, 'a', 'c', 'o',
policy=self.policy)
write_diskfile(df, self.ts(), data=b'', frag_index=2)
df.open()
job = { job = {
'partition': 0, 'partition': 0,
'policy': self.policy, 'policy': self.policy,
@ -5462,12 +6108,6 @@ class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor):
part_nodes = self.policy.object_ring.get_part_nodes(0) part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[index] node = part_nodes[index]
node['backend_index'] = self.policy.get_backend_index(node['index']) node['backend_index'] = self.policy.get_backend_index(node['index'])
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345',
}
test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777] test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data, usedforsecurity=False).hexdigest() etag = md5(test_data, usedforsecurity=False).hexdigest()
@ -5486,9 +6126,9 @@ class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor):
called_headers = [] called_headers = []
orig_func = object_reconstructor.ObjectReconstructor._get_response orig_func = object_reconstructor.ObjectReconstructor._get_response
def _get_response_hook(self, node, part, path, headers, policy): def _get_response_hook(self, node, policy, part, path, headers):
called_headers.append(headers) called_headers.append(headers)
return orig_func(self, node, part, path, headers, policy) return orig_func(self, node, policy, part, path, headers)
# need parity + 1 node failures to reach duplicated fragments # need parity + 1 node failures to reach duplicated fragments
failed_start_at = ( failed_start_at = (
@ -5505,7 +6145,7 @@ class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor):
with mocked_http_conn( with mocked_http_conn(
*codes, body_iter=body_iter, headers=headers): *codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, metadata) job, node, df)
fixed_body = b''.join(df.reader()) fixed_body = b''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual( self.assertEqual(

View File

@ -16,11 +16,9 @@ from collections import defaultdict
import mock import mock
import os import os
import time
import unittest import unittest
import eventlet import eventlet
import itertools
from six.moves import urllib from six.moves import urllib
from swift.common.exceptions import DiskFileNotExist, DiskFileError, \ from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
@ -28,8 +26,7 @@ from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
from swift.common import swob from swift.common import swob
from swift.common import utils from swift.common import utils
from swift.common.storage_policy import POLICIES, EC_POLICY from swift.common.storage_policy import POLICIES, EC_POLICY
from swift.common.utils import Timestamp from swift.obj import ssync_sender, server, diskfile
from swift.obj import ssync_sender, server
from swift.obj.reconstructor import RebuildingECDiskFileStream, \ from swift.obj.reconstructor import RebuildingECDiskFileStream, \
ObjectReconstructor ObjectReconstructor
from swift.obj.replicator import ObjectReplicator from swift.obj.replicator import ObjectReplicator
@ -38,7 +35,7 @@ from test import listen_zero
from test.debug_logger import debug_logger from test.debug_logger import debug_logger
from test.unit.obj.common import BaseTest from test.unit.obj.common import BaseTest
from test.unit import patch_policies, encode_frag_archive_bodies, \ from test.unit import patch_policies, encode_frag_archive_bodies, \
skip_if_no_xattrs, quiet_eventlet_exceptions skip_if_no_xattrs, quiet_eventlet_exceptions, make_timestamp_iter
class TestBaseSsync(BaseTest): class TestBaseSsync(BaseTest):
@ -62,8 +59,7 @@ class TestBaseSsync(BaseTest):
'log_requests': 'false'} 'log_requests': 'false'}
self.rx_logger = debug_logger() self.rx_logger = debug_logger()
self.rx_controller = server.ObjectController(conf, self.rx_logger) self.rx_controller = server.ObjectController(conf, self.rx_logger)
self.ts_iter = (Timestamp(t) self.ts_iter = make_timestamp_iter()
for t in itertools.count(int(time.time())))
self.rx_ip = '127.0.0.1' self.rx_ip = '127.0.0.1'
sock = listen_zero() sock = listen_zero()
self.rx_server = eventlet.spawn( self.rx_server = eventlet.spawn(
@ -653,11 +649,12 @@ class TestSsyncEC(TestBaseSsyncEC):
reconstruct_fa_calls = [] reconstruct_fa_calls = []
def fake_reconstruct_fa(job, node, metadata): def fake_reconstruct_fa(job, node, df):
reconstruct_fa_calls.append((job, node, policy, metadata)) reconstruct_fa_calls.append((job, node, policy, df))
if len(reconstruct_fa_calls) == 2: if len(reconstruct_fa_calls) == 2:
# simulate second reconstruct failing # simulate second reconstruct failing
raise DiskFileError raise DiskFileError
metadata = df.get_datafile_metadata()
content = self._get_object_data(metadata['name'], content = self._get_object_data(metadata['name'],
frag_index=rx_node_index) frag_index=rx_node_index)
return RebuildingECDiskFileStream( return RebuildingECDiskFileStream(
@ -702,7 +699,8 @@ class TestSsyncEC(TestBaseSsyncEC):
# remove the failed df from expected synced df's # remove the failed df from expected synced df's
expect_sync_paths = ['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5'] expect_sync_paths = ['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5']
failed_path = reconstruct_fa_calls[1][3]['name'] failed_df = reconstruct_fa_calls[1][3]
failed_path = failed_df.get_datafile_metadata()['name']
expect_sync_paths.remove(failed_path) expect_sync_paths.remove(failed_path)
failed_obj = None failed_obj = None
for obj, diskfiles in tx_objs.items(): for obj, diskfiles in tx_objs.items():
@ -843,26 +841,26 @@ class TestSsyncEC(TestBaseSsyncEC):
class FakeResponse(object): class FakeResponse(object):
def __init__(self, frag_index, obj_data, length=None): def __init__(self, frag_index, obj_data, length=None, status=200):
self.headers = {
'X-Object-Sysmeta-Ec-Frag-Index': str(frag_index),
'X-Object-Sysmeta-Ec-Etag': 'the etag',
'X-Backend-Timestamp': '1234567890.12345'
}
self.frag_index = frag_index self.frag_index = frag_index
self.obj_data = obj_data self.obj_data = obj_data
self.data = b'' self.data = b''
self.length = length self.length = length
self.status = 200 self.status = status
def init(self, path): def init(self, path, conf):
if isinstance(self.obj_data, Exception): if isinstance(self.obj_data, Exception):
self.data = self.obj_data self.data = self.obj_data
else: else:
self.data = self.obj_data[path][self.frag_index] self.data = self.obj_data[path][self.frag_index]
self.conf = conf
def getheaders(self): def getheaders(self):
return self.headers return {
'X-Object-Sysmeta-Ec-Frag-Index': str(self.frag_index),
'X-Object-Sysmeta-Ec-Etag': 'the etag',
'X-Backend-Timestamp': self.conf['timestamp'].internal
}
def read(self, length): def read(self, length):
if isinstance(self.data, Exception): if isinstance(self.data, Exception):
@ -878,7 +876,9 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
self.rx_node_index = 0 self.rx_node_index = 0
self.tx_node_index = 1 self.tx_node_index = 1
# create sender side diskfiles... # create sender side diskfiles...ensure their timestamps are in the
# past so that tests that set reclaim_age=0 succeed in reclaiming
self.ts_iter = make_timestamp_iter(offset=-1000)
self.tx_objs = {} self.tx_objs = {}
tx_df_mgr = self.daemon._df_router[self.policy] tx_df_mgr = self.daemon._df_router[self.policy]
t1 = next(self.ts_iter) t1 = next(self.ts_iter)
@ -887,6 +887,8 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
t2 = next(self.ts_iter) t2 = next(self.ts_iter)
self.tx_objs['o2'] = self._create_ondisk_files( self.tx_objs['o2'] = self._create_ondisk_files(
tx_df_mgr, 'o2', self.policy, t2, (self.tx_node_index,)) tx_df_mgr, 'o2', self.policy, t2, (self.tx_node_index,))
self.response_confs = {'/a/c/o1': {'timestamp': t1},
'/a/c/o2': {'timestamp': t2}}
self.suffixes = set() self.suffixes = set()
for diskfiles in list(self.tx_objs.values()): for diskfiles in list(self.tx_objs.values()):
@ -900,7 +902,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
self.frag_length = int( self.frag_length = int(
self.tx_objs['o1'][0].get_metadata()['Content-Length']) self.tx_objs['o1'][0].get_metadata()['Content-Length'])
def _test_reconstructor_sync_job(self, frag_responses): def _test_reconstructor_sync_job(self, frag_responses, custom_conf=None):
# Helper method to mock reconstructor to consume given lists of fake # Helper method to mock reconstructor to consume given lists of fake
# responses while reconstructing a fragment for a sync type job. The # responses while reconstructing a fragment for a sync type job. The
# tests verify that when the reconstructed fragment iter fails in some # tests verify that when the reconstructed fragment iter fails in some
@ -908,25 +910,31 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
# node which have incorrect data. # node which have incorrect data.
# See https://bugs.launchpad.net/swift/+bug/1631144 # See https://bugs.launchpad.net/swift/+bug/1631144
custom_conf = custom_conf if custom_conf else {}
# frag_responses is a list of two lists of responses to each # frag_responses is a list of two lists of responses to each
# reconstructor GET request for a fragment archive. The two items in # reconstructor GET request for a fragment archive. The two items in
# the outer list are lists of responses for each of the two fragments # the outer list are lists of responses for each of the two fragments
# to be reconstructed. Items in the inner lists are responses for each # to be reconstructed, and are used in the order that ssync syncs the
# of the other fragments fetched during the reconstructor rebuild. # fragments. Items in the inner lists are responses for each of the
# other fragments fetched during the reconstructor rebuild.
path_to_responses = {} path_to_responses = {}
fake_get_response_calls = [] fake_get_response_calls = []
def fake_get_response(recon, node, part, path, headers, policy): def fake_get_response(recon, node, policy, part, path, headers):
# select a list of fake responses for this path and return the next # select a list of fake responses for this path and return the next
# from the list # from the list: we don't know the order in which paths will show
# up but we do want frag_responses[0] to be used first, so the
# frag_responses aren't bound to a path until this point
if path not in path_to_responses: if path not in path_to_responses:
path_to_responses[path] = frag_responses.pop(0) path_to_responses[path] = frag_responses.pop(0)
response = path_to_responses[path].pop() response = path_to_responses[path].pop()
# the frag_responses list is in ssync task order, we only know the # the frag_responses list is in ssync task order: we only know the
# path when consuming the responses so initialise the path in the # path when consuming the responses so initialise the path in the
# response now # response now
if response: if response:
response.init(path) response.init(path, self.response_confs[path])
# should be full path but just used for logging...
response.full_path = path
fake_get_response_calls.append(path) fake_get_response_calls.append(path)
return response return response
@ -944,17 +952,19 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
mock.patch.object( mock.patch.object(
self.policy.object_ring, 'get_part_nodes', self.policy.object_ring, 'get_part_nodes',
fake_get_part_nodes): fake_get_part_nodes):
self.reconstructor = ObjectReconstructor( conf = self.daemon_conf
{}, logger=self.logger) conf.update(custom_conf)
self.reconstructor = ObjectReconstructor(conf, logger=self.logger)
job = { job = {
'device': self.device, 'device': self.device,
'partition': self.partition, 'partition': self.partition,
'policy': self.policy, 'policy': self.policy,
'frag_index': self.tx_node_index,
'sync_diskfile_builder': 'sync_diskfile_builder':
self.reconstructor.reconstruct_fa self.reconstructor.reconstruct_fa
} }
sender = ssync_sender.Sender( sender = ssync_sender.Sender(
self.daemon, self.job_node, job, self.suffixes) self.reconstructor, self.job_node, job, self.suffixes)
sender.connect, trace = self.make_connect_wrapper(sender) sender.connect, trace = self.make_connect_wrapper(sender)
sender() sender()
return trace return trace
@ -975,7 +985,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
df = self._open_rx_diskfile( df = self._open_rx_diskfile(
obj_name, self.policy, self.rx_node_index) obj_name, self.policy, self.rx_node_index)
msgs.append('Unexpected rx diskfile for %r with content %r' % msgs.append('Unexpected rx diskfile for %r with content %r' %
(obj_name, ''.join([d for d in df.reader()]))) (obj_name, b''.join([d for d in df.reader()])))
except DiskFileNotExist: except DiskFileNotExist:
pass # expected outcome pass # expected outcome
if msgs: if msgs:
@ -987,6 +997,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
# trampoline for the receiver to write a log # trampoline for the receiver to write a log
eventlet.sleep(0) eventlet.sleep(0)
log_lines = self.rx_logger.get_lines_for_level('warning') log_lines = self.rx_logger.get_lines_for_level('warning')
self.assertEqual(1, len(log_lines), self.rx_logger.all_log_lines())
self.assertIn('ssync subrequest failed with 499', self.assertIn('ssync subrequest failed with 499',
log_lines[0]) log_lines[0])
self.assertFalse(log_lines[1:]) self.assertFalse(log_lines[1:])
@ -1009,7 +1020,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
df = self._open_rx_diskfile( df = self._open_rx_diskfile(
obj_name, self.policy, self.rx_node_index) obj_name, self.policy, self.rx_node_index)
msgs.append('Unexpected rx diskfile for %r with content %r' % msgs.append('Unexpected rx diskfile for %r with content %r' %
(obj_name, ''.join([d for d in df.reader()]))) (obj_name, b''.join([d for d in df.reader()])))
except DiskFileNotExist: except DiskFileNotExist:
pass # expected outcome pass # expected outcome
if msgs: if msgs:
@ -1053,7 +1064,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
df = self._open_rx_diskfile( df = self._open_rx_diskfile(
obj_name, self.policy, self.rx_node_index) obj_name, self.policy, self.rx_node_index)
msgs.append('Unexpected rx diskfile for %r with content %r' % msgs.append('Unexpected rx diskfile for %r with content %r' %
(obj_name, ''.join([d for d in df.reader()]))) (obj_name, b''.join([d for d in df.reader()])))
except DiskFileNotExist: except DiskFileNotExist:
pass # expected outcome pass # expected outcome
if msgs: if msgs:
@ -1111,7 +1122,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
df = self._open_rx_diskfile( df = self._open_rx_diskfile(
obj_name, self.policy, self.rx_node_index) obj_name, self.policy, self.rx_node_index)
msgs.append('Unexpected rx diskfile for %r with content %r' % msgs.append('Unexpected rx diskfile for %r with content %r' %
(obj_name, ''.join([d for d in df.reader()]))) (obj_name, b''.join([d for d in df.reader()])))
except DiskFileNotExist: except DiskFileNotExist:
pass # expected outcome pass # expected outcome
if msgs: if msgs:
@ -1123,6 +1134,109 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
self.assertFalse(self.rx_logger.get_lines_for_level('warning')) self.assertFalse(self.rx_logger.get_lines_for_level('warning'))
self.assertFalse(self.rx_logger.get_lines_for_level('error')) self.assertFalse(self.rx_logger.get_lines_for_level('error'))
def test_sync_reconstructor_quarantines_lonely_frag(self):
# First fragment to sync gets only one response for reconstructor to
# rebuild with, and that response is for the tx_node frag index: it
# should be quarantined, but after that the ssync session should still
# proceeed with rebuilding the second frag.
lonely_frag_responses = [
FakeResponse(i, self.obj_data, status=404)
for i in range(self.policy.ec_ndata + self.policy.ec_nparity)]
lonely_frag_responses[self.tx_node_index].status = 200
frag_responses = [
lonely_frag_responses,
[FakeResponse(i, self.obj_data)
for i in range(self.policy.ec_ndata + self.policy.ec_nparity)]]
# configure reconstructor to quarantine the lonely frag
custom_conf = {'reclaim_age': 0, 'quarantine_threshold': 1}
trace = self._test_reconstructor_sync_job(frag_responses, custom_conf)
results = self._analyze_trace(trace)
self.assertEqual(2, len(results['tx_missing']))
self.assertEqual(2, len(results['rx_missing']))
self.assertEqual(1, len(results['tx_updates']))
self.assertFalse(results['rx_updates'])
self.assertEqual('PUT', results['tx_updates'][0].get('method'))
synced_obj_path = results['tx_updates'][0].get('path')
synced_obj_name = synced_obj_path[-2:]
# verify that the second frag was rebuilt on rx node...
msgs = []
try:
df = self._open_rx_diskfile(
synced_obj_name, self.policy, self.rx_node_index)
self.assertEqual(
self._get_object_data(synced_obj_path,
frag_index=self.rx_node_index),
b''.join([d for d in df.reader()]))
except DiskFileNotExist:
msgs.append('Missing rx diskfile for %r' % synced_obj_name)
# ...and it is still on tx node...
try:
df = self._open_tx_diskfile(
synced_obj_name, self.policy, self.tx_node_index)
self.assertEqual(
self._get_object_data(df._name,
frag_index=self.tx_node_index),
b''.join([d for d in df.reader()]))
except DiskFileNotExist:
msgs.append('Missing tx diskfile for %r' % synced_obj_name)
# verify that the lonely frag was not rebuilt on rx node and was
# removed on tx node
obj_names = list(self.tx_objs)
obj_names.remove(synced_obj_name)
quarantined_obj_name = obj_names[0]
try:
df = self._open_rx_diskfile(
quarantined_obj_name, self.policy, self.rx_node_index)
msgs.append(
'Unexpected rx diskfile for %r with content %r' %
(quarantined_obj_name, b''.join([d for d in df.reader()])))
except DiskFileNotExist:
pass # expected outcome
try:
df = self._open_tx_diskfile(
quarantined_obj_name, self.policy, self.tx_node_index)
msgs.append(
'Unexpected tx diskfile for %r with content %r' %
(quarantined_obj_name, b''.join([d for d in df.reader()])))
except DiskFileNotExist:
pass # expected outcome
if msgs:
self.fail('Failed with:\n%s' % '\n'.join(msgs))
error_lines = self.logger.get_lines_for_level('error')
self.assertEqual(2, len(error_lines), error_lines)
self.assertIn('Unable to get enough responses', error_lines[0])
self.assertIn('Unable to get enough responses', error_lines[1])
warning_lines = self.logger.get_lines_for_level('warning')
self.assertEqual(1, len(warning_lines), warning_lines)
self.assertIn('Quarantined object', warning_lines[0])
# check we have a quarantined data file
df_mgr = self.daemon._df_router[self.policy]
quarantined_df = df_mgr.get_diskfile(
self.device, self.partition, account='a', container='c',
obj=quarantined_obj_name, policy=self.policy,
frag_index=self.tx_node_index)
df_hash = os.path.basename(quarantined_df._datadir)
quarantine_dir = os.path.join(
quarantined_df._device_path, 'quarantined',
diskfile.get_data_dir(self.policy), df_hash)
self.assertTrue(os.path.isdir(quarantine_dir))
data_file = os.listdir(quarantine_dir)[0]
with open(os.path.join(quarantine_dir, data_file), 'rb') as fd:
self.assertEqual(
self._get_object_data(quarantined_df._name,
frag_index=self.tx_node_index),
fd.read())
# trampoline for the receiver to write a log
eventlet.sleep(0)
self.assertFalse(self.rx_logger.get_lines_for_level('warning'))
self.assertFalse(self.rx_logger.get_lines_for_level('error'))
def test_sync_reconstructor_rebuild_ok(self): def test_sync_reconstructor_rebuild_ok(self):
# Sanity test for this class of tests. Both fragments get a full # Sanity test for this class of tests. Both fragments get a full
# complement of responses and rebuild correctly. # complement of responses and rebuild correctly.

View File

@ -4699,10 +4699,30 @@ class TestReplicatedObjectController(
self.assertEqual(resp.status_int, 503) self.assertEqual(resp.status_int, 503)
def test_node_request_setting(self): def test_node_request_setting(self):
baseapp = proxy_server.Application({'request_node_count': '3'}, # default is 2 * replicas
baseapp = proxy_server.Application({},
container_ring=FakeRing(),
account_ring=FakeRing())
self.assertEqual(6, baseapp.request_node_count(3))
def do_test(value, replicas, expected):
baseapp = proxy_server.Application({'request_node_count': value},
container_ring=FakeRing(),
account_ring=FakeRing())
self.assertEqual(expected, baseapp.request_node_count(replicas))
do_test('3', 4, 3)
do_test('1 * replicas', 4, 4)
do_test('2 * replicas', 4, 8)
do_test('4', 4, 4)
do_test('5', 4, 5)
for bad in ('1.1', 1.1, 'auto', 'bad',
'2.5 * replicas', 'two * replicas'):
with self.assertRaises(ValueError):
proxy_server.Application({'request_node_count': bad},
container_ring=FakeRing(), container_ring=FakeRing(),
account_ring=FakeRing()) account_ring=FakeRing())
self.assertEqual(baseapp.request_node_count(3), 3)
def test_iter_nodes(self): def test_iter_nodes(self):
with save_globals(): with save_globals():