Merge "Container-Sync to iterate only over synced containers"
This commit is contained in:
commit
fcdb2fa2a1
@ -29,7 +29,7 @@ synchronization key.
|
||||
Configuring Container Sync
|
||||
--------------------------
|
||||
|
||||
Create a container-sync-realms.conf file specifying the allowable clusters
|
||||
Create a ``container-sync-realms.conf`` file specifying the allowable clusters
|
||||
and their information::
|
||||
|
||||
[realm1]
|
||||
@ -50,18 +50,18 @@ clusters that have agreed to allow container syncing with each other. Realm
|
||||
names will be considered case insensitive.
|
||||
|
||||
The key is the overall cluster-to-cluster key used in combination with the
|
||||
external users' key that they set on their containers' X-Container-Sync-Key
|
||||
metadata header values. These keys will be used to sign each request the
|
||||
container sync daemon makes and used to validate each incoming container sync
|
||||
request.
|
||||
external users' key that they set on their containers'
|
||||
``X-Container-Sync-Key`` metadata header values. These keys will be used to
|
||||
sign each request the container sync daemon makes and used to validate each
|
||||
incoming container sync request.
|
||||
|
||||
The key2 is optional and is an additional key incoming requests will be checked
|
||||
against. This is so you can rotate keys if you wish; you move the existing key
|
||||
to key2 and make a new key value.
|
||||
|
||||
Any values in the realm section whose names begin with cluster\_ will indicate
|
||||
the name and endpoint of a cluster and will be used by external users in
|
||||
their containers' X-Container-Sync-To metadata header values with the format
|
||||
Any values in the realm section whose names begin with ``cluster_`` will
|
||||
indicate the name and endpoint of a cluster and will be used by external users in
|
||||
their containers' ``X-Container-Sync-To`` metadata header values with the format
|
||||
"//realm_name/cluster_name/account_name/container_name". Realm and cluster
|
||||
names are considered case insensitive.
|
||||
|
||||
@ -71,7 +71,7 @@ container servers, since that is where the container sync daemon runs. Note
|
||||
that the endpoint ends with /v1/ and that the container sync daemon will then
|
||||
add the account/container/obj name after that.
|
||||
|
||||
Distribute this container-sync-realms.conf file to all your proxy servers
|
||||
Distribute this ``container-sync-realms.conf`` file to all your proxy servers
|
||||
and container servers.
|
||||
|
||||
You also need to add the container_sync middleware to your proxy pipeline. It
|
||||
@ -95,7 +95,7 @@ section, Configuring Container Sync, for the new-style.
|
||||
With the old-style, the Swift cluster operator must allow synchronization with
|
||||
a set of hosts before the user can enable container synchronization. First, the
|
||||
backend container server needs to be given this list of hosts in the
|
||||
container-server.conf file::
|
||||
``container-server.conf`` file::
|
||||
|
||||
[DEFAULT]
|
||||
# This is a comma separated list of hosts allowed in the
|
||||
@ -170,8 +170,8 @@ we'll make next::
|
||||
|
||||
The ``-t`` indicates the cluster to sync to, which is the realm name of the
|
||||
section from container-sync-realms.conf, followed by the cluster name from
|
||||
that section (without the cluster\_ prefix), followed by the account and container names we want to sync to.
|
||||
The ``-k`` specifies the secret key the two containers will share for
|
||||
that section (without the cluster\_ prefix), followed by the account and container
|
||||
names we want to sync to. The ``-k`` specifies the secret key the two containers will share for
|
||||
synchronization; this is the user key, the cluster key in
|
||||
container-sync-realms.conf will also be used behind the scenes.
|
||||
|
||||
@ -195,8 +195,18 @@ as it gets synchronized over to the second::
|
||||
list container2
|
||||
|
||||
[Nothing there yet, so we wait a bit...]
|
||||
[If you're an operator running SAIO and just testing, you may need to
|
||||
run 'swift-init container-sync once' to perform a sync scan.]
|
||||
|
||||
.. note::
|
||||
|
||||
If you're an operator running SAIO and just testing, each time you
|
||||
configure a container for synchronization and place objects in the
|
||||
source container you will need to ensure that container-sync runs
|
||||
before attempting to retrieve objects from the target container.
|
||||
That is, you need to run::
|
||||
|
||||
swift-init container-sync once
|
||||
|
||||
Now expect to see objects copied from the first container to the second::
|
||||
|
||||
$ swift -A http://cluster2/auth/v1.0 -U test2:tester2 -K testing2 \
|
||||
list container2
|
||||
@ -340,13 +350,34 @@ synchronize to the second, we could have used this curl command::
|
||||
What's going on behind the scenes, in the cluster?
|
||||
--------------------------------------------------
|
||||
|
||||
The swift-container-sync does the job of sending updates to the remote
|
||||
container.
|
||||
Container ring devices have a directory called ``containers``, where container
|
||||
databases reside. In addition to ``containers``, each container ring device
|
||||
also has a directory called ``sync-containers``. ``sync-containers`` holds
|
||||
symlinks to container databases that were configured for container sync using
|
||||
``x-container-sync-to`` and ``x-container-sync-key`` metadata keys.
|
||||
|
||||
This is done by scanning the local devices for container databases and
|
||||
checking for x-container-sync-to and x-container-sync-key metadata values.
|
||||
If they exist, newer rows since the last sync will trigger PUTs or DELETEs
|
||||
to the other container.
|
||||
The swift-container-sync process does the job of sending updates to the remote
|
||||
container. This is done by scanning ``sync-containers`` for container
|
||||
databases. For each container db found, newer rows since the last sync will
|
||||
trigger PUTs or DELETEs to the other container.
|
||||
|
||||
``sync-containers`` is maintained as follows:
|
||||
Whenever the container-server processes a PUT or a POST request that carries
|
||||
``x-container-sync-to`` and ``x-container-sync-key`` metadata keys the server
|
||||
creates a symlink to the container database in ``sync-containers``. Whenever
|
||||
the container server deletes a synced container, the appropriate symlink
|
||||
is deleted from ``sync-containers``.
|
||||
|
||||
In addition to the container-server, the container-replicator process does the
|
||||
job of identifying containers that should be synchronized. This is done by
|
||||
scanning the local devices for container databases and checking for
|
||||
x-container-sync-to and x-container-sync-key metadata values. If they exist
|
||||
then a symlink to the container database is created in a sync-containers
|
||||
sub-directory on the same device.
|
||||
|
||||
Similarly, when the container sync metadata keys are deleted, the container
|
||||
server and container-replicator would take care of deleting the symlinks
|
||||
from ``sync-containers``.
|
||||
|
||||
.. note::
|
||||
|
||||
|
@ -20,6 +20,7 @@ import time
|
||||
from collections import defaultdict
|
||||
from eventlet import Timeout
|
||||
|
||||
from swift.container.sync_store import ContainerSyncStore
|
||||
from swift.container.backend import ContainerBroker, DATADIR
|
||||
from swift.container.reconciler import (
|
||||
MISPLACED_OBJECTS_ACCOUNT, incorrect_policy_index,
|
||||
@ -189,6 +190,13 @@ class ContainerReplicator(db_replicator.Replicator):
|
||||
def _post_replicate_hook(self, broker, info, responses):
|
||||
if info['account'] == MISPLACED_OBJECTS_ACCOUNT:
|
||||
return
|
||||
|
||||
try:
|
||||
self.sync_store.update_sync_store(broker)
|
||||
except Exception:
|
||||
self.logger.exception('Failed to update sync_store %s' %
|
||||
broker.db_file)
|
||||
|
||||
point = broker.get_reconciler_sync()
|
||||
if not broker.has_multiple_policies() and info['max_row'] != point:
|
||||
broker.update_reconciler_sync(info['max_row'])
|
||||
@ -210,6 +218,13 @@ class ContainerReplicator(db_replicator.Replicator):
|
||||
# this container shouldn't be here, make sure it's cleaned up
|
||||
self.reconciler_cleanups[broker.container] = broker
|
||||
return
|
||||
try:
|
||||
# DB is going to get deleted. Be preemptive about it
|
||||
self.sync_store.remove_synced_container(broker)
|
||||
except Exception:
|
||||
self.logger.exception('Failed to remove sync_store entry %s' %
|
||||
broker.db_file)
|
||||
|
||||
return super(ContainerReplicator, self).delete_db(broker)
|
||||
|
||||
def replicate_reconcilers(self):
|
||||
@ -237,6 +252,9 @@ class ContainerReplicator(db_replicator.Replicator):
|
||||
def run_once(self, *args, **kwargs):
|
||||
self.reconciler_containers = {}
|
||||
self.reconciler_cleanups = {}
|
||||
self.sync_store = ContainerSyncStore(self.root,
|
||||
self.logger,
|
||||
self.mount_check)
|
||||
rv = super(ContainerReplicator, self).run_once(*args, **kwargs)
|
||||
if any([self.reconciler_containers, self.reconciler_cleanups]):
|
||||
self.replicate_reconcilers()
|
||||
|
@ -23,6 +23,7 @@ from xml.etree.cElementTree import Element, SubElement, tostring
|
||||
from eventlet import Timeout
|
||||
|
||||
import swift.common.db
|
||||
from swift.container.sync_store import ContainerSyncStore
|
||||
from swift.container.backend import ContainerBroker, DATADIR
|
||||
from swift.container.replicator import ContainerReplicatorRpc
|
||||
from swift.common.db import DatabaseAlreadyExists
|
||||
@ -110,6 +111,9 @@ class ContainerController(BaseStorageServer):
|
||||
self.save_headers.append('x-versions-location')
|
||||
swift.common.db.DB_PREALLOCATION = \
|
||||
config_true_value(conf.get('db_preallocation', 'f'))
|
||||
self.sync_store = ContainerSyncStore(self.root,
|
||||
self.logger,
|
||||
self.mount_check)
|
||||
|
||||
def _get_container_broker(self, drive, part, account, container, **kwargs):
|
||||
"""
|
||||
@ -242,6 +246,13 @@ class ContainerController(BaseStorageServer):
|
||||
else:
|
||||
return None
|
||||
|
||||
def _update_sync_store(self, broker, method):
|
||||
try:
|
||||
self.sync_store.update_sync_store(broker)
|
||||
except Exception:
|
||||
self.logger.exception('Failed to update sync_store %s during %s' %
|
||||
broker.db_file, method)
|
||||
|
||||
@public
|
||||
@timing_stats()
|
||||
def DELETE(self, req):
|
||||
@ -276,6 +287,7 @@ class ContainerController(BaseStorageServer):
|
||||
broker.delete_db(req_timestamp.internal)
|
||||
if not broker.is_deleted():
|
||||
return HTTPConflict(request=req)
|
||||
self._update_sync_store(broker, 'DELETE')
|
||||
resp = self.account_update(req, account, container, broker)
|
||||
if resp:
|
||||
return resp
|
||||
@ -381,6 +393,8 @@ class ContainerController(BaseStorageServer):
|
||||
broker.metadata['X-Container-Sync-To'][0]:
|
||||
broker.set_x_container_sync_points(-1, -1)
|
||||
broker.update_metadata(metadata, validate_metadata=True)
|
||||
if metadata:
|
||||
self._update_sync_store(broker, 'PUT')
|
||||
resp = self.account_update(req, account, container, broker)
|
||||
if resp:
|
||||
return resp
|
||||
@ -564,6 +578,7 @@ class ContainerController(BaseStorageServer):
|
||||
broker.metadata['X-Container-Sync-To'][0]:
|
||||
broker.set_x_container_sync_points(-1, -1)
|
||||
broker.update_metadata(metadata, validate_metadata=True)
|
||||
self._update_sync_store(broker, 'POST')
|
||||
return HTTPNoContent(request=req)
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
|
@ -24,7 +24,9 @@ from struct import unpack_from
|
||||
from eventlet import sleep, Timeout
|
||||
|
||||
import swift.common.db
|
||||
from swift.container.backend import ContainerBroker, DATADIR
|
||||
from swift.common.db import DatabaseConnectionError
|
||||
from swift.container.backend import ContainerBroker
|
||||
from swift.container.sync_store import ContainerSyncStore
|
||||
from swift.common.container_sync_realms import ContainerSyncRealms
|
||||
from swift.common.internal_client import (
|
||||
delete_object, put_object, InternalClient, UnexpectedResponse)
|
||||
@ -32,7 +34,7 @@ from swift.common.exceptions import ClientException
|
||||
from swift.common.ring import Ring
|
||||
from swift.common.ring.utils import is_local_device
|
||||
from swift.common.utils import (
|
||||
audit_location_generator, clean_content_type, config_true_value,
|
||||
clean_content_type, config_true_value,
|
||||
FileLikeIter, get_logger, hash_path, quote, urlparse, validate_sync_to,
|
||||
whataremyips, Timestamp)
|
||||
from swift.common.daemon import Daemon
|
||||
@ -187,6 +189,10 @@ class ContainerSync(Daemon):
|
||||
a.strip()
|
||||
for a in conf.get('sync_proxy', '').split(',')
|
||||
if a.strip()]
|
||||
#: ContainerSyncStore instance for iterating over synced containers
|
||||
self.sync_store = ContainerSyncStore(self.devices,
|
||||
self.logger,
|
||||
self.mount_check)
|
||||
#: Number of containers with sync turned on that were successfully
|
||||
#: synced.
|
||||
self.container_syncs = 0
|
||||
@ -194,7 +200,8 @@ class ContainerSync(Daemon):
|
||||
self.container_deletes = 0
|
||||
#: Number of successful PUTs triggered.
|
||||
self.container_puts = 0
|
||||
#: Number of containers that didn't have sync turned on.
|
||||
#: Number of containers whose sync has been turned off, but
|
||||
#: are not yet cleared from the sync store.
|
||||
self.container_skips = 0
|
||||
#: Number of containers that had a failure of some type.
|
||||
self.container_failures = 0
|
||||
@ -247,10 +254,7 @@ class ContainerSync(Daemon):
|
||||
sleep(random() * self.interval)
|
||||
while True:
|
||||
begin = time()
|
||||
all_locs = audit_location_generator(self.devices, DATADIR, '.db',
|
||||
mount_check=self.mount_check,
|
||||
logger=self.logger)
|
||||
for path, device, partition in all_locs:
|
||||
for path in self.sync_store.synced_containers_generator():
|
||||
self.container_sync(path)
|
||||
if time() - self.reported >= 3600: # once an hour
|
||||
self.report()
|
||||
@ -264,10 +268,7 @@ class ContainerSync(Daemon):
|
||||
"""
|
||||
self.logger.info(_('Begin container sync "once" mode'))
|
||||
begin = time()
|
||||
all_locs = audit_location_generator(self.devices, DATADIR, '.db',
|
||||
mount_check=self.mount_check,
|
||||
logger=self.logger)
|
||||
for path, device, partition in all_locs:
|
||||
for path in self.sync_store.synced_containers_generator():
|
||||
self.container_sync(path)
|
||||
if time() - self.reported >= 3600: # once an hour
|
||||
self.report()
|
||||
@ -308,7 +309,20 @@ class ContainerSync(Daemon):
|
||||
broker = None
|
||||
try:
|
||||
broker = ContainerBroker(path)
|
||||
info = broker.get_info()
|
||||
# The path we pass to the ContainerBroker is a real path of
|
||||
# a container DB. If we get here, however, it means that this
|
||||
# path is linked from the sync_containers dir. In rare cases
|
||||
# of race or processes failures the link can be stale and
|
||||
# the get_info below will raise a DB doesn't exist exception
|
||||
# In this case we remove the stale link and raise an error
|
||||
# since in most cases the db should be there.
|
||||
try:
|
||||
info = broker.get_info()
|
||||
except DatabaseConnectionError as db_err:
|
||||
if str(db_err).endswith("DB doesn't exist"):
|
||||
self.sync_store.remove_synced_container(broker)
|
||||
raise
|
||||
|
||||
x, nodes = self.container_ring.get_nodes(info['account'],
|
||||
info['container'])
|
||||
for ordinal, node in enumerate(nodes):
|
||||
@ -388,7 +402,7 @@ class ContainerSync(Daemon):
|
||||
broker.set_x_container_sync_points(sync_point1, None)
|
||||
self.container_syncs += 1
|
||||
self.logger.increment('syncs')
|
||||
except (Exception, Timeout) as err:
|
||||
except (Exception, Timeout):
|
||||
self.container_failures += 1
|
||||
self.logger.increment('failures')
|
||||
self.logger.exception(_('ERROR Syncing %s'),
|
||||
|
177
swift/container/sync_store.py
Normal file
177
swift/container/sync_store.py
Normal file
@ -0,0 +1,177 @@
|
||||
# Copyright (c) 2010-2016 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import errno
|
||||
|
||||
from swift.common.utils import audit_location_generator, mkdirs
|
||||
from swift.container.backend import DATADIR
|
||||
|
||||
SYNC_DATADIR = 'sync_containers'
|
||||
|
||||
|
||||
class ContainerSyncStore(object):
|
||||
"""
|
||||
Filesystem based store for local containers that needs to be synced.
|
||||
|
||||
The store holds a list of containers that need to be synced by the
|
||||
container sync daemon. The store is local to the container server node,
|
||||
that is, only containers whose databases are kept locally on the node are
|
||||
listed.
|
||||
"""
|
||||
def __init__(self, devices, logger, mount_check):
|
||||
self.devices = os.path.normpath(os.path.join('/', devices)) + '/'
|
||||
self.logger = logger
|
||||
self.mount_check = mount_check
|
||||
|
||||
def _container_to_synced_container_path(self, path):
|
||||
# path is assumed to be of the form:
|
||||
# /srv/node/sdb/containers/part/.../*.db
|
||||
# or more generally:
|
||||
# devices/device/containers/part/.../*.db
|
||||
# Below we split the path to the following parts:
|
||||
# devices, device, rest
|
||||
devices = self.devices
|
||||
path = os.path.normpath(path)
|
||||
device = path[len(devices):path.rfind(DATADIR)]
|
||||
rest = path[path.rfind(DATADIR) + len(DATADIR) + 1:]
|
||||
|
||||
return os.path.join(devices, device, SYNC_DATADIR, rest)
|
||||
|
||||
def _synced_container_to_container_path(self, path):
|
||||
# synced path is assumed to be of the form:
|
||||
# /srv/node/sdb/sync_containers/part/.../*.db
|
||||
# or more generally:
|
||||
# devices/device/sync_containers/part/.../*.db
|
||||
# Below we split the path to the following parts:
|
||||
# devices, device, rest
|
||||
devices = self.devices
|
||||
path = os.path.normpath(path)
|
||||
device = path[len(devices):path.rfind(SYNC_DATADIR)]
|
||||
rest = path[path.rfind(SYNC_DATADIR) + len(SYNC_DATADIR) + 1:]
|
||||
|
||||
return os.path.join(devices, device, DATADIR, rest)
|
||||
|
||||
def add_synced_container(self, broker):
|
||||
"""
|
||||
Adds the container db represented by broker to the list of synced
|
||||
containers.
|
||||
|
||||
:param broker: An instance of ContainerBroker representing the
|
||||
container to add.
|
||||
"""
|
||||
sync_file = self._container_to_synced_container_path(broker.db_file)
|
||||
stat = None
|
||||
try:
|
||||
stat = os.stat(sync_file)
|
||||
except OSError as oserr:
|
||||
if oserr.errno != errno.ENOENT:
|
||||
raise oserr
|
||||
|
||||
if stat is not None:
|
||||
return
|
||||
|
||||
sync_path = os.path.dirname(sync_file)
|
||||
mkdirs(sync_path)
|
||||
|
||||
try:
|
||||
os.symlink(broker.db_file, sync_file)
|
||||
except OSError as oserr:
|
||||
if (oserr.errno != errno.EEXIST or
|
||||
not os.path.islink(sync_file)):
|
||||
raise oserr
|
||||
|
||||
def remove_synced_container(self, broker):
|
||||
"""
|
||||
Removes the container db represented by broker from the list of synced
|
||||
containers.
|
||||
|
||||
:param broker: An instance of ContainerBroker representing the
|
||||
container to remove.
|
||||
"""
|
||||
sync_file = broker.db_file
|
||||
sync_file = self._container_to_synced_container_path(sync_file)
|
||||
try:
|
||||
os.unlink(sync_file)
|
||||
os.removedirs(os.path.dirname(sync_file))
|
||||
except OSError as oserr:
|
||||
if oserr.errno != errno.ENOENT:
|
||||
raise oserr
|
||||
|
||||
def update_sync_store(self, broker):
|
||||
"""
|
||||
Add or remove a symlink to/from the sync-containers directory
|
||||
according to the broker's metadata.
|
||||
|
||||
Decide according to the broker x-container-sync-to and
|
||||
x-container-sync-key whether a symlink needs to be added or
|
||||
removed.
|
||||
|
||||
We mention that if both metadata items do not appear
|
||||
at all, the container has never been set for sync in reclaim_age
|
||||
in which case we do nothing. This is important as this method is
|
||||
called for ALL containers from the container replicator.
|
||||
|
||||
Once we realize that we do need to do something, we check if
|
||||
the container is marked for delete, in which case we want to
|
||||
remove the symlink
|
||||
|
||||
For adding a symlink we notice that both x-container-sync-to and
|
||||
x-container-sync-key exist and are valid, that is, are not empty.
|
||||
|
||||
At this point we know we need to do something, the container
|
||||
is not marked for delete and the condition to add a symlink
|
||||
is not met. conclusion need to remove the symlink.
|
||||
|
||||
:param broker: An instance of ContainerBroker
|
||||
"""
|
||||
# If the broker metadata does not have both x-container-sync-to
|
||||
# and x-container-sync-key it has *never* been set. Make sure
|
||||
# we do nothing in this case
|
||||
if ('X-Container-Sync-To' not in broker.metadata and
|
||||
'X-Container-Sync-Key' not in broker.metadata):
|
||||
return
|
||||
|
||||
if broker.is_deleted():
|
||||
self.remove_synced_container(broker)
|
||||
return
|
||||
|
||||
# If both x-container-sync-to and x-container-sync-key
|
||||
# exist and valid, add the symlink
|
||||
sync_to = sync_key = None
|
||||
if 'X-Container-Sync-To' in broker.metadata:
|
||||
sync_to = broker.metadata['X-Container-Sync-To'][0]
|
||||
if 'X-Container-Sync-Key' in broker.metadata:
|
||||
sync_key = broker.metadata['X-Container-Sync-Key'][0]
|
||||
if sync_to and sync_key:
|
||||
self.add_synced_container(broker)
|
||||
return
|
||||
|
||||
self.remove_synced_container(broker)
|
||||
|
||||
def synced_containers_generator(self):
|
||||
"""
|
||||
Iterates over the list of synced containers
|
||||
yielding the path of the container db
|
||||
"""
|
||||
all_locs = audit_location_generator(self.devices, SYNC_DATADIR, '.db',
|
||||
mount_check=self.mount_check,
|
||||
logger=self.logger)
|
||||
for path, device, partition in all_locs:
|
||||
# What we want to yield is the real path as its being used for
|
||||
# initiating a container broker. The broker would break if not
|
||||
# given the db real path, as it e.g. assumes the existence of
|
||||
# .pending in the same path
|
||||
yield self._synced_container_to_container_path(path)
|
@ -18,8 +18,9 @@ from nose import SkipTest
|
||||
import unittest
|
||||
|
||||
from six.moves.urllib.parse import urlparse
|
||||
from swiftclient import client
|
||||
from swiftclient import client, ClientException
|
||||
|
||||
from swift.common.http import HTTP_NOT_FOUND
|
||||
from swift.common.manager import Manager
|
||||
from test.probe.common import ReplProbeTest, ENABLED_POLICIES
|
||||
|
||||
@ -49,25 +50,27 @@ class TestContainerSync(ReplProbeTest):
|
||||
super(TestContainerSync, self).setUp()
|
||||
self.realm, self.cluster = get_current_realm_cluster(self.url)
|
||||
|
||||
def test_sync(self):
|
||||
base_headers = {'X-Container-Sync-Key': 'secret'}
|
||||
|
||||
def _setup_synced_containers(self, skey='secret', dkey='secret'):
|
||||
# setup dest container
|
||||
dest_container = 'dest-container-%s' % uuid.uuid4()
|
||||
dest_headers = base_headers.copy()
|
||||
dest_headers = {}
|
||||
dest_policy = None
|
||||
if len(ENABLED_POLICIES) > 1:
|
||||
dest_policy = random.choice(ENABLED_POLICIES)
|
||||
dest_headers['X-Storage-Policy'] = dest_policy.name
|
||||
if dkey is not None:
|
||||
dest_headers['X-Container-Sync-Key'] = dkey
|
||||
client.put_container(self.url, self.token, dest_container,
|
||||
headers=dest_headers)
|
||||
|
||||
# setup source container
|
||||
source_container = 'source-container-%s' % uuid.uuid4()
|
||||
source_headers = base_headers.copy()
|
||||
source_headers = {}
|
||||
sync_to = '//%s/%s/%s/%s' % (self.realm, self.cluster, self.account,
|
||||
dest_container)
|
||||
source_headers['X-Container-Sync-To'] = sync_to
|
||||
if skey is not None:
|
||||
source_headers['X-Container-Sync-Key'] = skey
|
||||
if dest_policy:
|
||||
source_policy = random.choice([p for p in ENABLED_POLICIES
|
||||
if p is not dest_policy])
|
||||
@ -75,6 +78,11 @@ class TestContainerSync(ReplProbeTest):
|
||||
client.put_container(self.url, self.token, source_container,
|
||||
headers=source_headers)
|
||||
|
||||
return source_container, dest_container
|
||||
|
||||
def test_sync(self):
|
||||
source_container, dest_container = self._setup_synced_containers()
|
||||
|
||||
# upload to source
|
||||
object_name = 'object-%s' % uuid.uuid4()
|
||||
client.put_object(self.url, self.token, source_container, object_name,
|
||||
@ -83,11 +91,63 @@ class TestContainerSync(ReplProbeTest):
|
||||
# cycle container-sync
|
||||
Manager(['container-sync']).once()
|
||||
|
||||
# retrieve from sync'd container
|
||||
headers, body = client.get_object(self.url, self.token,
|
||||
dest_container, object_name)
|
||||
_junk, body = client.get_object(self.url, self.token,
|
||||
dest_container, object_name)
|
||||
self.assertEqual(body, 'test-body')
|
||||
|
||||
def test_sync_lazy_skey(self):
|
||||
# Create synced containers, but with no key at source
|
||||
source_container, dest_container =\
|
||||
self._setup_synced_containers(None, 'secret')
|
||||
|
||||
# upload to source
|
||||
object_name = 'object-%s' % uuid.uuid4()
|
||||
client.put_object(self.url, self.token, source_container, object_name,
|
||||
'test-body')
|
||||
|
||||
# cycle container-sync, nothing should happen
|
||||
Manager(['container-sync']).once()
|
||||
with self.assertRaises(ClientException) as err:
|
||||
_junk, body = client.get_object(self.url, self.token,
|
||||
dest_container, object_name)
|
||||
self.assertEqual(err.exception.http_status, HTTP_NOT_FOUND)
|
||||
|
||||
# amend source key
|
||||
source_headers = {'X-Container-Sync-Key': 'secret'}
|
||||
client.put_container(self.url, self.token, source_container,
|
||||
headers=source_headers)
|
||||
# cycle container-sync, should replicate
|
||||
Manager(['container-sync']).once()
|
||||
_junk, body = client.get_object(self.url, self.token,
|
||||
dest_container, object_name)
|
||||
self.assertEqual(body, 'test-body')
|
||||
|
||||
def test_sync_lazy_dkey(self):
|
||||
# Create synced containers, but with no key at dest
|
||||
source_container, dest_container =\
|
||||
self._setup_synced_containers('secret', None)
|
||||
|
||||
# upload to source
|
||||
object_name = 'object-%s' % uuid.uuid4()
|
||||
client.put_object(self.url, self.token, source_container, object_name,
|
||||
'test-body')
|
||||
|
||||
# cycle container-sync, nothing should happen
|
||||
Manager(['container-sync']).once()
|
||||
with self.assertRaises(ClientException) as err:
|
||||
_junk, body = client.get_object(self.url, self.token,
|
||||
dest_container, object_name)
|
||||
self.assertEqual(err.exception.http_status, HTTP_NOT_FOUND)
|
||||
|
||||
# amend dest key
|
||||
dest_headers = {'X-Container-Sync-Key': 'secret'}
|
||||
client.put_container(self.url, self.token, dest_container,
|
||||
headers=dest_headers)
|
||||
# cycle container-sync, should replicate
|
||||
Manager(['container-sync']).once()
|
||||
_junk, body = client.get_object(self.url, self.token,
|
||||
dest_container, object_name)
|
||||
self.assertEqual(body, 'test-body')
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
@ -23,14 +23,14 @@ import random
|
||||
import sqlite3
|
||||
|
||||
from swift.common import db_replicator
|
||||
from swift.container import replicator, backend, server
|
||||
from swift.container import replicator, backend, server, sync_store
|
||||
from swift.container.reconciler import (
|
||||
MISPLACED_OBJECTS_ACCOUNT, get_reconciler_container_name)
|
||||
from swift.common.utils import Timestamp
|
||||
from swift.common.storage_policy import POLICIES
|
||||
|
||||
from test.unit.common import test_db_replicator
|
||||
from test.unit import patch_policies, make_timestamp_iter
|
||||
from test.unit import patch_policies, make_timestamp_iter, FakeLogger
|
||||
from contextlib import contextmanager
|
||||
|
||||
|
||||
@ -998,6 +998,135 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
||||
daemon._post_replicate_hook(broker, info, [])
|
||||
self.assertEqual(0, len(calls))
|
||||
|
||||
def test_update_sync_store_exception(self):
|
||||
class FakeContainerSyncStore(object):
|
||||
def update_sync_store(self, broker):
|
||||
raise OSError(1, '1')
|
||||
|
||||
logger = FakeLogger()
|
||||
daemon = replicator.ContainerReplicator({}, logger)
|
||||
daemon.sync_store = FakeContainerSyncStore()
|
||||
ts_iter = make_timestamp_iter()
|
||||
broker = self._get_broker('a', 'c', node_index=0)
|
||||
timestamp = next(ts_iter)
|
||||
broker.initialize(timestamp.internal, POLICIES.default.idx)
|
||||
info = broker.get_replication_info()
|
||||
daemon._post_replicate_hook(broker, info, [])
|
||||
log_lines = logger.get_lines_for_level('error')
|
||||
self.assertEqual(1, len(log_lines))
|
||||
self.assertIn('Failed to update sync_store', log_lines[0])
|
||||
|
||||
def test_update_sync_store(self):
|
||||
klass = 'swift.container.sync_store.ContainerSyncStore'
|
||||
daemon = replicator.ContainerReplicator({})
|
||||
daemon.sync_store = sync_store.ContainerSyncStore(
|
||||
daemon.root, daemon.logger, daemon.mount_check)
|
||||
ts_iter = make_timestamp_iter()
|
||||
broker = self._get_broker('a', 'c', node_index=0)
|
||||
timestamp = next(ts_iter)
|
||||
broker.initialize(timestamp.internal, POLICIES.default.idx)
|
||||
info = broker.get_replication_info()
|
||||
with mock.patch(klass + '.remove_synced_container') as mock_remove:
|
||||
with mock.patch(klass + '.add_synced_container') as mock_add:
|
||||
daemon._post_replicate_hook(broker, info, [])
|
||||
self.assertEqual(0, mock_remove.call_count)
|
||||
self.assertEqual(0, mock_add.call_count)
|
||||
|
||||
timestamp = next(ts_iter)
|
||||
# sync-to and sync-key empty - remove from store
|
||||
broker.update_metadata(
|
||||
{'X-Container-Sync-To': ('', timestamp.internal),
|
||||
'X-Container-Sync-Key': ('', timestamp.internal)})
|
||||
with mock.patch(klass + '.remove_synced_container') as mock_remove:
|
||||
with mock.patch(klass + '.add_synced_container') as mock_add:
|
||||
daemon._post_replicate_hook(broker, info, [])
|
||||
self.assertEqual(0, mock_add.call_count)
|
||||
mock_remove.assert_called_once_with(broker)
|
||||
|
||||
timestamp = next(ts_iter)
|
||||
# sync-to is not empty sync-key is empty - remove from store
|
||||
broker.update_metadata(
|
||||
{'X-Container-Sync-To': ('a', timestamp.internal)})
|
||||
with mock.patch(klass + '.remove_synced_container') as mock_remove:
|
||||
with mock.patch(klass + '.add_synced_container') as mock_add:
|
||||
daemon._post_replicate_hook(broker, info, [])
|
||||
self.assertEqual(0, mock_add.call_count)
|
||||
mock_remove.assert_called_once_with(broker)
|
||||
|
||||
timestamp = next(ts_iter)
|
||||
# sync-to is empty sync-key is not empty - remove from store
|
||||
broker.update_metadata(
|
||||
{'X-Container-Sync-To': ('', timestamp.internal),
|
||||
'X-Container-Sync-Key': ('secret', timestamp.internal)})
|
||||
with mock.patch(klass + '.remove_synced_container') as mock_remove:
|
||||
with mock.patch(klass + '.add_synced_container') as mock_add:
|
||||
daemon._post_replicate_hook(broker, info, [])
|
||||
self.assertEqual(0, mock_add.call_count)
|
||||
mock_remove.assert_called_once_with(broker)
|
||||
|
||||
timestamp = next(ts_iter)
|
||||
# sync-to, sync-key both not empty - add to store
|
||||
broker.update_metadata(
|
||||
{'X-Container-Sync-To': ('a', timestamp.internal),
|
||||
'X-Container-Sync-Key': ('secret', timestamp.internal)})
|
||||
with mock.patch(klass + '.remove_synced_container') as mock_remove:
|
||||
with mock.patch(klass + '.add_synced_container') as mock_add:
|
||||
daemon._post_replicate_hook(broker, info, [])
|
||||
mock_add.assert_called_once_with(broker)
|
||||
self.assertEqual(0, mock_remove.call_count)
|
||||
|
||||
timestamp = next(ts_iter)
|
||||
# container is removed - need to remove from store
|
||||
broker.delete_db(timestamp.internal)
|
||||
broker.update_metadata(
|
||||
{'X-Container-Sync-To': ('a', timestamp.internal),
|
||||
'X-Container-Sync-Key': ('secret', timestamp.internal)})
|
||||
with mock.patch(klass + '.remove_synced_container') as mock_remove:
|
||||
with mock.patch(klass + '.add_synced_container') as mock_add:
|
||||
daemon._post_replicate_hook(broker, info, [])
|
||||
self.assertEqual(0, mock_add.call_count)
|
||||
mock_remove.assert_called_once_with(broker)
|
||||
|
||||
def test_sync_triggers_sync_store_update(self):
|
||||
klass = 'swift.container.sync_store.ContainerSyncStore'
|
||||
ts_iter = make_timestamp_iter()
|
||||
# Create two containers as follows:
|
||||
# broker_1 which is not set for sync
|
||||
# broker_2 which is set for sync and then unset
|
||||
# test that while replicating both we see no activity
|
||||
# for broker_1, and the anticipated activity for broker_2
|
||||
broker_1 = self._get_broker('a', 'c', node_index=0)
|
||||
broker_1.initialize(next(ts_iter).internal, POLICIES.default.idx)
|
||||
broker_2 = self._get_broker('b', 'd', node_index=0)
|
||||
broker_2.initialize(next(ts_iter).internal, POLICIES.default.idx)
|
||||
broker_2.update_metadata(
|
||||
{'X-Container-Sync-To': ('a', next(ts_iter).internal),
|
||||
'X-Container-Sync-Key': ('secret', next(ts_iter).internal)})
|
||||
|
||||
# replicate once according to broker_1
|
||||
# relying on the fact that FakeRing would place both
|
||||
# in the same partition.
|
||||
part, node = self._get_broker_part_node(broker_1)
|
||||
with mock.patch(klass + '.remove_synced_container') as mock_remove:
|
||||
with mock.patch(klass + '.add_synced_container') as mock_add:
|
||||
self._run_once(node)
|
||||
self.assertEqual(1, mock_add.call_count)
|
||||
self.assertEqual(broker_2.db_file, mock_add.call_args[0][0].db_file)
|
||||
self.assertEqual(0, mock_remove.call_count)
|
||||
|
||||
broker_2.update_metadata(
|
||||
{'X-Container-Sync-To': ('', next(ts_iter).internal)})
|
||||
# replicate once this time according to broker_2
|
||||
# relying on the fact that FakeRing would place both
|
||||
# in the same partition.
|
||||
part, node = self._get_broker_part_node(broker_2)
|
||||
with mock.patch(klass + '.remove_synced_container') as mock_remove:
|
||||
with mock.patch(klass + '.add_synced_container') as mock_add:
|
||||
self._run_once(node)
|
||||
self.assertEqual(0, mock_add.call_count)
|
||||
self.assertEqual(1, mock_remove.call_count)
|
||||
self.assertEqual(broker_2.db_file, mock_remove.call_args[0][0].db_file)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -1153,6 +1153,75 @@ class TestContainerController(unittest.TestCase):
|
||||
self.assertEqual(info['x_container_sync_point1'], -1)
|
||||
self.assertEqual(info['x_container_sync_point2'], -1)
|
||||
|
||||
def test_update_sync_store_on_PUT(self):
|
||||
# Create a synced container and validate a link is created
|
||||
self._create_synced_container_and_validate_sync_store('PUT')
|
||||
# remove the sync using PUT and validate the link is deleted
|
||||
self._remove_sync_and_validate_sync_store('PUT')
|
||||
|
||||
def test_update_sync_store_on_POST(self):
|
||||
# Create a container and validate a link is not created
|
||||
self._create_container_and_validate_sync_store()
|
||||
# Update the container to be synced and validate a link is created
|
||||
self._create_synced_container_and_validate_sync_store('POST')
|
||||
# remove the sync using POST and validate the link is deleted
|
||||
self._remove_sync_and_validate_sync_store('POST')
|
||||
|
||||
def test_update_sync_store_on_DELETE(self):
|
||||
# Create a synced container and validate a link is created
|
||||
self._create_synced_container_and_validate_sync_store('PUT')
|
||||
# Remove the container and validate the link is deleted
|
||||
self._remove_sync_and_validate_sync_store('DELETE')
|
||||
|
||||
def _create_container_and_validate_sync_store(self):
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'x-timestamp': '0'})
|
||||
req.get_response(self.controller)
|
||||
db = self.controller._get_container_broker('sda1', 'p', 'a', 'c')
|
||||
sync_store = self.controller.sync_store
|
||||
db_path = db.db_file
|
||||
db_link = sync_store._container_to_synced_container_path(db_path)
|
||||
self.assertFalse(os.path.exists(db_link))
|
||||
sync_containers = [c for c in sync_store.synced_containers_generator()]
|
||||
self.assertFalse(sync_containers)
|
||||
|
||||
def _create_synced_container_and_validate_sync_store(self, method):
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c', environ={'REQUEST_METHOD': method},
|
||||
headers={'x-timestamp': '1',
|
||||
'x-container-sync-to': 'http://127.0.0.1:12345/v1/a/c',
|
||||
'x-container-sync-key': '1234'})
|
||||
req.get_response(self.controller)
|
||||
db = self.controller._get_container_broker('sda1', 'p', 'a', 'c')
|
||||
sync_store = self.controller.sync_store
|
||||
db_path = db.db_file
|
||||
db_link = sync_store._container_to_synced_container_path(db_path)
|
||||
self.assertTrue(os.path.exists(db_link))
|
||||
sync_containers = [c for c in sync_store.synced_containers_generator()]
|
||||
self.assertEqual(1, len(sync_containers))
|
||||
self.assertEqual(db_path, sync_containers[0])
|
||||
|
||||
def _remove_sync_and_validate_sync_store(self, method):
|
||||
if method == 'DELETE':
|
||||
headers = {'x-timestamp': '2'}
|
||||
else:
|
||||
headers = {'x-timestamp': '2',
|
||||
'x-container-sync-to': '',
|
||||
'x-container-sync-key': '1234'}
|
||||
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c', environ={'REQUEST_METHOD': method},
|
||||
headers=headers)
|
||||
req.get_response(self.controller)
|
||||
db = self.controller._get_container_broker('sda1', 'p', 'a', 'c')
|
||||
sync_store = self.controller.sync_store
|
||||
db_path = db.db_file
|
||||
db_link = sync_store._container_to_synced_container_path(db_path)
|
||||
self.assertFalse(os.path.exists(db_link))
|
||||
sync_containers = [c for c in sync_store.synced_containers_generator()]
|
||||
self.assertFalse(sync_containers)
|
||||
|
||||
def test_REPLICATE_insufficient_storage(self):
|
||||
conf = {'devices': self.testdir, 'mount_check': 'true'}
|
||||
self.container_controller = container_server.ContainerController(
|
||||
|
@ -19,8 +19,10 @@ import unittest
|
||||
from textwrap import dedent
|
||||
|
||||
import mock
|
||||
import errno
|
||||
from test.unit import debug_logger
|
||||
from swift.container import sync
|
||||
from swift.common.db import DatabaseConnectionError
|
||||
from swift.common import utils
|
||||
from swift.common.wsgi import ConfigString
|
||||
from swift.common.exceptions import ClientException
|
||||
@ -47,6 +49,7 @@ class FakeContainerBroker(object):
|
||||
def __init__(self, path, metadata=None, info=None, deleted=False,
|
||||
items_since=None):
|
||||
self.db_file = path
|
||||
self.db_dir = os.path.dirname(path)
|
||||
self.metadata = metadata if metadata else {}
|
||||
self.info = info if info else {}
|
||||
self.deleted = deleted
|
||||
@ -157,7 +160,6 @@ class TestContainerSync(unittest.TestCase):
|
||||
# interval sleep.
|
||||
time_calls = [0]
|
||||
sleep_calls = []
|
||||
audit_location_generator_calls = [0]
|
||||
|
||||
def fake_time():
|
||||
time_calls[0] += 1
|
||||
@ -176,48 +178,36 @@ class TestContainerSync(unittest.TestCase):
|
||||
def fake_sleep(amount):
|
||||
sleep_calls.append(amount)
|
||||
|
||||
def fake_audit_location_generator(*args, **kwargs):
|
||||
audit_location_generator_calls[0] += 1
|
||||
# Makes .container_sync() short-circuit
|
||||
yield 'container.db', 'device', 'partition'
|
||||
return
|
||||
gen_func = ('swift.container.sync_store.'
|
||||
'ContainerSyncStore.synced_containers_generator')
|
||||
with mock.patch('swift.container.sync.InternalClient'), \
|
||||
mock.patch('swift.container.sync.time', fake_time), \
|
||||
mock.patch('swift.container.sync.sleep', fake_sleep), \
|
||||
mock.patch(gen_func) as fake_generator, \
|
||||
mock.patch('swift.container.sync.ContainerBroker',
|
||||
lambda p: FakeContainerBroker(p, info={
|
||||
'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0})):
|
||||
fake_generator.side_effect = [iter(['container.db']),
|
||||
iter(['container.db'])]
|
||||
cs = sync.ContainerSync({}, container_ring=FakeRing())
|
||||
try:
|
||||
cs.run_forever()
|
||||
except Exception as err:
|
||||
if str(err) != 'we are now done':
|
||||
raise
|
||||
|
||||
orig_time = sync.time
|
||||
orig_sleep = sync.sleep
|
||||
orig_ContainerBroker = sync.ContainerBroker
|
||||
orig_audit_location_generator = sync.audit_location_generator
|
||||
try:
|
||||
sync.ContainerBroker = lambda p: FakeContainerBroker(
|
||||
p, info={'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0})
|
||||
sync.time = fake_time
|
||||
sync.sleep = fake_sleep
|
||||
|
||||
with mock.patch('swift.container.sync.InternalClient'):
|
||||
cs = sync.ContainerSync({}, container_ring=FakeRing())
|
||||
sync.audit_location_generator = fake_audit_location_generator
|
||||
cs.run_forever(1, 2, a=3, b=4, verbose=True)
|
||||
except Exception as err:
|
||||
if str(err) != 'we are now done':
|
||||
raise
|
||||
finally:
|
||||
sync.time = orig_time
|
||||
sync.sleep = orig_sleep
|
||||
sync.audit_location_generator = orig_audit_location_generator
|
||||
sync.ContainerBroker = orig_ContainerBroker
|
||||
|
||||
self.assertEqual(time_calls, [9])
|
||||
self.assertEqual(len(sleep_calls), 2)
|
||||
self.assertTrue(sleep_calls[0] <= cs.interval)
|
||||
self.assertTrue(sleep_calls[1] == cs.interval - 1)
|
||||
self.assertEqual(audit_location_generator_calls, [2])
|
||||
self.assertEqual(cs.reported, 3602)
|
||||
self.assertEqual(time_calls, [9])
|
||||
self.assertEqual(len(sleep_calls), 2)
|
||||
self.assertLessEqual(sleep_calls[0], cs.interval)
|
||||
self.assertEqual(cs.interval - 1, sleep_calls[1])
|
||||
self.assertEqual(2, fake_generator.call_count)
|
||||
self.assertEqual(cs.reported, 3602)
|
||||
|
||||
def test_run_once(self):
|
||||
# This runs runs_once with fakes twice, the first causing an interim
|
||||
# report, the second with no interim report.
|
||||
time_calls = [0]
|
||||
audit_location_generator_calls = [0]
|
||||
|
||||
def fake_time():
|
||||
time_calls[0] += 1
|
||||
@ -235,40 +225,31 @@ class TestContainerSync(unittest.TestCase):
|
||||
raise Exception('we are now done')
|
||||
return returns[time_calls[0] - 1]
|
||||
|
||||
def fake_audit_location_generator(*args, **kwargs):
|
||||
audit_location_generator_calls[0] += 1
|
||||
# Makes .container_sync() short-circuit
|
||||
yield 'container.db', 'device', 'partition'
|
||||
return
|
||||
gen_func = ('swift.container.sync_store.'
|
||||
'ContainerSyncStore.synced_containers_generator')
|
||||
with mock.patch('swift.container.sync.InternalClient'), \
|
||||
mock.patch('swift.container.sync.time', fake_time), \
|
||||
mock.patch(gen_func) as fake_generator, \
|
||||
mock.patch('swift.container.sync.ContainerBroker',
|
||||
lambda p: FakeContainerBroker(p, info={
|
||||
'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0})):
|
||||
fake_generator.side_effect = [iter(['container.db']),
|
||||
iter(['container.db'])]
|
||||
cs = sync.ContainerSync({}, container_ring=FakeRing())
|
||||
try:
|
||||
cs.run_once()
|
||||
self.assertEqual(time_calls, [6])
|
||||
self.assertEqual(1, fake_generator.call_count)
|
||||
self.assertEqual(cs.reported, 3602)
|
||||
cs.run_once()
|
||||
except Exception as err:
|
||||
if str(err) != 'we are now done':
|
||||
raise
|
||||
|
||||
orig_time = sync.time
|
||||
orig_audit_location_generator = sync.audit_location_generator
|
||||
orig_ContainerBroker = sync.ContainerBroker
|
||||
try:
|
||||
sync.ContainerBroker = lambda p: FakeContainerBroker(
|
||||
p, info={'account': 'a', 'container': 'c',
|
||||
'storage_policy_index': 0})
|
||||
sync.time = fake_time
|
||||
|
||||
with mock.patch('swift.container.sync.InternalClient'):
|
||||
cs = sync.ContainerSync({}, container_ring=FakeRing())
|
||||
sync.audit_location_generator = fake_audit_location_generator
|
||||
cs.run_once(1, 2, a=3, b=4, verbose=True)
|
||||
self.assertEqual(time_calls, [6])
|
||||
self.assertEqual(audit_location_generator_calls, [1])
|
||||
self.assertEqual(cs.reported, 3602)
|
||||
cs.run_once()
|
||||
except Exception as err:
|
||||
if str(err) != 'we are now done':
|
||||
raise
|
||||
finally:
|
||||
sync.time = orig_time
|
||||
sync.audit_location_generator = orig_audit_location_generator
|
||||
sync.ContainerBroker = orig_ContainerBroker
|
||||
|
||||
self.assertEqual(time_calls, [10])
|
||||
self.assertEqual(audit_location_generator_calls, [2])
|
||||
self.assertEqual(cs.reported, 3604)
|
||||
self.assertEqual(time_calls, [10])
|
||||
self.assertEqual(2, fake_generator.call_count)
|
||||
self.assertEqual(cs.reported, 3604)
|
||||
|
||||
def test_container_sync_not_db(self):
|
||||
cring = FakeRing()
|
||||
@ -280,8 +261,65 @@ class TestContainerSync(unittest.TestCase):
|
||||
cring = FakeRing()
|
||||
with mock.patch('swift.container.sync.InternalClient'):
|
||||
cs = sync.ContainerSync({}, container_ring=cring)
|
||||
cs.container_sync('isa.db')
|
||||
self.assertEqual(cs.container_failures, 1)
|
||||
|
||||
broker = 'swift.container.backend.ContainerBroker'
|
||||
store = 'swift.container.sync_store.ContainerSyncStore'
|
||||
|
||||
# In this test we call the container_sync instance several
|
||||
# times with a missing db in various combinations.
|
||||
# Since we use the same ContainerSync instance for all tests
|
||||
# its failures counter increases by one with each call.
|
||||
|
||||
# Test the case where get_info returns DatabaseConnectionError
|
||||
# with DB does not exist, and we succeed in deleting it.
|
||||
with mock.patch(broker + '.get_info') as fake_get_info:
|
||||
with mock.patch(store + '.remove_synced_container') as fake_remove:
|
||||
fake_get_info.side_effect = DatabaseConnectionError(
|
||||
'a',
|
||||
"DB doesn't exist")
|
||||
cs.container_sync('isa.db')
|
||||
self.assertEqual(cs.container_failures, 1)
|
||||
self.assertEqual(cs.container_skips, 0)
|
||||
self.assertEqual(1, fake_remove.call_count)
|
||||
self.assertEqual('isa.db', fake_remove.call_args[0][0].db_file)
|
||||
|
||||
# Test the case where get_info returns DatabaseConnectionError
|
||||
# with DB does not exist, and we fail to delete it.
|
||||
with mock.patch(broker + '.get_info') as fake_get_info:
|
||||
with mock.patch(store + '.remove_synced_container') as fake_remove:
|
||||
fake_get_info.side_effect = DatabaseConnectionError(
|
||||
'a',
|
||||
"DB doesn't exist")
|
||||
fake_remove.side_effect = OSError('1')
|
||||
cs.container_sync('isa.db')
|
||||
self.assertEqual(cs.container_failures, 2)
|
||||
self.assertEqual(cs.container_skips, 0)
|
||||
self.assertEqual(1, fake_remove.call_count)
|
||||
self.assertEqual('isa.db', fake_remove.call_args[0][0].db_file)
|
||||
|
||||
# Test the case where get_info returns DatabaseConnectionError
|
||||
# with DB does not exist, and it returns an error != ENOENT.
|
||||
with mock.patch(broker + '.get_info') as fake_get_info:
|
||||
with mock.patch(store + '.remove_synced_container') as fake_remove:
|
||||
fake_get_info.side_effect = DatabaseConnectionError(
|
||||
'a',
|
||||
"DB doesn't exist")
|
||||
fake_remove.side_effect = OSError(errno.EPERM, 'a')
|
||||
cs.container_sync('isa.db')
|
||||
self.assertEqual(cs.container_failures, 3)
|
||||
self.assertEqual(cs.container_skips, 0)
|
||||
self.assertEqual(1, fake_remove.call_count)
|
||||
self.assertEqual('isa.db', fake_remove.call_args[0][0].db_file)
|
||||
|
||||
# Test the case where get_info returns DatabaseConnectionError
|
||||
# error different than DB does not exist
|
||||
with mock.patch(broker + '.get_info') as fake_get_info:
|
||||
with mock.patch(store + '.remove_synced_container') as fake_remove:
|
||||
fake_get_info.side_effect = DatabaseConnectionError('a', 'a')
|
||||
cs.container_sync('isa.db')
|
||||
self.assertEqual(cs.container_failures, 4)
|
||||
self.assertEqual(cs.container_skips, 0)
|
||||
self.assertEqual(0, fake_remove.call_count)
|
||||
|
||||
def test_container_sync_not_my_db(self):
|
||||
# Db could be there due to handoff replication so test that we ignore
|
||||
|
367
test/unit/container/test_sync_store.py
Normal file
367
test/unit/container/test_sync_store.py
Normal file
@ -0,0 +1,367 @@
|
||||
# Copyright (c) 2010-2016 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import errno
|
||||
import mock
|
||||
import random
|
||||
import logging
|
||||
import unittest
|
||||
import tempfile
|
||||
from shutil import rmtree
|
||||
from test.unit import debug_logger
|
||||
|
||||
from swift.container.backend import DATADIR
|
||||
from swift.container import sync_store
|
||||
|
||||
|
||||
class FakeContainerBroker(object):
|
||||
|
||||
def __init__(self, path):
|
||||
self.db_file = path
|
||||
self.db_dir = os.path.dirname(path)
|
||||
self.metadata = dict()
|
||||
self._is_deleted = False
|
||||
|
||||
def is_deleted(self):
|
||||
return self._is_deleted
|
||||
|
||||
|
||||
class TestContainerSyncStore(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.logger = debug_logger('test-container-sync-store')
|
||||
self.logger.level = logging.DEBUG
|
||||
self.test_dir_prefix = tempfile.mkdtemp()
|
||||
self.devices_dir = os.path.join(self.test_dir_prefix, 'srv/node/')
|
||||
os.makedirs(self.devices_dir)
|
||||
# Create dummy container dbs
|
||||
self.devices = ['sdax', 'sdb', 'sdc']
|
||||
self.partitions = ['21765', '38965', '13234']
|
||||
self.suffixes = ['312', '435']
|
||||
self.hashes = ['f19ed', '53ef', '0ab5', '9c3a']
|
||||
for device in self.devices:
|
||||
data_dir_path = os.path.join(self.devices_dir,
|
||||
device,
|
||||
DATADIR)
|
||||
os.makedirs(data_dir_path)
|
||||
for part in self.partitions:
|
||||
for suffix in self.suffixes:
|
||||
for hsh in self.hashes:
|
||||
db_dir = os.path.join(data_dir_path,
|
||||
part,
|
||||
suffix,
|
||||
hsh)
|
||||
os.makedirs(db_dir)
|
||||
db_file = os.path.join(db_dir, '%s.db' % hsh)
|
||||
with open(db_file, 'w') as outfile:
|
||||
outfile.write('%s' % db_file)
|
||||
|
||||
def teardown(self):
|
||||
rmtree(self.test_dir_prefix)
|
||||
|
||||
def pick_dbfile(self):
|
||||
hsh = random.choice(self.hashes)
|
||||
return os.path.join(self.devices_dir,
|
||||
random.choice(self.devices),
|
||||
DATADIR,
|
||||
random.choice(self.partitions),
|
||||
random.choice(self.suffixes),
|
||||
hsh,
|
||||
'%s.db' % hsh)
|
||||
|
||||
# Path conversion tests
|
||||
# container path is of the form:
|
||||
# /srv/node/sdb/containers/part/.../*.db
|
||||
# or more generally:
|
||||
# devices/device/DATADIR/part/.../*.db
|
||||
# synced container path is assumed to be of the form:
|
||||
# /srv/node/sdb/sync_containers/part/.../*.db
|
||||
# or more generally:
|
||||
# devices/device/SYNC_DATADIR/part/.../*.db
|
||||
# Indeed the ONLY DIFFERENCE is DATADIR <-> SYNC_DATADIR
|
||||
# Since, however, the strings represented by the constants
|
||||
# DATADIR or SYNC_DATADIR
|
||||
# can appear in the devices or the device part, the conversion
|
||||
# function between the two is a bit more subtle then a mere replacement.
|
||||
|
||||
# This function tests the conversion between a container path
|
||||
# and a synced container path
|
||||
def test_container_to_synced_container_path_conversion(self):
|
||||
# The conversion functions are oblivious to the suffix
|
||||
# so we just pick up a constant one.
|
||||
db_path_suffix = self._db_path_suffix()
|
||||
|
||||
# We build various container path putting in both
|
||||
# DATADIR and SYNC_DATADIR strings in the
|
||||
# device and devices parts.
|
||||
for devices, device in self._container_path_elements_generator():
|
||||
path = os.path.join(devices, device, DATADIR, db_path_suffix)
|
||||
# Call the conversion function
|
||||
sds = sync_store.ContainerSyncStore(devices, self.logger, False)
|
||||
path = sds._container_to_synced_container_path(path)
|
||||
# Validate that ONLY the DATADIR part was replaced with
|
||||
# sync_store.SYNC_DATADIR
|
||||
self._validate_container_path_parts(path, devices, device,
|
||||
sync_store.SYNC_DATADIR,
|
||||
db_path_suffix)
|
||||
|
||||
# This function tests the conversion between a synced container path
|
||||
# and a container path
|
||||
def test_synced_container_to_container_path_conversion(self):
|
||||
# The conversion functions are oblivious to the suffix
|
||||
# so we just pick up a constant one.
|
||||
db_path_suffix = ('133791/625/82a7f5a2c43281b0eab3597e35bb9625/'
|
||||
'82a7f5a2c43281b0eab3597e35bb9625.db')
|
||||
|
||||
# We build various synced container path putting in both
|
||||
# DATADIR and SYNC_DATADIR strings in the
|
||||
# device and devices parts.
|
||||
for devices, device in self._container_path_elements_generator():
|
||||
path = os.path.join(devices, device,
|
||||
sync_store.SYNC_DATADIR, db_path_suffix)
|
||||
# Call the conversion function
|
||||
sds = sync_store.ContainerSyncStore(devices, self.logger, False)
|
||||
path = sds._synced_container_to_container_path(path)
|
||||
# Validate that ONLY the SYNC_DATADIR part was replaced with
|
||||
# DATADIR
|
||||
self._validate_container_path_parts(path, devices, device,
|
||||
DATADIR,
|
||||
db_path_suffix)
|
||||
|
||||
# Constructs a db path suffix of the form:
|
||||
# 133791/625/82...25/82...25.db
|
||||
def _db_path_suffix(self):
|
||||
def random_hexa_string(length):
|
||||
'%0xlength' % random.randrange(16 ** length)
|
||||
|
||||
db = random_hexa_string(32)
|
||||
return '%s/%s/%s/%s.db' % (random_hexa_string(5),
|
||||
random_hexa_string(3),
|
||||
db, db)
|
||||
|
||||
def _container_path_elements_generator(self):
|
||||
# We build various container path elements putting in both
|
||||
# DATADIR and SYNC_DATADIR strings in the
|
||||
# device and devices parts.
|
||||
for devices in ['/srv/node', '/srv/node/',
|
||||
'/srv/node/dev',
|
||||
'/srv/node/%s' % DATADIR,
|
||||
'/srv/node/%s' % sync_store.SYNC_DATADIR]:
|
||||
for device in ['sdf1', 'sdf1/sdf2',
|
||||
'sdf1/%s' % DATADIR,
|
||||
'sdf1/%s' % sync_store.SYNC_DATADIR,
|
||||
'%s/sda' % DATADIR,
|
||||
'%s/sda' % sync_store.SYNC_DATADIR]:
|
||||
yield devices, device
|
||||
|
||||
def _validate_container_path_parts(self, path, devices,
|
||||
device, target, suffix):
|
||||
# Recall that the path is of the form:
|
||||
# devices/device/target/suffix
|
||||
# where each of the sub path elements (e.g. devices)
|
||||
# has a path structure containing path elements separated by '/'
|
||||
# We thus validate by splitting the path according to '/'
|
||||
# traversing all of its path elements making sure that the
|
||||
# first elements are those of devices,
|
||||
# the second are those of device
|
||||
# etc.
|
||||
spath = path.split('/')
|
||||
spath.reverse()
|
||||
self.assertEqual(spath.pop(), '')
|
||||
# Validate path against 'devices'
|
||||
for p in [p for p in devices.split('/') if p]:
|
||||
self.assertEqual(spath.pop(), p)
|
||||
# Validate path against 'device'
|
||||
for p in [p for p in device.split('/') if p]:
|
||||
self.assertEqual(spath.pop(), p)
|
||||
# Validate path against target
|
||||
self.assertEqual(spath.pop(), target)
|
||||
# Validate path against suffix
|
||||
for p in [p for p in suffix.split('/') if p]:
|
||||
self.assertEqual(spath.pop(), p)
|
||||
|
||||
def test_add_synced_container(self):
|
||||
# Add non-existing and existing synced containers
|
||||
sds = sync_store.ContainerSyncStore(self.devices_dir,
|
||||
self.logger,
|
||||
False)
|
||||
cfile = self.pick_dbfile()
|
||||
broker = FakeContainerBroker(cfile)
|
||||
for i in range(2):
|
||||
sds.add_synced_container(broker)
|
||||
scpath = sds._container_to_synced_container_path(cfile)
|
||||
with open(scpath, 'r') as infile:
|
||||
self.assertEqual(infile.read(), cfile)
|
||||
|
||||
iterated_synced_containers = list()
|
||||
for db_path in sds.synced_containers_generator():
|
||||
iterated_synced_containers.append(db_path)
|
||||
|
||||
self.assertEqual(len(iterated_synced_containers), 1)
|
||||
|
||||
def test_remove_synced_container(self):
|
||||
# Add a synced container to remove
|
||||
sds = sync_store.ContainerSyncStore(self.devices_dir,
|
||||
self.logger,
|
||||
False)
|
||||
cfile = self.pick_dbfile()
|
||||
# We keep here the link file so as to validate its deletion later
|
||||
lfile = sds._container_to_synced_container_path(cfile)
|
||||
broker = FakeContainerBroker(cfile)
|
||||
sds.add_synced_container(broker)
|
||||
|
||||
# Remove existing and non-existing synced containers
|
||||
for i in range(2):
|
||||
sds.remove_synced_container(broker)
|
||||
|
||||
iterated_synced_containers = list()
|
||||
for db_path in sds.synced_containers_generator():
|
||||
iterated_synced_containers.append(db_path)
|
||||
|
||||
self.assertEqual(len(iterated_synced_containers), 0)
|
||||
|
||||
# Make sure the whole link path gets deleted
|
||||
# recall that the path has the following suffix:
|
||||
# <hexa string of length 6>/<hexa string of length 3>/
|
||||
# <hexa string of length 32>/<same 32 hexa string>.db
|
||||
# and we expect the .db as well as all path elements
|
||||
# to get deleted
|
||||
self.assertFalse(os.path.exists(lfile))
|
||||
lfile = os.path.dirname(lfile)
|
||||
for i in range(3):
|
||||
self.assertFalse(os.path.exists(os.path.dirname(lfile)))
|
||||
lfile = os.path.dirname(lfile)
|
||||
|
||||
def test_iterate_synced_containers(self):
|
||||
# populate sync container db
|
||||
sds = sync_store.ContainerSyncStore(self.devices_dir,
|
||||
self.logger,
|
||||
False)
|
||||
containers = list()
|
||||
for i in range(10):
|
||||
cfile = self.pick_dbfile()
|
||||
broker = FakeContainerBroker(cfile)
|
||||
sds.add_synced_container(broker)
|
||||
containers.append(cfile)
|
||||
|
||||
iterated_synced_containers = list()
|
||||
for db_path in sds.synced_containers_generator():
|
||||
iterated_synced_containers.append(db_path)
|
||||
|
||||
self.assertEqual(
|
||||
set(containers), set(iterated_synced_containers))
|
||||
|
||||
def test_unhandled_exceptions_in_add_remove(self):
|
||||
sds = sync_store.ContainerSyncStore(self.devices_dir,
|
||||
self.logger,
|
||||
False)
|
||||
cfile = self.pick_dbfile()
|
||||
broker = FakeContainerBroker(cfile)
|
||||
|
||||
with mock.patch(
|
||||
'swift.container.sync_store.os.stat',
|
||||
side_effect=OSError(errno.EPERM, 'permission denied')):
|
||||
with self.assertRaises(OSError) as cm:
|
||||
sds.add_synced_container(broker)
|
||||
self.assertEqual(errno.EPERM, cm.exception.errno)
|
||||
|
||||
with mock.patch(
|
||||
'swift.container.sync_store.os.makedirs',
|
||||
side_effect=OSError(errno.EPERM, 'permission denied')):
|
||||
with self.assertRaises(OSError) as cm:
|
||||
sds.add_synced_container(broker)
|
||||
self.assertEqual(errno.EPERM, cm.exception.errno)
|
||||
|
||||
with mock.patch(
|
||||
'swift.container.sync_store.os.symlink',
|
||||
side_effect=OSError(errno.EPERM, 'permission denied')):
|
||||
with self.assertRaises(OSError) as cm:
|
||||
sds.add_synced_container(broker)
|
||||
self.assertEqual(errno.EPERM, cm.exception.errno)
|
||||
|
||||
with mock.patch(
|
||||
'swift.container.sync_store.os.unlink',
|
||||
side_effect=OSError(errno.EPERM, 'permission denied')):
|
||||
with self.assertRaises(OSError) as cm:
|
||||
sds.remove_synced_container(broker)
|
||||
self.assertEqual(errno.EPERM, cm.exception.errno)
|
||||
|
||||
def test_update_sync_store_according_to_metadata_and_deleted(self):
|
||||
# This function tests the update_sync_store 'logics'
|
||||
# with respect to various combinations of the
|
||||
# sync-to and sync-key metadata items and whether
|
||||
# the database is marked for delete.
|
||||
# The table below summarizes the expected result
|
||||
# for the various combinations, e.g.:
|
||||
# If metadata items exist and the database
|
||||
# is not marked for delete then add should be called.
|
||||
|
||||
results_list = [
|
||||
[False, 'a', 'b', 'add'],
|
||||
[False, 'a', '', 'remove'],
|
||||
[False, 'a', None, 'remove'],
|
||||
[False, '', 'b', 'remove'],
|
||||
[False, '', '', 'remove'],
|
||||
[False, '', None, 'remove'],
|
||||
[False, None, 'b', 'remove'],
|
||||
[False, None, '', 'remove'],
|
||||
[False, None, None, 'none'],
|
||||
[True, 'a', 'b', 'remove'],
|
||||
[True, 'a', '', 'remove'],
|
||||
[True, 'a', None, 'remove'],
|
||||
[True, '', 'b', 'remove'],
|
||||
[True, '', '', 'remove'],
|
||||
[True, '', None, 'remove'],
|
||||
[True, None, 'b', 'remove'],
|
||||
[True, None, '', 'remove'],
|
||||
[True, None, None, 'none'],
|
||||
]
|
||||
|
||||
store = 'swift.container.sync_store.ContainerSyncStore'
|
||||
with mock.patch(store + '.add_synced_container') as add_container:
|
||||
with mock.patch(
|
||||
store + '.remove_synced_container') as remove_container:
|
||||
sds = sync_store.ContainerSyncStore(self.devices_dir,
|
||||
self.logger,
|
||||
False)
|
||||
add_calls = 0
|
||||
remove_calls = 0
|
||||
# We now iterate over the list of combinations
|
||||
# Validating that add and removed are called as
|
||||
# expected
|
||||
for deleted, sync_to, sync_key, expected_op in results_list:
|
||||
cfile = self.pick_dbfile()
|
||||
broker = FakeContainerBroker(cfile)
|
||||
broker._is_deleted = deleted
|
||||
if sync_to is not None:
|
||||
broker.metadata['X-Container-Sync-To'] = [
|
||||
sync_to, 1]
|
||||
if sync_key is not None:
|
||||
broker.metadata['X-Container-Sync-Key'] = [
|
||||
sync_key, 1]
|
||||
sds.update_sync_store(broker)
|
||||
if expected_op == 'add':
|
||||
add_calls += 1
|
||||
if expected_op == 'remove':
|
||||
remove_calls += 1
|
||||
self.assertEqual(add_container.call_count,
|
||||
add_calls)
|
||||
self.assertEqual(remove_container.call_count,
|
||||
remove_calls)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
x
Reference in New Issue
Block a user