Merge "EC Fragment Duplication - Foundational Global EC Cluster Support"

This commit is contained in:
Jenkins 2017-02-26 06:26:08 +00:00 committed by Gerrit Code Review
commit 1f36b5dd16
13 changed files with 2994 additions and 1235 deletions

View File

@ -205,6 +205,11 @@ an EC policy can be setup is shown below::
ec_num_parity_fragments = 4 ec_num_parity_fragments = 4
ec_object_segment_size = 1048576 ec_object_segment_size = 1048576
# Duplicated EC fragments is proof-of-concept experimental support to enable
# Global Erasure Coding policies with multiple regions acting as independent
# failure domains. Do not change the default except in development/testing.
ec_duplication_factor = 1
Let's take a closer look at each configuration parameter: Let's take a closer look at each configuration parameter:
* ``name``: This is a standard storage policy parameter. * ``name``: This is a standard storage policy parameter.
@ -223,6 +228,11 @@ Let's take a closer look at each configuration parameter:
comprised of parity. comprised of parity.
* ``ec_object_segment_size``: The amount of data that will be buffered up before * ``ec_object_segment_size``: The amount of data that will be buffered up before
feeding a segment into the encoder/decoder. The default value is 1048576. feeding a segment into the encoder/decoder. The default value is 1048576.
* ``ec_duplication_factor``: The number of duplicate copies for each fragment.
This is now experimental support to enable Global Erasure Coding policies with
multiple regions. Do not change the default except in development/testing. And
please read the "EC Duplication" section below before changing the default
value.
When PyECLib encodes an object, it will break it into N fragments. However, what When PyECLib encodes an object, it will break it into N fragments. However, what
is important during configuration, is how many of those are data and how many is important during configuration, is how many of those are data and how many
@ -273,6 +283,94 @@ For at least the initial version of EC, it is not recommended that an EC scheme
span beyond a single region, neither performance nor functional validation has span beyond a single region, neither performance nor functional validation has
be been done in such a configuration. be been done in such a configuration.
EC Duplication
^^^^^^^^^^^^^^
ec_duplication_factor is an option to make duplicate copies of fragments
of erasure encoded Swift objects. The default value is 1 (not duplicate).
If an erasure code storage policy is configured with a non-default
ec_duplication_factor of N > 1, then the policy will create N duplicates of
each unique fragment that is returned from the configured EC engine.
Duplication of EC fragments is optimal for EC storage policies which require
dispersion of fragment data across failure domains. Without duplication, almost
of common ec parameters like 10-4 cause less assignments than 1/(the number
of failure domains) of the total unique fragments. And usually, it will be less
than the number of data fragments which are required to construct the original
data. To guarantee the number of fragments in a failure domain, the system
requires more parities. On the situation which needs more parity, empirical
testing has shown using duplication is more efficient in the PUT path than
encoding a schema with num_parity > num_data, and Swift EC supports this schema.
You should evaluate which strategy works best in your environment.
e.g. 10-4 and duplication factor of 2 will store 28 fragments (i.e.
(``ec_num_data_fragments`` + ``ec_num_parity_fragments``) *
``ec_duplication_factor``). This \*can\* allow for a failure domain to rebuild
an object to full durability even when \*more\* than 14 fragments are
unavailable.
.. note::
Current EC Duplication is a part work of EC region support so we still
have some known issues to get complete region supports:
Known-Issues:
- Unique fragment dispersion
Currently, Swift \*doesn't\* guarantee the dispersion of unique
fragments' locations in the global distributed cluster being robust
in the disaster recovery case. While the goal is to have duplicates
of each unique fragment placed in each region, it is currently
possible for duplicates of the same unique fragment to be placed in
the same region. Since a set of ``ec_num_data_fragments`` unique
fragments is required to reconstruct an object, the suboptimal
distribution of duplicates across regions may, in some cases, make it
impossible to assemble such a set from a single region.
For example, if we have a Swift cluster with 2 regions, the fragments may
be located like as:
::
r1
#0#d.data
#0#d.data
#2#d.data
#2#d.data
#4#d.data
#4#d.data
r2
#1#d.data
#1#d.data
#3#d.data
#3#d.data
#5#d.data
#5#d.data
In this case, r1 has only the fragments with index 0, 2, 4 and r2 has
the rest of indexes but we need 4 unique indexes to decode. To resolve
the case, the composite ring which enables the operator oriented location
mapping [1] is under development.
1: https://review.openstack.org/#/c/271920/
- Efficient node iteration for read
Since EC fragment duplication requires a set of unique fragment indexes
to decode the original object, it needs efficient node iteration rather
than current. Current Swift is iterating the nodes ordered by sorting
method defined in proxy server config. (i.e. either shuffle, node_timing,
or read_affinity) However, the sorted result could include duplicate
indexes for the first primaries to try to connect even if \*we\* know
it obviously needs more nodes to get unique fragments. Hence, current
Swift may call more backend requests than ec_ndata times frequently even
if no node failures in the object-servers.
The possible solution could be some refactoring work on NodeIter to
provide suitable nodes even if it's fragment duplication but it's still
under development yet.
-------------- --------------
Under the Hood Under the Hood
-------------- --------------

View File

@ -84,7 +84,11 @@ aliases = yellow, orange
#ec_num_data_fragments = 10 #ec_num_data_fragments = 10
#ec_num_parity_fragments = 4 #ec_num_parity_fragments = 4
#ec_object_segment_size = 1048576 #ec_object_segment_size = 1048576
#
# Duplicated EC fragments is proof-of-concept experimental support to enable
# Global Erasure Coding policies with multiple regions acting as independent
# failure domains. Do not change the default except in development/testing.
#ec_duplication_factor = 1
# The swift-constraints section sets the basic constraints on data # The swift-constraints section sets the basic constraints on data
# saved in the swift cluster. These constraints are automatically # saved in the swift cluster. These constraints are automatically

View File

@ -20,7 +20,8 @@ import textwrap
import six import six
from six.moves.configparser import ConfigParser from six.moves.configparser import ConfigParser
from swift.common.utils import ( from swift.common.utils import (
config_true_value, SWIFT_CONF_FILE, whataremyips, list_from_csv) config_true_value, SWIFT_CONF_FILE, whataremyips, list_from_csv,
config_positive_int_value)
from swift.common.ring import Ring, RingData from swift.common.ring import Ring, RingData
from swift.common.utils import quorum_size from swift.common.utils import quorum_size
from swift.common.exceptions import RingLoadError from swift.common.exceptions import RingLoadError
@ -406,7 +407,8 @@ class ECStoragePolicy(BaseStoragePolicy):
def __init__(self, idx, name='', aliases='', is_default=False, def __init__(self, idx, name='', aliases='', is_default=False,
is_deprecated=False, object_ring=None, is_deprecated=False, object_ring=None,
ec_segment_size=DEFAULT_EC_OBJECT_SEGMENT_SIZE, ec_segment_size=DEFAULT_EC_OBJECT_SEGMENT_SIZE,
ec_type=None, ec_ndata=None, ec_nparity=None): ec_type=None, ec_ndata=None, ec_nparity=None,
ec_duplication_factor=1):
super(ECStoragePolicy, self).__init__( super(ECStoragePolicy, self).__init__(
idx=idx, name=name, aliases=aliases, is_default=is_default, idx=idx, name=name, aliases=aliases, is_default=is_default,
@ -489,6 +491,9 @@ class ECStoragePolicy(BaseStoragePolicy):
self._ec_ndata + self.pyeclib_driver.min_parity_fragments_needed() self._ec_ndata + self.pyeclib_driver.min_parity_fragments_needed()
self._fragment_size = None self._fragment_size = None
self._ec_duplication_factor = \
config_positive_int_value(ec_duplication_factor)
@property @property
def ec_type(self): def ec_type(self):
return self._ec_type return self._ec_type
@ -501,6 +506,10 @@ class ECStoragePolicy(BaseStoragePolicy):
def ec_nparity(self): def ec_nparity(self):
return self._ec_nparity return self._ec_nparity
@property
def ec_n_unique_fragments(self):
return self._ec_ndata + self._ec_nparity
@property @property
def ec_segment_size(self): def ec_segment_size(self):
return self._ec_segment_size return self._ec_segment_size
@ -538,11 +547,20 @@ class ECStoragePolicy(BaseStoragePolicy):
""" """
return "%s %d+%d" % (self._ec_type, self._ec_ndata, self._ec_nparity) return "%s %d+%d" % (self._ec_type, self._ec_ndata, self._ec_nparity)
@property
def ec_duplication_factor(self):
return self._ec_duplication_factor
def __repr__(self): def __repr__(self):
extra_info = ''
if self.ec_duplication_factor != 1:
extra_info = ', ec_duplication_factor=%d' % \
self.ec_duplication_factor
return ("%s, EC config(ec_type=%s, ec_segment_size=%d, " return ("%s, EC config(ec_type=%s, ec_segment_size=%d, "
"ec_ndata=%d, ec_nparity=%d)") % \ "ec_ndata=%d, ec_nparity=%d%s)") % \
(super(ECStoragePolicy, self).__repr__(), self.ec_type, (super(ECStoragePolicy, self).__repr__(), self.ec_type,
self.ec_segment_size, self.ec_ndata, self.ec_nparity) self.ec_segment_size, self.ec_ndata, self.ec_nparity,
extra_info)
@classmethod @classmethod
def _config_options_map(cls): def _config_options_map(cls):
@ -552,6 +570,7 @@ class ECStoragePolicy(BaseStoragePolicy):
'ec_object_segment_size': 'ec_segment_size', 'ec_object_segment_size': 'ec_segment_size',
'ec_num_data_fragments': 'ec_ndata', 'ec_num_data_fragments': 'ec_ndata',
'ec_num_parity_fragments': 'ec_nparity', 'ec_num_parity_fragments': 'ec_nparity',
'ec_duplication_factor': 'ec_duplication_factor',
}) })
return options return options
@ -562,13 +581,14 @@ class ECStoragePolicy(BaseStoragePolicy):
info.pop('ec_num_data_fragments') info.pop('ec_num_data_fragments')
info.pop('ec_num_parity_fragments') info.pop('ec_num_parity_fragments')
info.pop('ec_type') info.pop('ec_type')
info.pop('ec_duplication_factor')
return info return info
@property @property
def quorum(self): def quorum(self):
""" """
Number of successful backend requests needed for the proxy to consider Number of successful backend requests needed for the proxy to consider
the client request successful. the client PUT request successful.
The quorum size for EC policies defines the minimum number The quorum size for EC policies defines the minimum number
of data + parity elements required to be able to guarantee of data + parity elements required to be able to guarantee
@ -584,7 +604,7 @@ class ECStoragePolicy(BaseStoragePolicy):
for every erasure coding scheme, consult PyECLib for for every erasure coding scheme, consult PyECLib for
min_parity_fragments_needed() min_parity_fragments_needed()
""" """
return self._ec_quorum_size return self._ec_quorum_size * self.ec_duplication_factor
def load_ring(self, swift_dir): def load_ring(self, swift_dir):
""" """
@ -605,18 +625,35 @@ class ECStoragePolicy(BaseStoragePolicy):
considering the number of nodes in the primary list from the ring. considering the number of nodes in the primary list from the ring.
""" """
nodes_configured = len(ring_data._replica2part2dev_id) configured_fragment_count = len(ring_data._replica2part2dev_id)
if nodes_configured != (self.ec_ndata + self.ec_nparity): required_fragment_count = \
(self.ec_n_unique_fragments) * self.ec_duplication_factor
if configured_fragment_count != required_fragment_count:
raise RingLoadError( raise RingLoadError(
'EC ring for policy %s needs to be configured with ' 'EC ring for policy %s needs to be configured with '
'exactly %d replicas. Got %d.' % ( 'exactly %d replicas. Got %d.' % (
self.name, self.ec_ndata + self.ec_nparity, self.name, required_fragment_count,
nodes_configured)) configured_fragment_count))
self.object_ring = Ring( self.object_ring = Ring(
swift_dir, ring_name=self.ring_name, swift_dir, ring_name=self.ring_name,
validation_hook=validate_ring_data) validation_hook=validate_ring_data)
def get_backend_index(self, node_index):
"""
Backend index for PyECLib
:param node_index: integer of node index
:return: integer of actual fragment index. if param is not an integer,
return None instead
"""
try:
node_index = int(node_index)
except ValueError:
return None
return node_index % self.ec_n_unique_fragments
class StoragePolicyCollection(object): class StoragePolicyCollection(object):
""" """

View File

@ -345,6 +345,21 @@ def config_true_value(value):
(isinstance(value, six.string_types) and value.lower() in TRUE_VALUES) (isinstance(value, six.string_types) and value.lower() in TRUE_VALUES)
def config_positive_int_value(value):
"""
Returns positive int value if it can be cast by int() and it's an
integer > 0. (not including zero) Raises ValueError otherwise.
"""
try:
value = int(value)
if value < 1:
raise ValueError()
except (TypeError, ValueError):
raise ValueError(
'Config option must be an positive int number, not "%s".' % value)
return value
def config_auto_int_value(value, default): def config_auto_int_value(value, default):
""" """
Returns default if value is None or 'auto'. Returns default if value is None or 'auto'.

View File

@ -193,6 +193,8 @@ class ObjectReconstructor(Daemon):
return True return True
def _full_path(self, node, part, path, policy): def _full_path(self, node, part, path, policy):
frag_index = (policy.get_backend_index(node['index'])
if 'index' in node else 'handoff')
return '%(replication_ip)s:%(replication_port)s' \ return '%(replication_ip)s:%(replication_port)s' \
'/%(device)s/%(part)s%(path)s ' \ '/%(device)s/%(part)s%(path)s ' \
'policy#%(policy)d frag#%(frag_index)s' % { 'policy#%(policy)d frag#%(frag_index)s' % {
@ -201,7 +203,7 @@ class ObjectReconstructor(Daemon):
'device': node['device'], 'device': node['device'],
'part': part, 'path': path, 'part': part, 'path': path,
'policy': policy, 'policy': policy,
'frag_index': node.get('index', 'handoff'), 'frag_index': frag_index,
} }
def _get_response(self, node, part, path, headers, policy): def _get_response(self, node, part, path, headers, policy):
@ -217,6 +219,7 @@ class ObjectReconstructor(Daemon):
:class:`~swift.common.storage_policy.BaseStoragePolicy` :class:`~swift.common.storage_policy.BaseStoragePolicy`
:returns: response :returns: response
""" """
full_path = self._full_path(node, part, path, policy)
resp = None resp = None
try: try:
with ConnectionTimeout(self.conn_timeout): with ConnectionTimeout(self.conn_timeout):
@ -224,18 +227,18 @@ class ObjectReconstructor(Daemon):
part, 'GET', path, headers=headers) part, 'GET', path, headers=headers)
with Timeout(self.node_timeout): with Timeout(self.node_timeout):
resp = conn.getresponse() resp = conn.getresponse()
resp.full_path = full_path
if resp.status not in [HTTP_OK, HTTP_NOT_FOUND]: if resp.status not in [HTTP_OK, HTTP_NOT_FOUND]:
self.logger.warning( self.logger.warning(
_("Invalid response %(resp)s from %(full_path)s"), _("Invalid response %(resp)s from %(full_path)s"),
{'resp': resp.status, {'resp': resp.status, 'full_path': full_path})
'full_path': self._full_path(node, part, path, policy)})
resp = None resp = None
elif resp.status == HTTP_NOT_FOUND: elif resp.status == HTTP_NOT_FOUND:
resp = None resp = None
except (Exception, Timeout): except (Exception, Timeout):
self.logger.exception( self.logger.exception(
_("Trying to GET %(full_path)s"), { _("Trying to GET %(full_path)s"), {
'full_path': self._full_path(node, part, path, policy)}) 'full_path': full_path})
return resp return resp
def reconstruct_fa(self, job, node, datafile_metadata): def reconstruct_fa(self, job, node, datafile_metadata):
@ -259,7 +262,7 @@ class ObjectReconstructor(Daemon):
# the fragment index we need to reconstruct is the position index # the fragment index we need to reconstruct is the position index
# of the node we're rebuilding to within the primary part list # of the node we're rebuilding to within the primary part list
fi_to_rebuild = node['index'] fi_to_rebuild = job['policy'].get_backend_index(node['index'])
# KISS send out connection requests to all nodes, see what sticks. # KISS send out connection requests to all nodes, see what sticks.
# Use fragment preferences header to tell other nodes that we want # Use fragment preferences header to tell other nodes that we want
@ -272,40 +275,96 @@ class ObjectReconstructor(Daemon):
headers['X-Backend-Fragment-Preferences'] = json.dumps(frag_prefs) headers['X-Backend-Fragment-Preferences'] = json.dumps(frag_prefs)
pile = GreenAsyncPile(len(part_nodes)) pile = GreenAsyncPile(len(part_nodes))
path = datafile_metadata['name'] path = datafile_metadata['name']
for node in part_nodes: for _node in part_nodes:
pile.spawn(self._get_response, node, job['partition'], pile.spawn(self._get_response, _node, job['partition'],
path, headers, job['policy']) path, headers, job['policy'])
responses = []
etag = None buckets = defaultdict(dict)
etag_buckets = {}
error_resp_count = 0
for resp in pile: for resp in pile:
if not resp: if not resp:
error_resp_count += 1
continue continue
resp.headers = HeaderKeyDict(resp.getheaders()) resp.headers = HeaderKeyDict(resp.getheaders())
if str(fi_to_rebuild) == \ frag_index = resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index')
resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index'): try:
unique_index = int(frag_index)
except (TypeError, ValueError):
# The successful response should include valid X-Object-
# Sysmeta-Ec-Frag-Index but for safety, catching the case
# either missing X-Object-Sysmeta-Ec-Frag-Index or invalid
# frag index to reconstruct and dump warning log for that
self.logger.warning(
'Invalid resp from %s '
'(invalid X-Object-Sysmeta-Ec-Frag-Index)',
resp.full_path)
continue continue
if resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index') in set(
r.headers.get('X-Object-Sysmeta-Ec-Frag-Index')
for r in responses):
continue
responses.append(resp)
etag = sorted(responses, reverse=True,
key=lambda r: Timestamp(
r.headers.get('X-Backend-Timestamp')
))[0].headers.get('X-Object-Sysmeta-Ec-Etag')
responses = [r for r in responses if
r.headers.get('X-Object-Sysmeta-Ec-Etag') == etag]
if len(responses) >= job['policy'].ec_ndata: if fi_to_rebuild == unique_index:
break # TODO: With duplicated EC frags it's not unreasonable to find
else: # the very fragment we're trying to rebuild exists on another
self.logger.error( # primary node. In this case we should stream it directly from
'Unable to get enough responses (%s/%s) ' # the remote node to our target instead of rebuild. But
'to reconstruct %s with ETag %s' % ( # instead we ignore it.
len(responses), job['policy'].ec_ndata, self.logger.debug(
'Found existing frag #%s while rebuilding #%s from %s',
unique_index, fi_to_rebuild, self._full_path(
node, job['partition'], datafile_metadata['name'],
job['policy']))
continue
timestamp = resp.headers.get('X-Backend-Timestamp')
if not timestamp:
self.logger.warning(
'Invalid resp from %s (missing X-Backend-Timestamp)',
resp.full_path)
continue
timestamp = Timestamp(timestamp)
etag = resp.headers.get('X-Object-Sysmeta-Ec-Etag')
if not etag:
self.logger.warning('Invalid resp from %s (missing Etag)',
resp.full_path)
continue
if etag != etag_buckets.setdefault(timestamp, etag):
self.logger.error(
'Mixed Etag (%s, %s) for %s',
etag, etag_buckets[timestamp],
self._full_path(node, job['partition'], self._full_path(node, job['partition'],
datafile_metadata['name'], job['policy']), datafile_metadata['name'], job['policy']))
etag)) continue
if unique_index not in buckets[timestamp]:
buckets[timestamp][unique_index] = resp
if len(buckets[timestamp]) >= job['policy'].ec_ndata:
responses = buckets[timestamp].values()
self.logger.debug(
'Reconstruct frag #%s with frag indexes %s'
% (fi_to_rebuild, list(buckets[timestamp])))
break
else:
for timestamp, resp in sorted(buckets.items()):
etag = etag_buckets[timestamp]
self.logger.error(
'Unable to get enough responses (%s/%s) '
'to reconstruct %s with ETag %s' % (
len(resp), job['policy'].ec_ndata,
self._full_path(node, job['partition'],
datafile_metadata['name'],
job['policy']),
etag))
if error_resp_count:
self.logger.error(
'Unable to get enough responses (%s error responses) '
'to reconstruct %s' % (
error_resp_count,
self._full_path(node, job['partition'],
datafile_metadata['name'],
job['policy'])))
raise DiskFileError('Unable to reconstruct EC archive') raise DiskFileError('Unable to reconstruct EC archive')
rebuilt_fragment_iter = self.make_rebuilt_fragment_iter( rebuilt_fragment_iter = self.make_rebuilt_fragment_iter(
@ -685,7 +744,7 @@ class ObjectReconstructor(Daemon):
A partition may result in multiple jobs. Potentially many A partition may result in multiple jobs. Potentially many
REVERT jobs, and zero or one SYNC job. REVERT jobs, and zero or one SYNC job.
:param local_dev: the local device :param local_dev: the local device (node dict)
:param part_path: full path to partition :param part_path: full path to partition
:param partition: partition number :param partition: partition number
:param policy: the policy :param policy: the policy
@ -745,7 +804,7 @@ class ObjectReconstructor(Daemon):
for node in part_nodes: for node in part_nodes:
if node['id'] == local_dev['id']: if node['id'] == local_dev['id']:
# this partition belongs here, we'll need a sync job # this partition belongs here, we'll need a sync job
frag_index = node['index'] frag_index = policy.get_backend_index(node['index'])
try: try:
suffixes = data_fi_to_suffixes.pop(frag_index) suffixes = data_fi_to_suffixes.pop(frag_index)
except KeyError: except KeyError:
@ -754,7 +813,7 @@ class ObjectReconstructor(Daemon):
job_type=SYNC, job_type=SYNC,
frag_index=frag_index, frag_index=frag_index,
suffixes=suffixes, suffixes=suffixes,
sync_to=_get_partners(frag_index, part_nodes), sync_to=_get_partners(node['index'], part_nodes),
) )
# ssync callback to rebuild missing fragment_archives # ssync callback to rebuild missing fragment_archives
sync_job['sync_diskfile_builder'] = self.reconstruct_fa sync_job['sync_diskfile_builder'] = self.reconstruct_fa
@ -765,11 +824,21 @@ class ObjectReconstructor(Daemon):
ordered_fis = sorted((len(suffixes), fi) for fi, suffixes ordered_fis = sorted((len(suffixes), fi) for fi, suffixes
in data_fi_to_suffixes.items()) in data_fi_to_suffixes.items())
for count, fi in ordered_fis: for count, fi in ordered_fis:
# In single region EC a revert job must sync to the specific
# primary who's node_index matches the data's frag_index. With
# duplicated EC frags a revert job must sync to all primary nodes
# that should be holding this frag_index.
nodes_sync_to = []
node_index = fi
for n in range(policy.ec_duplication_factor):
nodes_sync_to.append(part_nodes[node_index])
node_index += policy.ec_n_unique_fragments
revert_job = build_job( revert_job = build_job(
job_type=REVERT, job_type=REVERT,
frag_index=fi, frag_index=fi,
suffixes=data_fi_to_suffixes[fi], suffixes=data_fi_to_suffixes[fi],
sync_to=[part_nodes[fi]], sync_to=nodes_sync_to,
) )
jobs.append(revert_job) jobs.append(revert_job)

View File

@ -1775,8 +1775,9 @@ def chunk_transformer(policy, nstreams):
frags_by_byte_order = [] frags_by_byte_order = []
for chunk_to_encode in chunks_to_encode: for chunk_to_encode in chunks_to_encode:
frags_by_byte_order.append( encoded_chunks = policy.pyeclib_driver.encode(chunk_to_encode)
policy.pyeclib_driver.encode(chunk_to_encode)) send_chunks = encoded_chunks * policy.ec_duplication_factor
frags_by_byte_order.append(send_chunks)
# Sequential calls to encode() have given us a list that # Sequential calls to encode() have given us a list that
# looks like this: # looks like this:
# #
@ -1801,7 +1802,7 @@ def chunk_transformer(policy, nstreams):
last_bytes = ''.join(buf) last_bytes = ''.join(buf)
if last_bytes: if last_bytes:
last_frags = policy.pyeclib_driver.encode(last_bytes) last_frags = policy.pyeclib_driver.encode(last_bytes)
yield last_frags yield last_frags * policy.ec_duplication_factor
else: else:
yield [''] * nstreams yield [''] * nstreams
@ -2178,6 +2179,7 @@ class ECObjectController(BaseObjectController):
range_specs = self._convert_range(req, policy) range_specs = self._convert_range(req, policy)
safe_iter = GreenthreadSafeIterator(node_iter) safe_iter = GreenthreadSafeIterator(node_iter)
# Sending the request concurrently to all nodes, and responding # Sending the request concurrently to all nodes, and responding
# with the first response isn't something useful for EC as all # with the first response isn't something useful for EC as all
# nodes contain different fragments. Also EC has implemented it's # nodes contain different fragments. Also EC has implemented it's
@ -2204,8 +2206,11 @@ class ECObjectController(BaseObjectController):
# getters in case some unforeseen scenario, or a misbehaving object # getters in case some unforeseen scenario, or a misbehaving object
# server, causes us to otherwise make endless requests e.g. if an # server, causes us to otherwise make endless requests e.g. if an
# object server were to ignore frag_prefs and always respond with # object server were to ignore frag_prefs and always respond with
# a frag that is already in a bucket. # a frag that is already in a bucket. Now we're assuming it should
max_extra_requests = 2 * policy.ec_nparity + policy.ec_ndata # be limit at most 2 * replicas.
max_extra_requests = (
(policy.object_ring.replica_count * 2) - policy.ec_ndata)
for get, parts_iter in pile: for get, parts_iter in pile:
if get.last_status is None: if get.last_status is None:
# We may have spawned getters that find the node iterator # We may have spawned getters that find the node iterator
@ -2322,7 +2327,7 @@ class ECObjectController(BaseObjectController):
logger=self.app.logger, logger=self.app.logger,
need_multiphase=True) need_multiphase=True)
def _determine_chunk_destinations(self, putters): def _determine_chunk_destinations(self, putters, policy):
""" """
Given a list of putters, return a dict where the key is the putter Given a list of putters, return a dict where the key is the putter
and the value is the node index to use. and the value is the node index to use.
@ -2331,6 +2336,10 @@ class ECObjectController(BaseObjectController):
(in the primary part list) as the primary that the handoff is standing (in the primary part list) as the primary that the handoff is standing
in for. This lets erasure-code fragment archives wind up on the in for. This lets erasure-code fragment archives wind up on the
preferred local primary nodes when possible. preferred local primary nodes when possible.
:param putters: a list of swift.proxy.controllers.obj.MIMEPutter
instance
:param policy: A policy instance
""" """
# Give each putter a "chunk index": the index of the # Give each putter a "chunk index": the index of the
# transformed chunk that we'll send to it. # transformed chunk that we'll send to it.
@ -2351,9 +2360,34 @@ class ECObjectController(BaseObjectController):
# nodes. Holes occur when a storage node is down, in which # nodes. Holes occur when a storage node is down, in which
# case the connection is not replaced, and when a storage node # case the connection is not replaced, and when a storage node
# returns 507, in which case a handoff is used to replace it. # returns 507, in which case a handoff is used to replace it.
holes = [x for x in range(len(putters))
if x not in chunk_index.values()]
# lack_list is a dict of list to keep hole indexes
# e.g. if we have 2 holes for index 0 with ec_duplication_factor=2
# lack_list is like {0: [0], 1: [0]}, and then, if 1 hole found
# for index 1, lack_list will be {0: [0, 1], 1: [0]}.
# After that, holes will be filled from bigger key
# (i.e. 1:[0] at first)
# Grouping all missing fragment indexes for each unique_index
unique_index_to_holes = collections.defaultdict(list)
available_indexes = chunk_index.values()
for node_index in range(policy.object_ring.replica_count):
if node_index not in available_indexes:
unique_index = policy.get_backend_index(node_index)
unique_index_to_holes[unique_index].append(node_index)
# Set the missing index to lack_list
lack_list = collections.defaultdict(list)
for unique_index, holes in unique_index_to_holes.items():
for lack_tier, hole_node_index in enumerate(holes):
lack_list[lack_tier].append(hole_node_index)
# Extract the lack_list to a flat list
holes = []
for lack_tier, indexes in sorted(lack_list.items(), reverse=True):
holes.extend(indexes)
# Fill chunk_index list with the hole list
for hole, p in zip(holes, handoff_conns): for hole, p in zip(holes, handoff_conns):
chunk_index[p] = hole chunk_index[p] = hole
return chunk_index return chunk_index
@ -2405,7 +2439,8 @@ class ECObjectController(BaseObjectController):
# build our chunk index dict to place handoffs in the # build our chunk index dict to place handoffs in the
# same part nodes index as the primaries they are covering # same part nodes index as the primaries they are covering
chunk_index = self._determine_chunk_destinations(putters) chunk_index = self._determine_chunk_destinations(
putters, policy)
for putter in putters: for putter in putters:
putter.spawn_sender_greenthread( putter.spawn_sender_greenthread(
@ -2456,7 +2491,7 @@ class ECObjectController(BaseObjectController):
# Update any footers set by middleware with EC footers # Update any footers set by middleware with EC footers
trail_md = trailing_metadata( trail_md = trailing_metadata(
policy, etag_hasher, policy, etag_hasher,
bytes_transferred, ci) bytes_transferred, policy.get_backend_index(ci))
trail_md.update(footers) trail_md.update(footers)
# Etag footer must always be hash of what we sent # Etag footer must always be hash of what we sent
trail_md['Etag'] = chunk_hashers[ci].hexdigest() trail_md['Etag'] = chunk_hashers[ci].hexdigest()

View File

@ -21,6 +21,7 @@ import copy
import logging import logging
import errno import errno
from six.moves import range from six.moves import range
from six import BytesIO
import sys import sys
from contextlib import contextmanager, closing from contextlib import contextmanager, closing
from collections import defaultdict, Iterable from collections import defaultdict, Iterable
@ -34,13 +35,14 @@ from tempfile import mkdtemp
from shutil import rmtree from shutil import rmtree
import signal import signal
import json import json
import random
from swift.common.utils import Timestamp, NOTICE from swift.common.utils import Timestamp, NOTICE
from test import get_config from test import get_config
from swift.common import utils from swift.common import utils
from swift.common.header_key_dict import HeaderKeyDict from swift.common.header_key_dict import HeaderKeyDict
from swift.common.ring import Ring, RingData from swift.common.ring import Ring, RingData
from swift.obj import server
from hashlib import md5 from hashlib import md5
import logging.handlers import logging.handlers
@ -48,6 +50,7 @@ from six.moves.http_client import HTTPException
from swift.common import storage_policy from swift.common import storage_policy
from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy, from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy,
VALID_EC_TYPES) VALID_EC_TYPES)
from swift.common import swob
import functools import functools
import six.moves.cPickle as pickle import six.moves.cPickle as pickle
from gzip import GzipFile from gzip import GzipFile
@ -213,6 +216,7 @@ class FakeRing(Ring):
self._base_port = base_port self._base_port = base_port
self.max_more_nodes = max_more_nodes self.max_more_nodes = max_more_nodes
self._part_shift = 32 - part_power self._part_shift = 32 - part_power
self._init_device_char()
# 9 total nodes (6 more past the initial 3) is the cap, no matter if # 9 total nodes (6 more past the initial 3) is the cap, no matter if
# this is set higher, or R^2 for R replicas # this is set higher, or R^2 for R replicas
self.set_replicas(replicas) self.set_replicas(replicas)
@ -221,9 +225,18 @@ class FakeRing(Ring):
def _reload(self): def _reload(self):
self._rtime = time.time() self._rtime = time.time()
@property
def device_char(self):
return next(self._device_char_iter)
def _init_device_char(self):
self._device_char_iter = itertools.cycle(
['sd%s' % chr(ord('a') + x) for x in range(26)])
def set_replicas(self, replicas): def set_replicas(self, replicas):
self.replicas = replicas self.replicas = replicas
self._devs = [] self._devs = []
self._init_device_char()
for x in range(self.replicas): for x in range(self.replicas):
ip = '10.0.0.%s' % x ip = '10.0.0.%s' % x
port = self._base_port + x port = self._base_port + x
@ -233,7 +246,7 @@ class FakeRing(Ring):
'replication_ip': ip, 'replication_ip': ip,
'port': port, 'port': port,
'replication_port': port, 'replication_port': port,
'device': 'sd' + (chr(ord('a') + x)), 'device': self.device_char,
'zone': x % 3, 'zone': x % 3,
'region': x % 2, 'region': x % 2,
'id': x, 'id': x,
@ -290,7 +303,7 @@ class FabricatedRing(Ring):
self.devices = devices self.devices = devices
self.nodes = nodes self.nodes = nodes
self.port = port self.port = port
self.replicas = 6 self.replicas = replicas
self.part_power = part_power self.part_power = part_power
self._part_shift = 32 - self.part_power self._part_shift = 32 - self.part_power
self._reload() self._reload()
@ -1092,6 +1105,30 @@ def requires_o_tmpfile_support(func):
return wrapper return wrapper
class StubResponse(object):
def __init__(self, status, body='', headers=None, frag_index=None):
self.status = status
self.body = body
self.readable = BytesIO(body)
self.headers = HeaderKeyDict(headers)
if frag_index is not None:
self.headers['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index
fake_reason = ('Fake', 'This response is a lie.')
self.reason = swob.RESPONSE_REASONS.get(status, fake_reason)[0]
def getheader(self, header_name, default=None):
return self.headers.get(header_name, default)
def getheaders(self):
if 'Content-Length' not in self.headers:
self.headers['Content-Length'] = len(self.body)
return self.headers.items()
def read(self, amt=0):
return self.readable.read(amt)
def encode_frag_archive_bodies(policy, body): def encode_frag_archive_bodies(policy, body):
""" """
Given a stub body produce a list of complete frag_archive bodies as Given a stub body produce a list of complete frag_archive bodies as
@ -1119,3 +1156,128 @@ def encode_frag_archive_bodies(policy, body):
ec_archive_bodies = [''.join(frags) ec_archive_bodies = [''.join(frags)
for frags in zip(*fragment_payloads)] for frags in zip(*fragment_payloads)]
return ec_archive_bodies return ec_archive_bodies
def make_ec_object_stub(test_body, policy, timestamp):
segment_size = policy.ec_segment_size
test_body = test_body or (
'test' * segment_size)[:-random.randint(1, 1000)]
timestamp = timestamp or utils.Timestamp(time.time())
etag = md5(test_body).hexdigest()
ec_archive_bodies = encode_frag_archive_bodies(policy, test_body)
return {
'body': test_body,
'etag': etag,
'frags': ec_archive_bodies,
'timestamp': timestamp
}
def fake_ec_node_response(node_frags, policy):
"""
Given a list of entries for each node in ring order, where the entries
are a dict (or list of dicts) which describes the fragment (or
fragments) that are on the node; create a function suitable for use
with capture_http_requests that will accept a req object and return a
response that will suitably fake the behavior of an object server who
had the given fragments on disk at the time.
:param node_frags: a list. Each item in the list describes the
fragments that are on a node; each item is a dict or list of dicts,
each dict describing a single fragment; where the item is a list,
repeated calls to get_response will return fragments in the order
of the list; each dict has keys:
- obj: an object stub, as generated by _make_ec_object_stub,
that defines all of the fragments that compose an object
at a specific timestamp.
- frag: the index of a fragment to be selected from the object
stub
- durable (optional): True if the selected fragment is durable
:param policy: storage policy to return
"""
node_map = {} # maps node ip and port to node index
all_nodes = []
call_count = {} # maps node index to get_response call count for node
def _build_node_map(req, policy):
node_key = lambda n: (n['ip'], n['port'])
part = utils.split_path(req['path'], 5, 5, True)[1]
all_nodes.extend(policy.object_ring.get_part_nodes(part))
all_nodes.extend(policy.object_ring.get_more_nodes(part))
for i, node in enumerate(all_nodes):
node_map[node_key(node)] = i
call_count[i] = 0
# normalize node_frags to a list of fragments for each node even
# if there's only one fragment in the dataset provided.
for i, frags in enumerate(node_frags):
if isinstance(frags, dict):
node_frags[i] = [frags]
def get_response(req):
requested_policy = int(
req['headers']['X-Backend-Storage-Policy-Index'])
if int(policy) != requested_policy:
AssertionError(
"Requested polciy doesn't fit the fake response policy")
if not node_map:
_build_node_map(req, policy)
try:
node_index = node_map[(req['ip'], req['port'])]
except KeyError:
raise Exception("Couldn't find node %s:%s in %r" % (
req['ip'], req['port'], all_nodes))
try:
frags = node_frags[node_index]
except IndexError:
raise Exception('Found node %r:%r at index %s - '
'but only got %s stub response nodes' % (
req['ip'], req['port'], node_index,
len(node_frags)))
if not frags:
return StubResponse(404)
# determine response fragment (if any) for this call
resp_frag = frags[call_count[node_index]]
call_count[node_index] += 1
frag_prefs = req['headers'].get('X-Backend-Fragment-Preferences')
if not (frag_prefs or resp_frag.get('durable', True)):
return StubResponse(404)
# prepare durable timestamp and backend frags header for this node
obj_stub = resp_frag['obj']
ts2frags = defaultdict(list)
durable_timestamp = None
for frag in frags:
ts_frag = frag['obj']['timestamp']
if frag.get('durable', True):
durable_timestamp = ts_frag.internal
ts2frags[ts_frag].append(frag['frag'])
try:
body = obj_stub['frags'][resp_frag['frag']]
except IndexError as err:
raise Exception(
'Frag index %s not defined: node index %s, frags %r\n%s' %
(resp_frag['frag'], node_index, [f['frag'] for f in frags],
err))
headers = {
'X-Object-Sysmeta-Ec-Content-Length': len(obj_stub['body']),
'X-Object-Sysmeta-Ec-Etag': obj_stub['etag'],
'X-Object-Sysmeta-Ec-Frag-Index':
policy.get_backend_index(resp_frag['frag']),
'X-Backend-Timestamp': obj_stub['timestamp'].internal,
'X-Timestamp': obj_stub['timestamp'].normal,
'X-Backend-Data-Timestamp': obj_stub['timestamp'].internal,
'X-Backend-Fragments':
server._make_backend_fragments_header(ts2frags)
}
if durable_timestamp:
headers['X-Backend-Durable-Timestamp'] = durable_timestamp
return StubResponse(200, body, headers)
return get_response

View File

@ -19,6 +19,7 @@ import unittest
import os import os
import mock import mock
from functools import partial from functools import partial
from six.moves.configparser import ConfigParser from six.moves.configparser import ConfigParser
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from test.unit import patch_policies, FakeRing, temptree, DEFAULT_TEST_EC_TYPE from test.unit import patch_policies, FakeRing, temptree, DEFAULT_TEST_EC_TYPE
@ -169,7 +170,11 @@ class TestStoragePolicies(unittest.TestCase):
StoragePolicy(2, 'cee', False), StoragePolicy(2, 'cee', False),
ECStoragePolicy(10, 'ten', ECStoragePolicy(10, 'ten',
ec_type=DEFAULT_TEST_EC_TYPE, ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=3)] ec_ndata=10, ec_nparity=3),
ECStoragePolicy(11, 'eleven',
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=3,
ec_duplication_factor=2)]
policies = StoragePolicyCollection(test_policies) policies = StoragePolicyCollection(test_policies)
for policy in policies: for policy in policies:
policy_repr = repr(policy) policy_repr = repr(policy)
@ -185,6 +190,10 @@ class TestStoragePolicies(unittest.TestCase):
policy.ec_nparity in policy_repr) policy.ec_nparity in policy_repr)
self.assertTrue('ec_segment_size=%s' % self.assertTrue('ec_segment_size=%s' %
policy.ec_segment_size in policy_repr) policy.ec_segment_size in policy_repr)
if policy.ec_duplication_factor > 1:
self.assertTrue('ec_duplication_factor=%s' %
policy.ec_duplication_factor in
policy_repr)
collection_repr = repr(policies) collection_repr = repr(policies)
collection_repr_lines = collection_repr.splitlines() collection_repr_lines = collection_repr.splitlines()
self.assertTrue( self.assertTrue(
@ -443,16 +452,21 @@ class TestStoragePolicies(unittest.TestCase):
ECStoragePolicy(0, 'ec8-2', aliases='zeus, jupiter', ECStoragePolicy(0, 'ec8-2', aliases='zeus, jupiter',
ec_type=DEFAULT_TEST_EC_TYPE, ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=8, ec_nparity=2, ec_ndata=8, ec_nparity=2,
object_ring=FakeRing(replicas=8), object_ring=FakeRing(replicas=10),
is_default=True), is_default=True),
ECStoragePolicy(1, 'ec10-4', aliases='athena, minerva', ECStoragePolicy(1, 'ec10-4', aliases='athena, minerva',
ec_type=DEFAULT_TEST_EC_TYPE, ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=4, ec_ndata=10, ec_nparity=4,
object_ring=FakeRing(replicas=10)), object_ring=FakeRing(replicas=14)),
ECStoragePolicy(2, 'ec4-2', aliases='poseidon, neptune', ECStoragePolicy(2, 'ec4-2', aliases='poseidon, neptune',
ec_type=DEFAULT_TEST_EC_TYPE, ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=4, ec_nparity=2, ec_ndata=4, ec_nparity=2,
object_ring=FakeRing(replicas=7)), object_ring=FakeRing(replicas=6)),
ECStoragePolicy(3, 'ec4-2-dup', aliases='uzuki, rin',
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=4, ec_nparity=2,
ec_duplication_factor=2,
object_ring=FakeRing(replicas=12)),
] ]
ec_policies = StoragePolicyCollection(good_test_policies_EC) ec_policies = StoragePolicyCollection(good_test_policies_EC)
@ -460,6 +474,10 @@ class TestStoragePolicies(unittest.TestCase):
self.assertEqual(ec_policies.get_by_name(name), ec_policies[0]) self.assertEqual(ec_policies.get_by_name(name), ec_policies[0])
for name in ('ec10-4', 'athena', 'minerva'): for name in ('ec10-4', 'athena', 'minerva'):
self.assertEqual(ec_policies.get_by_name(name), ec_policies[1]) self.assertEqual(ec_policies.get_by_name(name), ec_policies[1])
for name in ('ec4-2', 'poseidon', 'neptune'):
self.assertEqual(ec_policies.get_by_name(name), ec_policies[2])
for name in ('ec4-2-dup', 'uzuki', 'rin'):
self.assertEqual(ec_policies.get_by_name(name), ec_policies[3])
# Testing parsing of conf files/text # Testing parsing of conf files/text
good_ec_conf = self._conf(""" good_ec_conf = self._conf("""
@ -478,6 +496,14 @@ class TestStoragePolicies(unittest.TestCase):
ec_type = %(ec_type)s ec_type = %(ec_type)s
ec_num_data_fragments = 10 ec_num_data_fragments = 10
ec_num_parity_fragments = 4 ec_num_parity_fragments = 4
[storage-policy:2]
name = ec4-2-dup
aliases = uzuki, rin
policy_type = erasure_coding
ec_type = %(ec_type)s
ec_num_data_fragments = 4
ec_num_parity_fragments = 2
ec_duplication_factor = 2
""" % {'ec_type': DEFAULT_TEST_EC_TYPE}) """ % {'ec_type': DEFAULT_TEST_EC_TYPE})
ec_policies = parse_storage_policies(good_ec_conf) ec_policies = parse_storage_policies(good_ec_conf)
@ -485,6 +511,8 @@ class TestStoragePolicies(unittest.TestCase):
ec_policies[0]) ec_policies[0])
self.assertEqual(ec_policies.get_by_name('ec10-4'), self.assertEqual(ec_policies.get_by_name('ec10-4'),
ec_policies.get_by_name('poseidon')) ec_policies.get_by_name('poseidon'))
self.assertEqual(ec_policies.get_by_name('ec4-2-dup'),
ec_policies.get_by_name('uzuki'))
name_repeat_ec_conf = self._conf(""" name_repeat_ec_conf = self._conf("""
[storage-policy:0] [storage-policy:0]
@ -1243,11 +1271,16 @@ class TestStoragePolicies(unittest.TestCase):
ec_ndata=8, ec_nparity=2), ec_ndata=8, ec_nparity=2),
ECStoragePolicy(11, 'df10-6', ec_type='flat_xor_hd_4', ECStoragePolicy(11, 'df10-6', ec_type='flat_xor_hd_4',
ec_ndata=10, ec_nparity=6), ec_ndata=10, ec_nparity=6),
ECStoragePolicy(12, 'ec4-2-dup', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=4, ec_nparity=2, ec_duplication_factor=2),
] ]
for ec_policy in test_ec_policies: for ec_policy in test_ec_policies:
k = ec_policy.ec_ndata k = ec_policy.ec_ndata
expected_size = \ expected_size = (
k + ec_policy.pyeclib_driver.min_parity_fragments_needed() (k + ec_policy.pyeclib_driver.min_parity_fragments_needed())
* ec_policy.ec_duplication_factor
)
self.assertEqual(expected_size, ec_policy.quorum) self.assertEqual(expected_size, ec_policy.quorum)
def test_validate_ring(self): def test_validate_ring(self):
@ -1259,25 +1292,27 @@ class TestStoragePolicies(unittest.TestCase):
ec_ndata=10, ec_nparity=4), ec_ndata=10, ec_nparity=4),
ECStoragePolicy(2, 'ec4-2', ec_type=DEFAULT_TEST_EC_TYPE, ECStoragePolicy(2, 'ec4-2', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=4, ec_nparity=2), ec_ndata=4, ec_nparity=2),
ECStoragePolicy(3, 'ec4-2-2dup', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=4, ec_nparity=2,
ec_duplication_factor=2)
] ]
actual_load_ring_replicas = [8, 10, 7] actual_load_ring_replicas = [8, 10, 7, 11]
policies = StoragePolicyCollection(test_policies) policies = StoragePolicyCollection(test_policies)
def create_mock_ring_data(num_replica): class MockRingData(object):
class mock_ring_data_klass(object): def __init__(self, num_replica):
def __init__(self): self._replica2part2dev_id = [0] * num_replica
self._replica2part2dev_id = [0] * num_replica
return mock_ring_data_klass()
for policy, ring_replicas in zip(policies, actual_load_ring_replicas): for policy, ring_replicas in zip(policies, actual_load_ring_replicas):
with mock.patch('swift.common.ring.ring.RingData.load', with mock.patch('swift.common.ring.ring.RingData.load',
return_value=create_mock_ring_data(ring_replicas)): return_value=MockRingData(ring_replicas)):
necessary_replica_num = \
policy.ec_n_unique_fragments * policy.ec_duplication_factor
with mock.patch( with mock.patch(
'swift.common.ring.ring.validate_configuration'): 'swift.common.ring.ring.validate_configuration'):
msg = 'EC ring for policy %s needs to be configured with ' \ msg = 'EC ring for policy %s needs to be configured with ' \
'exactly %d replicas.' % \ 'exactly %d replicas.' % \
(policy.name, policy.ec_ndata + policy.ec_nparity) (policy.name, necessary_replica_num)
self.assertRaisesWithMessage(RingLoadError, msg, self.assertRaisesWithMessage(RingLoadError, msg,
policy.load_ring, 'mock') policy.load_ring, 'mock')
@ -1332,6 +1367,7 @@ class TestStoragePolicies(unittest.TestCase):
'ec_num_data_fragments': 10, 'ec_num_data_fragments': 10,
'ec_num_parity_fragments': 3, 'ec_num_parity_fragments': 3,
'ec_object_segment_size': DEFAULT_EC_OBJECT_SEGMENT_SIZE, 'ec_object_segment_size': DEFAULT_EC_OBJECT_SEGMENT_SIZE,
'ec_duplication_factor': 1,
}, },
(10, False): { (10, False): {
'name': 'ten', 'name': 'ten',
@ -1348,12 +1384,30 @@ class TestStoragePolicies(unittest.TestCase):
'ec_num_data_fragments': 10, 'ec_num_data_fragments': 10,
'ec_num_parity_fragments': 3, 'ec_num_parity_fragments': 3,
'ec_object_segment_size': DEFAULT_EC_OBJECT_SEGMENT_SIZE, 'ec_object_segment_size': DEFAULT_EC_OBJECT_SEGMENT_SIZE,
'ec_duplication_factor': 1,
}, },
(11, False): { (11, False): {
'name': 'done', 'name': 'done',
'aliases': 'done', 'aliases': 'done',
'deprecated': True, 'deprecated': True,
}, },
# enabled ec with ec_duplication
(12, True): {
'name': 'twelve',
'aliases': 'twelve',
'default': False,
'deprecated': False,
'policy_type': EC_POLICY,
'ec_type': DEFAULT_TEST_EC_TYPE,
'ec_num_data_fragments': 10,
'ec_num_parity_fragments': 3,
'ec_object_segment_size': DEFAULT_EC_OBJECT_SEGMENT_SIZE,
'ec_duplication_factor': 2,
},
(12, False): {
'name': 'twelve',
'aliases': 'twelve',
},
} }
self.maxDiff = None self.maxDiff = None
for policy in policies: for policy in policies:

View File

@ -2577,6 +2577,33 @@ cluster_dfw1 = http://dfw1.host/v1/
finally: finally:
utils.TRUE_VALUES = orig_trues utils.TRUE_VALUES = orig_trues
def test_config_positive_int_value(self):
expectations = {
# value : expected,
'1': 1,
1: 1,
'2': 2,
'1024': 1024,
'0x01': ValueError,
'asdf': ValueError,
None: ValueError,
0: ValueError,
-1: ValueError,
'1.2': ValueError, # string expresses float should be value error
}
for value, expected in expectations.items():
try:
rv = utils.config_positive_int_value(value)
except Exception as e:
if e.__class__ is not expected:
raise
else:
self.assertEqual(
'Config option must be an positive int number, '
'not "%s".' % value, e.message)
else:
self.assertEqual(expected, rv)
def test_config_auto_int_value(self): def test_config_auto_int_value(self):
expectations = { expectations = {
# (value, default) : expected, # (value, default) : expected,

View File

@ -84,7 +84,8 @@ def setup_servers(the_object_server=object_server, extra_conf=None):
mkdirs(_testdir) mkdirs(_testdir)
rmtree(_testdir) rmtree(_testdir)
for drive in ('sda1', 'sdb1', 'sdc1', 'sdd1', 'sde1', for drive in ('sda1', 'sdb1', 'sdc1', 'sdd1', 'sde1',
'sdf1', 'sdg1', 'sdh1', 'sdi1'): 'sdf1', 'sdg1', 'sdh1', 'sdi1', 'sdj1',
'sdk1', 'sdl1'):
mkdirs(os.path.join(_testdir, drive, 'tmp')) mkdirs(os.path.join(_testdir, drive, 'tmp'))
conf = {'devices': _testdir, 'swift_dir': _testdir, conf = {'devices': _testdir, 'swift_dir': _testdir,
'mount_check': 'false', 'allowed_headers': 'mount_check': 'false', 'allowed_headers':
@ -100,9 +101,13 @@ def setup_servers(the_object_server=object_server, extra_conf=None):
obj1lis = listen(('localhost', 0)) obj1lis = listen(('localhost', 0))
obj2lis = listen(('localhost', 0)) obj2lis = listen(('localhost', 0))
obj3lis = listen(('localhost', 0)) obj3lis = listen(('localhost', 0))
objsocks = [obj1lis, obj2lis, obj3lis] obj4lis = listen(('localhost', 0))
obj5lis = listen(('localhost', 0))
obj6lis = listen(('localhost', 0))
objsocks = [obj1lis, obj2lis, obj3lis, obj4lis, obj5lis, obj6lis]
context["test_sockets"] = \ context["test_sockets"] = \
(prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis, obj2lis, obj3lis) (prolis, acc1lis, acc2lis, con1lis, con2lis, obj1lis, obj2lis, obj3lis,
obj4lis, obj5lis, obj6lis)
account_ring_path = os.path.join(_testdir, 'account.ring.gz') account_ring_path = os.path.join(_testdir, 'account.ring.gz')
account_devs = [ account_devs = [
{'port': acc1lis.getsockname()[1]}, {'port': acc1lis.getsockname()[1]},
@ -120,7 +125,10 @@ def setup_servers(the_object_server=object_server, extra_conf=None):
StoragePolicy(1, 'one', False), StoragePolicy(1, 'one', False),
StoragePolicy(2, 'two', False), StoragePolicy(2, 'two', False),
ECStoragePolicy(3, 'ec', ec_type=DEFAULT_TEST_EC_TYPE, ECStoragePolicy(3, 'ec', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=2, ec_nparity=1, ec_segment_size=4096)]) ec_ndata=2, ec_nparity=1, ec_segment_size=4096),
ECStoragePolicy(4, 'ec-dup', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=2, ec_nparity=1, ec_segment_size=4096,
ec_duplication_factor=2)])
obj_rings = { obj_rings = {
0: ('sda1', 'sdb1'), 0: ('sda1', 'sdb1'),
1: ('sdc1', 'sdd1'), 1: ('sdc1', 'sdd1'),
@ -136,22 +144,41 @@ def setup_servers(the_object_server=object_server, extra_conf=None):
write_fake_ring(obj_ring_path, *obj_devs) write_fake_ring(obj_ring_path, *obj_devs)
# write_fake_ring can't handle a 3-element ring, and the EC policy needs # write_fake_ring can't handle a 3-element ring, and the EC policy needs
# at least 3 devs to work with, so we do it manually # at least 6 devs to work with (ec_k=2, ec_m=1, duplication_fuctor=2),
# so we do it manually
devs = [{'id': 0, 'zone': 0, 'device': 'sdg1', 'ip': '127.0.0.1', devs = [{'id': 0, 'zone': 0, 'device': 'sdg1', 'ip': '127.0.0.1',
'port': obj1lis.getsockname()[1]}, 'port': obj1lis.getsockname()[1]},
{'id': 1, 'zone': 0, 'device': 'sdh1', 'ip': '127.0.0.1', {'id': 1, 'zone': 0, 'device': 'sdh1', 'ip': '127.0.0.1',
'port': obj2lis.getsockname()[1]}, 'port': obj2lis.getsockname()[1]},
{'id': 2, 'zone': 0, 'device': 'sdi1', 'ip': '127.0.0.1', {'id': 2, 'zone': 0, 'device': 'sdi1', 'ip': '127.0.0.1',
'port': obj3lis.getsockname()[1]}] 'port': obj3lis.getsockname()[1]},
{'id': 3, 'zone': 0, 'device': 'sdj1', 'ip': '127.0.0.1',
'port': obj4lis.getsockname()[1]},
{'id': 4, 'zone': 0, 'device': 'sdk1', 'ip': '127.0.0.1',
'port': obj5lis.getsockname()[1]},
{'id': 5, 'zone': 0, 'device': 'sdl1', 'ip': '127.0.0.1',
'port': obj6lis.getsockname()[1]}]
pol3_replica2part2dev_id = [[0, 1, 2, 0], pol3_replica2part2dev_id = [[0, 1, 2, 0],
[1, 2, 0, 1], [1, 2, 0, 1],
[2, 0, 1, 2]] [2, 0, 1, 2]]
pol4_replica2part2dev_id = [[0, 1, 2, 3],
[1, 2, 3, 4],
[2, 3, 4, 5],
[3, 4, 5, 0],
[4, 5, 0, 1],
[5, 0, 1, 2]]
obj3_ring_path = os.path.join( obj3_ring_path = os.path.join(
_testdir, storage_policy.POLICIES[3].ring_name + '.ring.gz') _testdir, storage_policy.POLICIES[3].ring_name + '.ring.gz')
part_shift = 30 part_shift = 30
with closing(GzipFile(obj3_ring_path, 'wb')) as fh: with closing(GzipFile(obj3_ring_path, 'wb')) as fh:
pickle.dump(RingData(pol3_replica2part2dev_id, devs, part_shift), fh) pickle.dump(RingData(pol3_replica2part2dev_id, devs, part_shift), fh)
obj4_ring_path = os.path.join(
_testdir, storage_policy.POLICIES[4].ring_name + '.ring.gz')
part_shift = 30
with closing(GzipFile(obj4_ring_path, 'wb')) as fh:
pickle.dump(RingData(pol4_replica2part2dev_id, devs, part_shift), fh)
prosrv = proxy_server.Application(conf, logger=debug_logger('proxy')) prosrv = proxy_server.Application(conf, logger=debug_logger('proxy'))
for policy in storage_policy.POLICIES: for policy in storage_policy.POLICIES:
# make sure all the rings are loaded # make sure all the rings are loaded
@ -172,8 +199,15 @@ def setup_servers(the_object_server=object_server, extra_conf=None):
conf, logger=debug_logger('obj2')) conf, logger=debug_logger('obj2'))
obj3srv = the_object_server.ObjectController( obj3srv = the_object_server.ObjectController(
conf, logger=debug_logger('obj3')) conf, logger=debug_logger('obj3'))
obj4srv = the_object_server.ObjectController(
conf, logger=debug_logger('obj4'))
obj5srv = the_object_server.ObjectController(
conf, logger=debug_logger('obj5'))
obj6srv = the_object_server.ObjectController(
conf, logger=debug_logger('obj6'))
context["test_servers"] = \ context["test_servers"] = \
(prosrv, acc1srv, acc2srv, con1srv, con2srv, obj1srv, obj2srv, obj3srv) (prosrv, acc1srv, acc2srv, con1srv, con2srv, obj1srv, obj2srv, obj3srv,
obj4srv, obj5srv, obj6srv)
nl = NullLogger() nl = NullLogger()
logging_prosv = proxy_logging.ProxyLoggingMiddleware(prosrv, conf, logging_prosv = proxy_logging.ProxyLoggingMiddleware(prosrv, conf,
logger=prosrv.logger) logger=prosrv.logger)
@ -185,8 +219,12 @@ def setup_servers(the_object_server=object_server, extra_conf=None):
obj1spa = spawn(wsgi.server, obj1lis, obj1srv, nl) obj1spa = spawn(wsgi.server, obj1lis, obj1srv, nl)
obj2spa = spawn(wsgi.server, obj2lis, obj2srv, nl) obj2spa = spawn(wsgi.server, obj2lis, obj2srv, nl)
obj3spa = spawn(wsgi.server, obj3lis, obj3srv, nl) obj3spa = spawn(wsgi.server, obj3lis, obj3srv, nl)
obj4spa = spawn(wsgi.server, obj4lis, obj4srv, nl)
obj5spa = spawn(wsgi.server, obj5lis, obj5srv, nl)
obj6spa = spawn(wsgi.server, obj6lis, obj6srv, nl)
context["test_coros"] = \ context["test_coros"] = \
(prospa, acc1spa, acc2spa, con1spa, con2spa, obj1spa, obj2spa, obj3spa) (prospa, acc1spa, acc2spa, con1spa, con2spa, obj1spa, obj2spa, obj3spa,
obj4spa, obj5spa, obj6spa)
# Create account # Create account
ts = normalize_timestamp(time.time()) ts = normalize_timestamp(time.time())
partition, nodes = prosrv.account_ring.get_nodes('a') partition, nodes = prosrv.account_ring.get_nodes('a')

View File

@ -25,6 +25,7 @@ import shutil
import re import re
import random import random
import struct import struct
import collections
from eventlet import Timeout, sleep from eventlet import Timeout, sleep
from contextlib import closing, contextmanager from contextlib import closing, contextmanager
@ -72,7 +73,8 @@ def make_ec_archive_bodies(policy, test_body):
# encode the buffers into fragment payloads # encode the buffers into fragment payloads
fragment_payloads = [] fragment_payloads = []
for chunk in chunks: for chunk in chunks:
fragments = policy.pyeclib_driver.encode(chunk) fragments = \
policy.pyeclib_driver.encode(chunk) * policy.ec_duplication_factor
if not fragments: if not fragments:
break break
fragment_payloads.append(fragments) fragment_payloads.append(fragments)
@ -662,13 +664,13 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
def test_get_response(self): def test_get_response(self):
part = self.part_nums[0] part = self.part_nums[0]
node = POLICIES[0].object_ring.get_part_nodes(int(part))[0] node = POLICIES[1].object_ring.get_part_nodes(int(part))[0]
for stat_code in (200, 400): for stat_code in (200, 400):
with mocked_http_conn(stat_code): with mocked_http_conn(stat_code):
resp = self.reconstructor._get_response(node, part, resp = self.reconstructor._get_response(node, part,
path='nada', path='nada',
headers={}, headers={},
policy=POLICIES[0]) policy=POLICIES[1])
if resp: if resp:
self.assertEqual(resp.status, 200) self.assertEqual(resp.status, 200)
else: else:
@ -677,12 +679,12 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
def test_reconstructor_does_not_log_on_404(self): def test_reconstructor_does_not_log_on_404(self):
part = self.part_nums[0] part = self.part_nums[0]
node = POLICIES[0].object_ring.get_part_nodes(int(part))[0] node = POLICIES[1].object_ring.get_part_nodes(int(part))[0]
with mocked_http_conn(404): with mocked_http_conn(404):
self.reconstructor._get_response(node, part, self.reconstructor._get_response(node, part,
path='some_path', path='some_path',
headers={}, headers={},
policy=POLICIES[0]) policy=POLICIES[1])
# Make sure that no warnings are emitted for a 404 # Make sure that no warnings are emitted for a 404
len_warning_lines = len(self.logger.get_lines_for_level('warning')) len_warning_lines = len(self.logger.get_lines_for_level('warning'))
@ -1158,6 +1160,7 @@ class TestObjectReconstructor(unittest.TestCase):
self.policy.object_ring.max_more_nodes = \ self.policy.object_ring.max_more_nodes = \
self.policy.object_ring.replicas self.policy.object_ring.replicas
self.ts_iter = make_timestamp_iter() self.ts_iter = make_timestamp_iter()
self.fabricated_ring = FabricatedRing()
def _configure_reconstructor(self, **kwargs): def _configure_reconstructor(self, **kwargs):
self.conf.update(kwargs) self.conf.update(kwargs)
@ -1723,7 +1726,7 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(job['device'], self.local_dev['device']) self.assertEqual(job['device'], self.local_dev['device'])
def test_build_jobs_primary(self): def test_build_jobs_primary(self):
ring = self.policy.object_ring = FabricatedRing() ring = self.policy.object_ring = self.fabricated_ring
# find a partition for which we're a primary # find a partition for which we're a primary
for partition in range(2 ** ring.part_power): for partition in range(2 ** ring.part_power):
part_nodes = ring.get_part_nodes(partition) part_nodes = ring.get_part_nodes(partition)
@ -1768,7 +1771,7 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(job['device'], self.local_dev['device']) self.assertEqual(job['device'], self.local_dev['device'])
def test_build_jobs_handoff(self): def test_build_jobs_handoff(self):
ring = self.policy.object_ring = FabricatedRing() ring = self.policy.object_ring = self.fabricated_ring
# find a partition for which we're a handoff # find a partition for which we're a handoff
for partition in range(2 ** ring.part_power): for partition in range(2 ** ring.part_power):
part_nodes = ring.get_part_nodes(partition) part_nodes = ring.get_part_nodes(partition)
@ -1787,7 +1790,8 @@ class TestObjectReconstructor(unittest.TestCase):
} }
# since this part doesn't belong on us it doesn't matter what # since this part doesn't belong on us it doesn't matter what
# frag_index we have # frag_index we have
frag_index = random.randint(0, ring.replicas - 1) frag_index = self.policy.get_backend_index(
random.randint(0, ring.replicas - 1))
stub_hashes = { stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'}, '123': {frag_index: 'hash', None: 'hash'},
'abc': {None: 'hash'}, 'abc': {None: 'hash'},
@ -1800,7 +1804,17 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(job['job_type'], object_reconstructor.REVERT) self.assertEqual(job['job_type'], object_reconstructor.REVERT)
self.assertEqual(job['frag_index'], frag_index) self.assertEqual(job['frag_index'], frag_index)
self.assertEqual(sorted(job['suffixes']), sorted(stub_hashes.keys())) self.assertEqual(sorted(job['suffixes']), sorted(stub_hashes.keys()))
self.assertEqual(len(job['sync_to']), 1) self.assertEqual(
self.policy.ec_duplication_factor, len(job['sync_to']))
# the sync_to node should be different each other
node_ids = set([node['id'] for node in job['sync_to']])
self.assertEqual(len(node_ids),
self.policy.ec_duplication_factor)
# but all the nodes have same backend index to sync
node_indexes = set(
self.policy.get_backend_index(node['index'])
for node in job['sync_to'])
self.assertEqual(1, len(node_indexes))
self.assertEqual(job['sync_to'][0]['index'], frag_index) self.assertEqual(job['sync_to'][0]['index'], frag_index)
self.assertEqual(job['path'], part_path) self.assertEqual(job['path'], part_path)
self.assertEqual(job['partition'], partition) self.assertEqual(job['partition'], partition)
@ -1808,12 +1822,12 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(job['local_dev'], self.local_dev) self.assertEqual(job['local_dev'], self.local_dev)
def test_build_jobs_mixed(self): def test_build_jobs_mixed(self):
ring = self.policy.object_ring = FabricatedRing() ring = self.policy.object_ring = self.fabricated_ring
# find a partition for which we're a primary # find a partition for which we're a primary
for partition in range(2 ** ring.part_power): for partition in range(2 ** ring.part_power):
part_nodes = ring.get_part_nodes(partition) part_nodes = ring.get_part_nodes(partition)
try: try:
frag_index = [n['id'] for n in part_nodes].index( node_index = [n['id'] for n in part_nodes].index(
self.local_dev['id']) self.local_dev['id'])
except ValueError: except ValueError:
pass pass
@ -1830,8 +1844,10 @@ class TestObjectReconstructor(unittest.TestCase):
'partition': partition, 'partition': partition,
'part_path': part_path, 'part_path': part_path,
} }
other_frag_index = random.choice([f for f in range(ring.replicas) frag_index = self.policy.get_backend_index(node_index)
if f != frag_index]) other_frag_index = self.policy.get_backend_index(
random.choice(
[f for f in range(ring.replicas) if f != node_index]))
stub_hashes = { stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'}, '123': {frag_index: 'hash', None: 'hash'},
'456': {other_frag_index: 'hash', None: 'hash'}, '456': {other_frag_index: 'hash', None: 'hash'},
@ -1865,11 +1881,12 @@ class TestObjectReconstructor(unittest.TestCase):
job = revert_jobs[0] job = revert_jobs[0]
self.assertEqual(job['frag_index'], other_frag_index) self.assertEqual(job['frag_index'], other_frag_index)
self.assertEqual(job['suffixes'], ['456']) self.assertEqual(job['suffixes'], ['456'])
self.assertEqual(len(job['sync_to']), 1) self.assertEqual(len(job['sync_to']),
self.policy.ec_duplication_factor)
self.assertEqual(job['sync_to'][0]['index'], other_frag_index) self.assertEqual(job['sync_to'][0]['index'], other_frag_index)
def test_build_jobs_revert_only_tombstones(self): def test_build_jobs_revert_only_tombstones(self):
ring = self.policy.object_ring = FabricatedRing() ring = self.policy.object_ring = self.fabricated_ring
# find a partition for which we're a handoff # find a partition for which we're a handoff
for partition in range(2 ** ring.part_power): for partition in range(2 ** ring.part_power):
part_nodes = ring.get_part_nodes(partition) part_nodes = ring.get_part_nodes(partition)
@ -1952,7 +1969,8 @@ class TestObjectReconstructor(unittest.TestCase):
def test_process_job_primary_in_sync(self): def test_process_job_primary_in_sync(self):
replicas = self.policy.object_ring.replicas replicas = self.policy.object_ring.replicas
frag_index = random.randint(0, replicas - 1) frag_index = random.randint(
0, self.policy.ec_n_unique_fragments - 1)
sync_to = [n for n in self.policy.object_ring.devs sync_to = [n for n in self.policy.object_ring.devs
if n != self.local_dev][:2] if n != self.local_dev][:2]
# setup left and right hashes # setup left and right hashes
@ -2010,7 +2028,8 @@ class TestObjectReconstructor(unittest.TestCase):
def test_process_job_primary_not_in_sync(self): def test_process_job_primary_not_in_sync(self):
replicas = self.policy.object_ring.replicas replicas = self.policy.object_ring.replicas
frag_index = random.randint(0, replicas - 1) frag_index = random.randint(
0, self.policy.ec_n_unique_fragments - 1)
sync_to = [n for n in self.policy.object_ring.devs sync_to = [n for n in self.policy.object_ring.devs
if n != self.local_dev][:2] if n != self.local_dev][:2]
# setup left and right hashes # setup left and right hashes
@ -2072,7 +2091,8 @@ class TestObjectReconstructor(unittest.TestCase):
def test_process_job_sync_missing_durable(self): def test_process_job_sync_missing_durable(self):
replicas = self.policy.object_ring.replicas replicas = self.policy.object_ring.replicas
frag_index = random.randint(0, replicas - 1) frag_index = random.randint(
0, self.policy.ec_n_unique_fragments - 1)
sync_to = [n for n in self.policy.object_ring.devs sync_to = [n for n in self.policy.object_ring.devs
if n != self.local_dev][:2] if n != self.local_dev][:2]
# setup left and right hashes # setup left and right hashes
@ -2140,7 +2160,8 @@ class TestObjectReconstructor(unittest.TestCase):
def test_process_job_primary_some_in_sync(self): def test_process_job_primary_some_in_sync(self):
replicas = self.policy.object_ring.replicas replicas = self.policy.object_ring.replicas
frag_index = random.randint(0, replicas - 1) frag_index = random.randint(
0, self.policy.ec_n_unique_fragments - 1)
sync_to = [n for n in self.policy.object_ring.devs sync_to = [n for n in self.policy.object_ring.devs
if n != self.local_dev][:2] if n != self.local_dev][:2]
# setup left and right hashes # setup left and right hashes
@ -2207,9 +2228,9 @@ class TestObjectReconstructor(unittest.TestCase):
self.fail('unexpected call %r' % call) self.fail('unexpected call %r' % call)
def test_process_job_primary_down(self): def test_process_job_primary_down(self):
replicas = self.policy.object_ring.replicas
partition = 0 partition = 0
frag_index = random.randint(0, replicas - 1) frag_index = random.randint(
0, self.policy.ec_n_unique_fragments - 1)
stub_hashes = { stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'}, '123': {frag_index: 'hash', None: 'hash'},
'abc': {frag_index: 'hash', None: 'hash'}, 'abc': {frag_index: 'hash', None: 'hash'},
@ -2276,9 +2297,9 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(expected_ssync_calls, found_ssync_calls) self.assertEqual(expected_ssync_calls, found_ssync_calls)
def test_process_job_suffix_call_errors(self): def test_process_job_suffix_call_errors(self):
replicas = self.policy.object_ring.replicas
partition = 0 partition = 0
frag_index = random.randint(0, replicas - 1) frag_index = random.randint(
0, self.policy.ec_n_unique_fragments - 1)
stub_hashes = { stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'}, '123': {frag_index: 'hash', None: 'hash'},
'abc': {frag_index: 'hash', None: 'hash'}, 'abc': {frag_index: 'hash', None: 'hash'},
@ -2325,8 +2346,8 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertFalse(ssync_calls) self.assertFalse(ssync_calls)
def test_process_job_handoff(self): def test_process_job_handoff(self):
replicas = self.policy.object_ring.replicas frag_index = random.randint(
frag_index = random.randint(0, replicas - 1) 0, self.policy.ec_n_unique_fragments - 1)
sync_to = [random.choice([n for n in self.policy.object_ring.devs sync_to = [random.choice([n for n in self.policy.object_ring.devs
if n != self.local_dev])] if n != self.local_dev])]
sync_to[0]['index'] = frag_index sync_to[0]['index'] = frag_index
@ -2371,8 +2392,8 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(set(call['suffixes']), set(['123', 'abc'])) self.assertEqual(set(call['suffixes']), set(['123', 'abc']))
def test_process_job_will_not_revert_to_handoff(self): def test_process_job_will_not_revert_to_handoff(self):
replicas = self.policy.object_ring.replicas frag_index = random.randint(
frag_index = random.randint(0, replicas - 1) 0, self.policy.ec_n_unique_fragments - 1)
sync_to = [random.choice([n for n in self.policy.object_ring.devs sync_to = [random.choice([n for n in self.policy.object_ring.devs
if n != self.local_dev])] if n != self.local_dev])]
sync_to[0]['index'] = frag_index sync_to[0]['index'] = frag_index
@ -2430,8 +2451,8 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(set(call['suffixes']), set(['123', 'abc'])) self.assertEqual(set(call['suffixes']), set(['123', 'abc']))
def test_process_job_revert_is_handoff_fails(self): def test_process_job_revert_is_handoff_fails(self):
replicas = self.policy.object_ring.replicas frag_index = random.randint(
frag_index = random.randint(0, replicas - 1) 0, self.policy.ec_n_unique_fragments - 1)
sync_to = [random.choice([n for n in self.policy.object_ring.devs sync_to = [random.choice([n for n in self.policy.object_ring.devs
if n != self.local_dev])] if n != self.local_dev])]
sync_to[0]['index'] = frag_index sync_to[0]['index'] = frag_index
@ -2489,8 +2510,8 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(self.reconstructor.handoffs_remaining, 1) self.assertEqual(self.reconstructor.handoffs_remaining, 1)
def test_process_job_revert_cleanup(self): def test_process_job_revert_cleanup(self):
replicas = self.policy.object_ring.replicas frag_index = random.randint(
frag_index = random.randint(0, replicas - 1) 0, self.policy.ec_n_unique_fragments - 1)
sync_to = [random.choice([n for n in self.policy.object_ring.devs sync_to = [random.choice([n for n in self.policy.object_ring.devs
if n != self.local_dev])] if n != self.local_dev])]
sync_to[0]['index'] = frag_index sync_to[0]['index'] = frag_index
@ -2623,7 +2644,6 @@ class TestObjectReconstructor(unittest.TestCase):
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777] test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest() etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data) ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1) broken_body = ec_archive_bodies.pop(1)
responses = list() responses = list()
@ -2669,6 +2689,9 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual( self.assertEqual(
[{'timestamp': '1234567890.12345', 'exclude': []}], [{'timestamp': '1234567890.12345', 'exclude': []}],
json.loads(called_header['X-Backend-Fragment-Preferences'])) json.loads(called_header['X-Backend-Fragment-Preferences']))
# no error and warning
self.assertFalse(self.logger.get_lines_for_level('error'))
self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_errors_works(self): def test_reconstruct_fa_errors_works(self):
job = { job = {
@ -2712,6 +2735,57 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(md5(fixed_body).hexdigest(), self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest()) md5(broken_body).hexdigest())
def test_reconstruct_fa_error_with_invalid_header(self):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[4]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345',
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(4)
base_responses = list()
for body in ec_archive_bodies:
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
base_responses.append((200, body, headers))
responses = base_responses
# force the test to exercise the handling of this bad response by
# sticking it in near the front
error_index = random.randint(0, self.policy.ec_ndata - 1)
status, body, headers = responses[error_index]
# one esoteric failure is a literal string 'None' in place of the
# X-Object-Sysmeta-EC-Frag-Index
stub_node_job = {'some_keys': 'foo', 'but_not': 'frag_index'}
headers['X-Object-Sysmeta-Ec-Frag-Index'] = str(
stub_node_job.get('frag_index'))
# oops!
self.assertEqual('None',
headers.get('X-Object-Sysmeta-Ec-Frag-Index'))
responses[error_index] = status, body, headers
codes, body_iter, headers_iter = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter,
headers=headers_iter):
df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata))
fixed_body = ''.join(df.reader())
# ... this bad request should be treated like any other failure
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
def test_reconstruct_parity_fa_with_data_node_failure(self): def test_reconstruct_parity_fa_with_data_node_failure(self):
job = { job = {
'partition': 0, 'partition': 0,
@ -2731,7 +2805,6 @@ class TestObjectReconstructor(unittest.TestCase):
test_data = ('rebuild' * self.policy.ec_segment_size)[:-454] test_data = ('rebuild' * self.policy.ec_segment_size)[:-454]
etag = md5(test_data).hexdigest() etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data) ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
# the scheme is 10+4, so this gets a parity node # the scheme is 10+4, so this gets a parity node
broken_body = ec_archive_bodies.pop(-4) broken_body = ec_archive_bodies.pop(-4)
@ -2755,7 +2828,7 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(md5(fixed_body).hexdigest(), self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest()) md5(broken_body).hexdigest())
def test_reconstruct_fa_errors_fails(self): def test_reconstruct_fa_exceptions_fails(self):
job = { job = {
'partition': 0, 'partition': 0,
'policy': self.policy, 'policy': self.policy,
@ -2770,12 +2843,53 @@ class TestObjectReconstructor(unittest.TestCase):
'X-Timestamp': '1234567890.12345' 'X-Timestamp': '1234567890.12345'
} }
possible_errors = [404, Timeout(), Exception('kaboom!')] possible_errors = [Timeout(), Exception('kaboom!')]
codes = [random.choice(possible_errors) for i in codes = [random.choice(possible_errors) for i in
range(policy.object_ring.replicas - 1)] range(policy.object_ring.replicas - 1)]
with mocked_http_conn(*codes): with mocked_http_conn(*codes):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa, self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, metadata) job, node, metadata)
error_lines = self.logger.get_lines_for_level('error')
# # of replicas failed and one more error log to report no enough
# responses to reconstruct.
self.assertEqual(policy.object_ring.replicas, len(error_lines))
self.assertIn(
'Unable to get enough responses (%s error responses)'
% (policy.object_ring.replicas - 1),
error_lines[-1],
"Unexpected error line found: %s" % error_lines[-1])
# no warning
self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_all_404s_fails(self):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
policy = self.policy
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
codes = [404 for i in range(policy.object_ring.replicas - 1)]
with mocked_http_conn(*codes):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, metadata)
error_lines = self.logger.get_lines_for_level('error')
# only 1 log to report no enough responses
self.assertEqual(1, len(error_lines))
self.assertIn(
'Unable to get enough responses (%s error responses)'
% (policy.object_ring.replicas - 1),
error_lines[0],
"Unexpected error line found: %s" % error_lines[0])
# no warning
self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_with_mixed_old_etag(self): def test_reconstruct_fa_with_mixed_old_etag(self):
job = { job = {
@ -2795,13 +2909,14 @@ class TestObjectReconstructor(unittest.TestCase):
etag = md5(test_data).hexdigest() etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data) ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
# bad response
broken_body = ec_archive_bodies.pop(1) broken_body = ec_archive_bodies.pop(1)
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time()))) ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
# bad response bad_headers = get_header_frag_index(self, broken_body)
bad_headers = { bad_headers.update({
'X-Object-Sysmeta-Ec-Etag': 'some garbage', 'X-Object-Sysmeta-Ec-Etag': 'some garbage',
'X-Backend-Timestamp': next(ts).internal, 'X-Backend-Timestamp': next(ts).internal,
} })
# good responses # good responses
responses = list() responses = list()
@ -2828,6 +2943,10 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(md5(fixed_body).hexdigest(), self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest()) md5(broken_body).hexdigest())
# no error and warning
self.assertFalse(self.logger.get_lines_for_level('error'))
self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_with_mixed_new_etag(self): def test_reconstruct_fa_with_mixed_new_etag(self):
job = { job = {
'partition': 0, 'partition': 0,
@ -2868,17 +2987,164 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(md5(fixed_body).hexdigest(), self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest()) md5(broken_body).hexdigest())
# one newer etag can spoil the bunch # one newer etag won't spoil the bunch
new_index = random.randint(0, len(responses) - self.policy.ec_nparity) new_index = random.randint(0, self.policy.ec_ndata - 1)
new_headers = get_header_frag_index(self, (responses[new_index])[1]) new_headers = get_header_frag_index(self, (responses[new_index])[1])
new_headers.update({'X-Object-Sysmeta-Ec-Etag': 'some garbage', new_headers.update({'X-Object-Sysmeta-Ec-Etag': 'some garbage',
'X-Backend-Timestamp': next(ts).internal}) 'X-Backend-Timestamp': next(ts).internal})
new_response = (200, '', new_headers) new_response = (200, '', new_headers)
responses[new_index] = new_response responses[new_index] = new_response
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata))
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
# no error and warning
self.assertFalse(self.logger.get_lines_for_level('error'))
self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_with_mixed_etag_with_same_timestamp(self):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1)
# good responses
responses = list()
for body in ec_archive_bodies:
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
responses.append((200, body, headers))
# sanity check before negative test
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata))
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
# one newer timestamp but same etag won't spoil the bunch
# N.B. (FIXIME). we choose the first response as garbage, the
# reconstruction fails because all other *correct* frags will be
# assumed as garbage. To avoid the freaky failing set randint
# as [1, self.policy.ec_ndata - 1] to make the first response
# being the correct fragment to reconstruct
new_index = random.randint(1, self.policy.ec_ndata - 1)
new_headers = get_header_frag_index(self, (responses[new_index])[1])
new_headers.update({'X-Object-Sysmeta-Ec-Etag': 'some garbage'})
new_response = (200, '', new_headers)
responses[new_index] = new_response
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata))
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
# no error and warning
error_log_lines = self.logger.get_lines_for_level('error')
self.assertEqual(1, len(error_log_lines))
self.assertIn(
'Mixed Etag (some garbage, %s) for 10.0.0.1:1001/sdb/0/a/c/o '
'policy#%s frag#1' % (etag, int(self.policy)),
error_log_lines[0])
self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_with_mixed_not_enough_etags_fail(self):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
ec_archive_dict = dict()
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
# create 3 different ec bodies
for i in range(3):
body = test_data[i:]
archive_bodies = make_ec_archive_bodies(self.policy, body)
# pop the index to the destination node
archive_bodies.pop(1)
ec_archive_dict[
(md5(body).hexdigest(), next(ts).internal)] = archive_bodies
responses = list()
# fill out response list by 3 different etag bodies
for etag, ts in itertools.cycle(ec_archive_dict):
body = ec_archive_dict[(etag, ts)].pop(0)
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag,
'X-Backend-Timestamp': ts})
responses.append((200, body, headers))
if len(responses) >= (self.policy.object_ring.replicas - 1):
break
# sanity, there is 3 different etag and each etag
# doesn't have > ec_k bodies
etag_count = collections.Counter(
[in_resp_headers['X-Object-Sysmeta-Ec-Etag']
for _, _, in_resp_headers in responses])
self.assertEqual(3, len(etag_count))
for etag, count in etag_count.items():
self.assertLess(count, self.policy.ec_ndata)
codes, body_iter, headers = zip(*responses) codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa, self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, dict(metadata)) job, node, metadata)
error_lines = self.logger.get_lines_for_level('error')
# only 1 log to report no enough responses
self.assertEqual(3, len(error_lines))
for error_line in error_lines:
for expected_etag, ts in ec_archive_dict:
if expected_etag in error_line:
break
else:
self.fail(
"no expected etag %s found: %s" %
(list(ec_archive_dict), error_line))
expected = 'Unable to get enough responses (%s/10) to ' \
'reconstruct 10.0.0.1:1001/sdb/0/a/c/o policy#0 ' \
'frag#1 with ETag' % etag_count[expected_etag]
self.assertIn(
expected, error_line,
"Unexpected error line found: Expected: %s Got: %s"
% (expected, error_line))
# no warning
self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_finds_itself_does_not_fail(self): def test_reconstruct_fa_finds_itself_does_not_fail(self):
job = { job = {
@ -2904,12 +3170,9 @@ class TestObjectReconstructor(unittest.TestCase):
ec_archive_bodies = ec_archive_bodies[:-1] ec_archive_bodies = ec_archive_bodies[:-1]
def make_header(body): def make_header(body):
metadata = self.policy.pyeclib_driver.get_metadata(body) headers = get_header_frag_index(self, body)
frag_index = struct.unpack('h', metadata[:2])[0] headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
return { return headers
'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
'X-Object-Sysmeta-Ec-Etag': etag,
}
responses = [(200, body, make_header(body)) responses = [(200, body, make_header(body))
for body in ec_archive_bodies] for body in ec_archive_bodies]
@ -2922,6 +3185,31 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(md5(fixed_body).hexdigest(), self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest()) md5(broken_body).hexdigest())
# no error and warning
self.assertFalse(self.logger.get_lines_for_level('error'))
self.assertFalse(self.logger.get_lines_for_level('warning'))
# the found own frag will be reported in the debug message
debug_log_lines = self.logger.get_lines_for_level('debug')
self.assertEqual(2, len(debug_log_lines))
self.assertIn(
'Found existing frag #1 while rebuilding #1 from',
debug_log_lines[0])
# ... and then, it should be skipped in the responses
# N.B. in the future, we could avoid those check because
# definately sending the copy rather than reconstruct will
# save resources. But one more reason, we're avoiding to
# use the dest index fragment even if it goes to reconstruct
# function is that it will cause a bunch of warning log from
# liberasurecode[1].
# 1: https://github.com/openstack/liberasurecode/blob/
# master/src/erasurecode.c#L870
expected_prefix = 'Reconstruct frag #1 with frag indexes'
self.assertIn(expected_prefix, debug_log_lines[1])
got_frag_index_list = json.loads(
debug_log_lines[1][len(expected_prefix):])
self.assertNotIn(1, got_frag_index_list)
def test_reconstruct_fa_finds_duplicate_does_not_fail(self): def test_reconstruct_fa_finds_duplicate_does_not_fail(self):
job = { job = {
'partition': 0, 'partition': 0,
@ -2947,12 +3235,9 @@ class TestObjectReconstructor(unittest.TestCase):
ec_archive_bodies)[:-num_duplicates] ec_archive_bodies)[:-num_duplicates]
def make_header(body): def make_header(body):
metadata = self.policy.pyeclib_driver.get_metadata(body) headers = get_header_frag_index(self, body)
frag_index = struct.unpack('h', metadata[:2])[0] headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
return { return headers
'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
'X-Object-Sysmeta-Ec-Etag': etag,
}
responses = [(200, body, make_header(body)) responses = [(200, body, make_header(body))
for body in ec_archive_bodies] for body in ec_archive_bodies]
@ -2965,6 +3250,224 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(md5(fixed_body).hexdigest(), self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest()) md5(broken_body).hexdigest())
# no error and warning
self.assertFalse(self.logger.get_lines_for_level('error'))
self.assertFalse(self.logger.get_lines_for_level('warning'))
debug_log_lines = self.logger.get_lines_for_level('debug')
self.assertEqual(1, len(debug_log_lines))
expected_prefix = 'Reconstruct frag #1 with frag indexes'
self.assertIn(expected_prefix, debug_log_lines[0])
got_frag_index_list = json.loads(
debug_log_lines[0][len(expected_prefix):])
self.assertNotIn(1, got_frag_index_list)
def test_reconstruct_fa_missing_headers(self):
# This is much negative tests asserting when the expected
# headers are missing in the responses to gather fragments
# to reconstruct
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1)
def make_header(body):
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
return headers
def test_missing_header(missing_header, expected_warning):
self.logger._clear()
responses = [(200, body, make_header(body))
for body in ec_archive_bodies]
responses[0][2].update({missing_header: None})
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(
*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, metadata)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
# no errorg
self.assertFalse(self.logger.get_lines_for_level('error'))
# ...but warning for the missing header
warning_log_lines = self.logger.get_lines_for_level('warning')
self.assertEqual(1, len(warning_log_lines))
self.assertIn(expected_warning, warning_log_lines)
message_base = \
"Invalid resp from 10.0.0.0:1000/sda/0/a/c/o policy#0 frag#0"
test_missing_header(
'X-Object-Sysmeta-Ec-Etag',
"%s %s" % (message_base, "(missing Etag)"))
test_missing_header(
'X-Object-Sysmeta-Ec-Frag-Index',
"%s %s" % (message_base,
"(invalid X-Object-Sysmeta-Ec-Frag-Index)"))
test_missing_header(
'X-Backend-Timestamp',
"%s %s" % (message_base, "(missing X-Backend-Timestamp)"))
def test_reconstruct_fa_invalid_frag_index_headers(self):
# This is much negative tests asserting when the expected
# ec frag index header has invalid value in the responses
# to gather fragments to reconstruct
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1)
def make_header(body):
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
return headers
def test_invalid_ec_frag_index_header(invalid_frag_index):
self.logger._clear()
responses = [(200, body, make_header(body))
for body in ec_archive_bodies]
responses[0][2].update({
'X-Object-Sysmeta-Ec-Frag-Index': invalid_frag_index})
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(
*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, metadata)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
# no errorg
self.assertFalse(self.logger.get_lines_for_level('error'))
# ...but warning for the invalid header
warning_log_lines = self.logger.get_lines_for_level('warning')
self.assertEqual(1, len(warning_log_lines))
expected_message = \
"Invalid resp from 10.0.0.0:1000/sda/0/a/c/o " \
"policy#0 frag#0 (invalid X-Object-Sysmeta-Ec-Frag-Index)"
self.assertIn(expected_message, warning_log_lines)
for value in ('None', 'invalid'):
test_invalid_ec_frag_index_header(value)
@patch_policies([ECStoragePolicy(0, name='ec', is_default=True,
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=4,
ec_segment_size=4096,
ec_duplication_factor=2)],
fake_ring_args=[{'replicas': 28}])
class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor):
def setUp(self):
super(TestObjectReconstructorECDuplicationFactor, self).setUp()
self.fabricated_ring = FabricatedRing(replicas=28, devices=56)
def _test_reconstruct_with_duplicate_frags_no_errors(self, index):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[index]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345',
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(index)
responses = list()
for body in ec_archive_bodies:
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
responses.append((200, body, headers))
# make a hook point at
# swift.obj.reconstructor.ObjectReconstructor._get_response
called_headers = []
orig_func = object_reconstructor.ObjectReconstructor._get_response
def _get_response_hook(self, node, part, path, headers, policy):
called_headers.append(headers)
return orig_func(self, node, part, path, headers, policy)
# need to m + 1 node failures to reach 2nd set of duplicated fragments
failed_start_at = (
self.policy.ec_n_unique_fragments - self.policy.ec_nparity - 1)
# set Timeout for node #10, #11, #12, #13, #14
for i in range(self.policy.ec_nparity + 1):
responses[failed_start_at + i] = (Timeout(), '', '')
codes, body_iter, headers = zip(*responses)
get_response_path = \
'swift.obj.reconstructor.ObjectReconstructor._get_response'
with mock.patch(get_response_path, _get_response_hook):
with mocked_http_conn(
*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, metadata)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
for called_header in called_headers:
called_header = HeaderKeyDict(called_header)
self.assertTrue('Content-Length' in called_header)
self.assertEqual(called_header['Content-Length'], '0')
self.assertTrue('User-Agent' in called_header)
user_agent = called_header['User-Agent']
self.assertTrue(user_agent.startswith('obj-reconstructor'))
def test_reconstruct_with_duplicate_frags_no_errors(self):
# any fragments can be broken
for index in range(28):
self._test_reconstruct_with_duplicate_frags_no_errors(index)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff