Make container sync copy SLO manifests

Currently the container sync daemon fails to copy
an SLO manifest, and the error will stall progress
of the sync process on that container. There are
several reasons why the sync of an SLO manifest
may fail:

1. The GET of the manifest from the source
   container returns an X-Static-Large-Object header
   that is not allowed to be included with a PUT
   to the destination container.

2. The format of the manifest object that is read
   from the source is not in the syntax required
   for an SLO manifest PUT.

3. Assuming 2 were fixed, the PUT of the manifest
   includes an ETag header which will not match the
   md5 of the manifest generated by the receiving
   proxy's SLO middleware.

4. If the manifest is being synced to a different
   account and/or cluster, then the SLO segments may
   not have been synced and so the validation of the
   PUT manifest will fail.

This patch addresses all of these obstacles by
enabling the destination container-sync middleware to
cause the SLO middleware to be bypassed by setting a
swift.slo_override flag in the request environ. This
flag is only set for requests that have been validated
as originating from a container sync peer.

This is justified by noting that an SLO manifest PUT from
a container sync peer can be assumed to have valid syntax
because it has already been validated when written to
the source container.

Furthermore, we must allow SLO manifests to be synced
without requiring the semantics of their content to be
re-validated because we have no way to enforce or check
that segments have been synced prior to the manifest, nor
to check that the semantics of the manifest are still valid
at the source.

This does mean that GETs to synced SLO manifests may fail
if segments have not been synced. This is however
consistent with the expectation for synced DLO manifests
and indeed for the source SLO manifest if segments have
been deleted since it was written.

Co-Authored-By: Oshrit Feder <oshritf@il.ibm.com>
Change-Id: I8d503419b7996721a671ed6b2795224775a7d8c6
Closes-Bug: #1605597
This commit is contained in:
Alistair Coles 2016-07-28 18:41:08 +01:00
parent c9c2571b83
commit f679ed0cc8
8 changed files with 239 additions and 43 deletions

View File

@ -14,13 +14,24 @@ synchronization key.
.. note:: .. note::
If you are using the large objects feature and syncing to another cluster If you are using the :ref:`Large Objects <large-objects>` feature and
then you will need to ensure that manifest files and segment files are syncing to another cluster then you will need to ensure that manifest files
synced. If segment files are in a different container than their manifest and segment files are synced. If segment files are in a different container
then both the manifest's container and the segments' container must be than their manifest then both the manifest's container and the segments'
synced. The target container for synced segment files must always have the container must be synced. The target container for synced segment files
same name as their source container in order for them to be resolved by must always have the same name as their source container in order for them
synced manifests. to be resolved by synced manifests.
Be aware that manifest files may be synced before segment files even if
they are in the same container and were created after the segment files.
In the case of :ref:`Static Large Objects <static-large-objects>`, a GET
request for a manifest whose segments have yet to be completely synced will
fail with none or only part of the large object content being returned.
In the case of :ref:`Dynamic Large Objects <dynamic-large-objects>`, a GET
request for a manifest whose segments have yet to be completely synced will
either fail or return unexpected (and most likely incorrect) content.
.. note:: .. note::

View File

@ -130,6 +130,11 @@ class ContainerSync(object):
raise exc raise exc
else: else:
req.environ['swift.authorize_override'] = True req.environ['swift.authorize_override'] = True
# An SLO manifest will already be in the internal manifest
# syntax and might be synced before its segments, so stop SLO
# middleware from performing the usual manifest validation.
req.environ['swift.slo_override'] = True
if req.path == '/info': if req.path == '/info':
# Ensure /info requests get the freshest results # Ensure /info requests get the freshest results
self.register_info() self.register_info()

View File

@ -1041,6 +1041,9 @@ class StaticLargeObject(object):
""" """
WSGI entry point WSGI entry point
""" """
if env.get('swift.slo_override'):
return self.app(env, start_response)
req = Request(env) req = Request(env)
try: try:
vrs, account, container, obj = req.split_path(4, 4, True) vrs, account, container, obj = req.split_path(4, 4, True)

View File

@ -361,6 +361,16 @@ class ProbeTest(unittest.TestCase):
self.ipport2server[proxy_ipport] = 'proxy' self.ipport2server[proxy_ipport] = 'proxy'
self.url, self.token, self.account = check_server( self.url, self.token, self.account = check_server(
proxy_ipport, self.ipport2server) proxy_ipport, self.ipport2server)
self.account_1 = {
'url': self.url, 'token': self.token, 'account': self.account}
url2, token2 = get_auth(
'http://%s:%d/auth/v1.0' % proxy_ipport,
'test2:tester2', 'testing2')
self.account_2 = {
'url': url2, 'token': token2, 'account': url2.split('/')[-1]}
head_account(url2, token2) # sanity check
self.replicators = Manager( self.replicators = Manager(
['account-replicator', 'container-replicator', ['account-replicator', 'container-replicator',
'object-replicator']) 'object-replicator'])

View File

@ -11,7 +11,7 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import json
import uuid import uuid
import random import random
from nose import SkipTest from nose import SkipTest
@ -51,35 +51,47 @@ class TestContainerSync(ReplProbeTest):
super(TestContainerSync, self).setUp() super(TestContainerSync, self).setUp()
self.realm, self.cluster = get_current_realm_cluster(self.url) self.realm, self.cluster = get_current_realm_cluster(self.url)
def _setup_synced_containers(self, skey='secret', dkey='secret'): def _setup_synced_containers(
self, source_overrides=None, dest_overrides=None):
# these defaults are used to create both source and dest containers
# unless overridden by source_overrides and/or dest_overrides
default_params = {'url': self.url,
'token': self.token,
'account': self.account,
'sync_key': 'secret'}
# setup dest container # setup dest container
dest_container = 'dest-container-%s' % uuid.uuid4() dest = dict(default_params)
dest['name'] = 'dest-container-%s' % uuid.uuid4()
dest.update(dest_overrides or {})
dest_headers = {} dest_headers = {}
dest_policy = None dest_policy = None
if len(ENABLED_POLICIES) > 1: if len(ENABLED_POLICIES) > 1:
dest_policy = random.choice(ENABLED_POLICIES) dest_policy = random.choice(ENABLED_POLICIES)
dest_headers['X-Storage-Policy'] = dest_policy.name dest_headers['X-Storage-Policy'] = dest_policy.name
if dkey is not None: if dest['sync_key'] is not None:
dest_headers['X-Container-Sync-Key'] = dkey dest_headers['X-Container-Sync-Key'] = dest['sync_key']
client.put_container(self.url, self.token, dest_container, client.put_container(dest['url'], dest['token'], dest['name'],
headers=dest_headers) headers=dest_headers)
# setup source container # setup source container
source_container = 'source-container-%s' % uuid.uuid4() source = dict(default_params)
source['name'] = 'source-container-%s' % uuid.uuid4()
source.update(source_overrides or {})
source_headers = {} source_headers = {}
sync_to = '//%s/%s/%s/%s' % (self.realm, self.cluster, self.account, sync_to = '//%s/%s/%s/%s' % (self.realm, self.cluster, dest['account'],
dest_container) dest['name'])
source_headers['X-Container-Sync-To'] = sync_to source_headers['X-Container-Sync-To'] = sync_to
if skey is not None: if source['sync_key'] is not None:
source_headers['X-Container-Sync-Key'] = skey source_headers['X-Container-Sync-Key'] = source['sync_key']
if dest_policy: if dest_policy:
source_policy = random.choice([p for p in ENABLED_POLICIES source_policy = random.choice([p for p in ENABLED_POLICIES
if p is not dest_policy]) if p is not dest_policy])
source_headers['X-Storage-Policy'] = source_policy.name source_headers['X-Storage-Policy'] = source_policy.name
client.put_container(self.url, self.token, source_container, client.put_container(source['url'], source['token'], source['name'],
headers=source_headers) headers=source_headers)
return source_container, dest_container return source['name'], dest['name']
def _test_sync(self, object_post_as_copy): def _test_sync(self, object_post_as_copy):
source_container, dest_container = self._setup_synced_containers() source_container, dest_container = self._setup_synced_containers()
@ -148,10 +160,97 @@ class TestContainerSync(ReplProbeTest):
def test_sync_with_fast_post(self): def test_sync_with_fast_post(self):
self._test_sync(False) self._test_sync(False)
def test_sync_slo_manifest(self):
    """Check an SLO manifest is synced even when its segments are not.

    The manifest and its segment are written to the source account. The
    manifest container syncs to a second account straight away, while
    the destination segments container initially has no sync key, so
    the segment cannot sync. The manifest must still arrive intact; a
    GET of the large object is expected to fail with 409 until the
    segments container is given its key and a second sync pass runs.
    """
    # Manifest containers: source and destination in separate accounts.
    account_2 = self.account_2
    manifest_src, manifest_dst = self._setup_synced_containers(
        dest_overrides=account_2)

    # Segment containers, also in separate accounts. They must share a
    # name so the destination manifest can resolve its segments. The
    # destination starts without a sync key so segments will not sync.
    segments_name = 'segments-%s' % uuid.uuid4()
    segs_dest = dict(account_2, name=segments_name, sync_key=None)
    self._setup_synced_containers(
        source_overrides={'name': segments_name, 'sync_key': 'segs_key'},
        dest_overrides=segs_dest)

    # Upload a single segment to the source account.
    seg_obj = 'segment-%s' % uuid.uuid4()
    seg_body = 'segment body'  # a small first segment is acceptable
    seg_etag = client.put_object(
        self.url, self.token, segments_name, seg_obj, seg_body)

    # Upload the manifest that references the segment, then read back
    # the stored form of the manifest for later comparison.
    manifest_entries = [{
        'etag': seg_etag,
        'size_bytes': len(seg_body),
        'path': '/%s/%s' % (segments_name, seg_obj)}]
    manifest_obj = 'manifest-%s' % uuid.uuid4()
    client.put_object(
        self.url, self.token, manifest_src, manifest_obj,
        json.dumps(manifest_entries),
        headers={'X-Object-Meta-Test': 'put_value'},
        query_string='multipart-manifest=put')
    _, raw_manifest = client.get_object(
        self.url, self.token, manifest_src, manifest_obj,
        query_string='multipart-manifest=get')
    stored_manifest = json.loads(raw_manifest)

    def check_dest_manifest():
        # The destination copy must carry the same manifest body and
        # the user metadata that was set on the source PUT.
        hdrs, raw = client.get_object(
            account_2['url'], account_2['token'], manifest_dst,
            manifest_obj, query_string='multipart-manifest=get')
        self.assertEqual(stored_manifest, json.loads(raw))
        self.assertIn('x-object-meta-test', hdrs)
        self.assertEqual('put_value', hdrs['x-object-meta-test'])

    # First container-sync pass: only the manifest can be synced.
    Manager(['container-sync']).once()

    _, listing = client.get_container(
        account_2['url'], account_2['token'], manifest_dst)
    self.assertFalse(listing[1:])
    self.assertEqual(manifest_obj, listing[0]['name'])
    check_dest_manifest()

    # Fetching the large object itself fails: the segment is missing.
    with self.assertRaises(ClientException) as caught:
        client.get_object(account_2['url'], account_2['token'],
                          manifest_dst, manifest_obj)
    self.assertEqual(409, caught.exception.http_status)

    # Give the destination segments container its sync key, then run a
    # second container-sync pass so the segment is copied.
    client.put_container(
        account_2['url'], account_2['token'], segments_name,
        headers={'X-Container-Sync-Key': 'segs_key'})
    Manager(['container-sync']).once()

    # Sanity check: the manifest is unchanged by the second pass.
    check_dest_manifest()

    # The large object can now be read in full from the destination.
    _, large_object = client.get_object(
        account_2['url'], account_2['token'], manifest_dst, manifest_obj)
    self.assertEqual(seg_body, large_object)
def test_sync_lazy_skey(self): def test_sync_lazy_skey(self):
# Create synced containers, but with no key at source # Create synced containers, but with no key at source
source_container, dest_container =\ source_container, dest_container =\
self._setup_synced_containers(None, 'secret') self._setup_synced_containers(source_overrides={'sync_key': None})
# upload to source # upload to source
object_name = 'object-%s' % uuid.uuid4() object_name = 'object-%s' % uuid.uuid4()
@ -178,7 +277,7 @@ class TestContainerSync(ReplProbeTest):
def test_sync_lazy_dkey(self): def test_sync_lazy_dkey(self):
# Create synced containers, but with no key at dest # Create synced containers, but with no key at dest
source_container, dest_container =\ source_container, dest_container =\
self._setup_synced_containers('secret', None) self._setup_synced_containers(dest_overrides={'sync_key': None})
# upload to source # upload to source
object_name = 'object-%s' % uuid.uuid4() object_name = 'object-%s' % uuid.uuid4()

View File

@ -213,9 +213,9 @@ cluster_dfw1 = http://dfw1.host/v1/
resp.body, resp.body,
'X-Container-Sync-Auth header not valid; contact cluster operator ' 'X-Container-Sync-Auth header not valid; contact cluster operator '
'for support.') 'for support.')
self.assertTrue( self.assertIn('cs:invalid-sig', req.environ.get('swift.log_info'))
'cs:invalid-sig' in req.environ.get('swift.log_info'), self.assertNotIn('swift.authorize_override', req.environ)
req.environ.get('swift.log_info')) self.assertNotIn('swift.slo_override', req.environ)
def test_valid_sig(self): def test_valid_sig(self):
ts = '1455221706.726999_0123456789abcdef' ts = '1455221706.726999_0123456789abcdef'
@ -233,6 +233,8 @@ cluster_dfw1 = http://dfw1.host/v1/
self.assertIn('cs:valid', req.environ.get('swift.log_info')) self.assertIn('cs:valid', req.environ.get('swift.log_info'))
self.assertIn('X-Timestamp', resp.headers) self.assertIn('X-Timestamp', resp.headers)
self.assertEqual(ts, resp.headers['X-Timestamp']) self.assertEqual(ts, resp.headers['X-Timestamp'])
self.assertIn('swift.authorize_override', req.environ)
self.assertIn('swift.slo_override', req.environ)
def test_valid_sig2(self): def test_valid_sig2(self):
sig = self.sync.realms_conf.get_sig( sig = self.sync.realms_conf.get_sig(
@ -245,9 +247,9 @@ cluster_dfw1 = http://dfw1.host/v1/
resp = req.get_response(self.sync) resp = req.get_response(self.sync)
self.assertEqual(resp.status, '200 OK') self.assertEqual(resp.status, '200 OK')
self.assertEqual(resp.body, 'Response to Authorized Request') self.assertEqual(resp.body, 'Response to Authorized Request')
self.assertTrue( self.assertIn('cs:valid', req.environ.get('swift.log_info'))
'cs:valid' in req.environ.get('swift.log_info'), self.assertIn('swift.authorize_override', req.environ)
req.environ.get('swift.log_info')) self.assertIn('swift.slo_override', req.environ)
def test_info(self): def test_info(self):
req = swob.Request.blank('/info') req = swob.Request.blank('/info')

View File

@ -120,6 +120,25 @@ class TestSloMiddleware(SloTestCase):
self.assertTrue( self.assertTrue(
resp.startswith('X-Static-Large-Object is a reserved header')) resp.startswith('X-Static-Large-Object is a reserved header'))
def test_slo_PUT_env_override(self):
    """Check the SLO middleware passes a PUT through when overridden.

    With ``swift.slo_override`` set in the environ, the request must
    reach the wrapped app with its body untouched, even though the body
    is not a valid manifest and the request carries an
    X-Static-Large-Object header.
    """
    obj_path = '/v1/a/c/o'
    payload = 'manifest body not checked when override flag set'
    statuses = []

    def capture_status(status, headers, *args):
        # Record only the status line handed to start_response.
        statuses.append(status)

    override_env = {'REQUEST_METHOD': 'PUT', 'swift.slo_override': True}
    req = Request.blank(
        obj_path,
        headers={'x-static-large-object': "true"},
        environ=override_env,
        body=payload)
    self.app.register('PUT', obj_path, swob.HTTPCreated, {})

    app_iter = self.slo(req.environ, capture_status)

    self.assertEqual('', ''.join(app_iter))
    self.assertEqual(self.app.calls, [('PUT', obj_path)])
    self.assertEqual(payload, self.app.uploaded[obj_path][1])
    self.assertEqual(statuses[0], '201 Created')
def _put_bogus_slo(self, manifest_text, def _put_bogus_slo(self, manifest_text,
manifest_path='/v1/a/c/the-manifest'): manifest_path='/v1/a/c/the-manifest'):
with self.assertRaises(HTTPException) as catcher: with self.assertRaises(HTTPException) as catcher:

View File

@ -930,27 +930,30 @@ class TestContainerSync(unittest.TestCase):
sync.uuid = FakeUUID sync.uuid = FakeUUID
ts_data = Timestamp(1.1) ts_data = Timestamp(1.1)
timestamp = Timestamp(1.2) timestamp = Timestamp(1.2)
put_object_calls = []
def fake_put_object(sync_to, name=None, headers=None, def fake_put_object(*args, **kwargs):
contents=None, proxy=None, logger=None, put_object_calls.append((args, kwargs))
timeout=None):
def check_put_object(extra_headers, sync_to, name=None,
headers=None, contents=None, proxy=None,
logger=None, timeout=None):
self.assertEqual(sync_to, 'http://sync/to/path') self.assertEqual(sync_to, 'http://sync/to/path')
self.assertEqual(name, 'object') self.assertEqual(name, 'object')
expected_headers = {
'x-timestamp': timestamp.internal,
'etag': 'etagvalue',
'other-header': 'other header value',
'content-type': 'text/plain'}
if realm: if realm:
self.assertEqual(headers, { expected_headers.update({
'x-container-sync-auth': 'x-container-sync-auth':
'US abcdef a5fb3cf950738e6e3b364190e246bd7dd21dad3c', 'US abcdef a5fb3cf950738e6e3b364190e246bd7dd21dad3c'})
'x-timestamp': timestamp.internal,
'etag': 'etagvalue',
'other-header': 'other header value',
'content-type': 'text/plain'})
else: else:
self.assertEqual(headers, { expected_headers.update({
'x-container-sync-key': 'key', 'x-container-sync-key': 'key'})
'x-timestamp': timestamp.internal, expected_headers.update(extra_headers)
'other-header': 'other header value', self.assertDictEqual(expected_headers, headers)
'etag': 'etagvalue',
'content-type': 'text/plain'})
self.assertEqual(contents.read(), 'contents') self.assertEqual(contents.read(), 'contents')
self.assertEqual(proxy, 'http://proxy') self.assertEqual(proxy, 'http://proxy')
self.assertEqual(timeout, 5.0) self.assertEqual(timeout, 5.0)
@ -995,6 +998,9 @@ class TestContainerSync(unittest.TestCase):
'key', FakeContainerBroker('broker'), 'key', FakeContainerBroker('broker'),
{'account': 'a', 'container': 'c', 'storage_policy_index': 0}, {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key)) realm, realm_key))
self.assertEqual(1, len(put_object_calls))
check_put_object({'etag': 'etagvalue'},
*put_object_calls[0][0], **put_object_calls[0][1])
expected_put_count += 1 expected_put_count += 1
self.assertEqual(cs.container_puts, expected_put_count) self.assertEqual(cs.container_puts, expected_put_count)
@ -1016,6 +1022,7 @@ class TestContainerSync(unittest.TestCase):
# Success as everything says it worked, also checks 'date' and # Success as everything says it worked, also checks 'date' and
# 'last-modified' headers are removed and that 'etag' header is # 'last-modified' headers are removed and that 'etag' header is
# stripped of double quotes. # stripped of double quotes.
put_object_calls = []
self.assertTrue(cs.container_sync_row( self.assertTrue(cs.container_sync_row(
{'deleted': False, {'deleted': False,
'name': 'object', 'name': 'object',
@ -1024,12 +1031,16 @@ class TestContainerSync(unittest.TestCase):
'key', FakeContainerBroker('broker'), 'key', FakeContainerBroker('broker'),
{'account': 'a', 'container': 'c', 'storage_policy_index': 0}, {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key)) realm, realm_key))
self.assertEqual(1, len(put_object_calls))
check_put_object({'etag': 'etagvalue'},
*put_object_calls[0][0], **put_object_calls[0][1])
expected_put_count += 1 expected_put_count += 1
self.assertEqual(cs.container_puts, expected_put_count) self.assertEqual(cs.container_puts, expected_put_count)
# Success as everything says it worked, also check that PUT # Success as everything says it worked, also check that PUT
# timestamp equals GET timestamp when it is newer than created_at # timestamp equals GET timestamp when it is newer than created_at
# value. # value.
put_object_calls = []
self.assertTrue(cs.container_sync_row( self.assertTrue(cs.container_sync_row(
{'deleted': False, {'deleted': False,
'name': 'object', 'name': 'object',
@ -1038,6 +1049,42 @@ class TestContainerSync(unittest.TestCase):
'key', FakeContainerBroker('broker'), 'key', FakeContainerBroker('broker'),
{'account': 'a', 'container': 'c', 'storage_policy_index': 0}, {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key)) realm, realm_key))
self.assertEqual(1, len(put_object_calls))
check_put_object({'etag': 'etagvalue'},
*put_object_calls[0][0], **put_object_calls[0][1])
expected_put_count += 1
self.assertEqual(cs.container_puts, expected_put_count)
def fake_get_object(acct, con, obj, headers, acceptable_statuses):
self.assertEqual(headers['X-Newest'], True)
self.assertEqual(headers['X-Backend-Storage-Policy-Index'],
'0')
return (200,
{'date': 'date value',
'last-modified': 'last modified value',
'x-timestamp': timestamp.internal,
'other-header': 'other header value',
'etag': '"etagvalue"',
'x-static-large-object': 'true',
'content-type': 'text/plain; swift_bytes=123'},
iter('contents'))
cs.swift.get_object = fake_get_object
# Success as everything says it worked, also check that the etag
# header is retained and the x-static-large-object header is passed
# through in case of SLO
put_object_calls = []
self.assertTrue(cs.container_sync_row(
{'deleted': False,
'name': 'object',
'created_at': '1.1',
'size': 60}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(1, len(put_object_calls))
check_put_object({'x-static-large-object': 'true'},
*put_object_calls[0][0], **put_object_calls[0][1])
expected_put_count += 1 expected_put_count += 1
self.assertEqual(cs.container_puts, expected_put_count) self.assertEqual(cs.container_puts, expected_put_count)