Container-Sync to iterate only over synced containers

commit 85a0a6a28e (parent f53cf1043d)

This change introduces a sync_store which holds only containers that are
enabled for sync. The store is implemented using a directory structure that
resembles that of the containers directory, but has entries only for
containers enabled for sync.

The store is maintained in two ways:

1. Preemptively, by the container server when processing PUT/POST/DELETE
   operations targeted at containers with x-container-sync-key /
   x-container-sync-to.
2. In the background, by the container replicator whenever it processes a
   container set up for sync.

The change updates [1].

[1] http://docs.openstack.org/developer/swift/overview_container_sync.html

Change-Id: I9ae4d4c7ff6336611df4122b7c753cc4fa46c0ff
Closes-Bug: #1476623
doc/source/overview_container_sync.rst

@@ -29,7 +29,7 @@ synchronization key.
 Configuring Container Sync
 --------------------------

-Create a container-sync-realms.conf file specifying the allowable clusters
+Create a ``container-sync-realms.conf`` file specifying the allowable clusters
 and their information::

     [realm1]
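For reference, a full realm section pairs the two keys with one
``cluster_``-prefixed endpoint per cluster; the values below are illustrative,
not part of this change::

    [realm1]
    key = realm1key
    key2 = realm1key2
    cluster_clustername1 = https://host1/v1/
    cluster_clustername2 = https://host2/v1/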
@@ -50,18 +50,18 @@ clusters that have agreed to allow container syncing with each other. Realm
 names will be considered case insensitive.

 The key is the overall cluster-to-cluster key used in combination with the
-external users' key that they set on their containers' X-Container-Sync-Key
-metadata header values. These keys will be used to sign each request the
-container sync daemon makes and used to validate each incoming container sync
-request.
+external users' key that they set on their containers'
+``X-Container-Sync-Key`` metadata header values. These keys will be used to
+sign each request the container sync daemon makes and used to validate each
+incoming container sync request.

 The key2 is optional and is an additional key incoming requests will be checked
 against. This is so you can rotate keys if you wish; you move the existing key
 to key2 and make a new key value.

-Any values in the realm section whose names begin with cluster\_ will indicate
-the name and endpoint of a cluster and will be used by external users in
-their containers' X-Container-Sync-To metadata header values with the format
+Any values in the realm section whose names begin with ``cluster_`` will
+indicate the name and endpoint of a cluster and will be used by external users in
+their containers' ``X-Container-Sync-To`` metadata header values with the format
 "//realm_name/cluster_name/account_name/container_name". Realm and cluster
 names are considered case insensitive.
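Putting the format together with the realm file above, a user would set a
header value of this shape (the account and container names are
hypothetical)::

    X-Container-Sync-To: //realm1/clustername1/AUTH_myaccount/mycontainer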
@@ -71,7 +71,7 @@ container servers, since that is where the container sync daemon runs. Note
 that the endpoint ends with /v1/ and that the container sync daemon will then
 add the account/container/obj name after that.

-Distribute this container-sync-realms.conf file to all your proxy servers
+Distribute this ``container-sync-realms.conf`` file to all your proxy servers
 and container servers.

 You also need to add the container_sync middleware to your proxy pipeline. It
@@ -95,7 +95,7 @@ section, Configuring Container Sync, for the new-style.
 With the old-style, the Swift cluster operator must allow synchronization with
 a set of hosts before the user can enable container synchronization. First, the
 backend container server needs to be given this list of hosts in the
-container-server.conf file::
+``container-server.conf`` file::

     [DEFAULT]
     # This is a comma separated list of hosts allowed in the
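The option being configured here is ``allowed_sync_hosts``; filled in, the
stanza would look roughly like this (the host list is illustrative)::

    [DEFAULT]
    # This is a comma separated list of hosts allowed in the
    # X-Container-Sync-To field for containers.
    allowed_sync_hosts = host1,host2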
@@ -170,8 +170,8 @@ we'll make next::

 The ``-t`` indicates the cluster to sync to, which is the realm name of the
 section from container-sync-realms.conf, followed by the cluster name from
-that section (without the cluster\_ prefix), followed by the account and container names we want to sync to.
-The ``-k`` specifies the secret key the two containers will share for
+that section (without the cluster\_ prefix), followed by the account and container
+names we want to sync to. The ``-k`` specifies the secret key the two containers will share for
 synchronization; this is the user key, the cluster key in
 container-sync-realms.conf will also be used behind the scenes.
@@ -195,8 +195,18 @@ as it gets synchronized over to the second::
         list container2

     [Nothing there yet, so we wait a bit...]
-    [If you're an operator running SAIO and just testing, you may need to
-     run 'swift-init container-sync once' to perform a sync scan.]
+
+.. note::
+
+    If you're an operator running SAIO and just testing, each time you
+    configure a container for synchronization and place objects in the
+    source container you will need to ensure that container-sync runs
+    before attempting to retrieve objects from the target container.
+    That is, you need to run::
+
+        swift-init container-sync once
+
+Now expect to see objects copied from the first container to the second::

     $ swift -A http://cluster2/auth/v1.0 -U test2:tester2 -K testing2 \
         list container2
@@ -340,13 +350,34 @@ synchronize to the second, we could have used this curl command::
 What's going on behind the scenes, in the cluster?
 --------------------------------------------------

-The swift-container-sync does the job of sending updates to the remote
-container.
+Container ring devices have a directory called ``containers``, where container
+databases reside. In addition to ``containers``, each container ring device
+also has a directory called ``sync_containers``. ``sync_containers`` holds
+symlinks to container databases that were configured for container sync using
+``x-container-sync-to`` and ``x-container-sync-key`` metadata keys.

-This is done by scanning the local devices for container databases and
-checking for x-container-sync-to and x-container-sync-key metadata values.
-If they exist, newer rows since the last sync will trigger PUTs or DELETEs
-to the other container.
+The swift-container-sync process does the job of sending updates to the remote
+container. This is done by scanning ``sync_containers`` for container
+databases. For each container db found, newer rows since the last sync will
+trigger PUTs or DELETEs to the other container.
+
+``sync_containers`` is maintained as follows:
+
+Whenever the container server processes a PUT or a POST request that carries
+``x-container-sync-to`` and ``x-container-sync-key`` metadata keys, the server
+creates a symlink to the container database in ``sync_containers``. Whenever
+the container server deletes a synced container, the appropriate symlink
+is deleted from ``sync_containers``.
+
+In addition to the container server, the container-replicator process does the
+job of identifying containers that should be synchronized. This is done by
+scanning the local devices for container databases and checking for
+x-container-sync-to and x-container-sync-key metadata values. If they exist,
+a symlink to the container database is created in a ``sync_containers``
+sub-directory on the same device.
+
+Similarly, when the container sync metadata keys are deleted, the container
+server and container-replicator take care of deleting the symlinks
+from ``sync_containers``.

 .. note::

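To make this concrete, a container device holding one synced container might
look like the sketch below; the device, partition and hash names are
hypothetical::

    /srv/node/sdb1/containers/1024/ab3/<hash>/<hash>.db
    /srv/node/sdb1/sync_containers/1024/ab3/<hash>/<hash>.db
        -> symlink to the containers/ database above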
swift/container/replicator.py

@@ -20,6 +20,7 @@ import time
 from collections import defaultdict
 from eventlet import Timeout

+from swift.container.sync_store import ContainerSyncStore
 from swift.container.backend import ContainerBroker, DATADIR
 from swift.container.reconciler import (
     MISPLACED_OBJECTS_ACCOUNT, incorrect_policy_index,
@@ -189,6 +190,13 @@ class ContainerReplicator(db_replicator.Replicator):
     def _post_replicate_hook(self, broker, info, responses):
         if info['account'] == MISPLACED_OBJECTS_ACCOUNT:
             return

+        try:
+            self.sync_store.update_sync_store(broker)
+        except Exception:
+            self.logger.exception('Failed to update sync_store %s' %
+                                  broker.db_file)
+
         point = broker.get_reconciler_sync()
         if not broker.has_multiple_policies() and info['max_row'] != point:
             broker.update_reconciler_sync(info['max_row'])
@@ -210,6 +218,13 @@ class ContainerReplicator(db_replicator.Replicator):
             # this container shouldn't be here, make sure it's cleaned up
             self.reconciler_cleanups[broker.container] = broker
             return
+        try:
+            # DB is going to get deleted. Be preemptive about it
+            self.sync_store.remove_synced_container(broker)
+        except Exception:
+            self.logger.exception('Failed to remove sync_store entry %s' %
+                                  broker.db_file)
+
         return super(ContainerReplicator, self).delete_db(broker)

     def replicate_reconcilers(self):
@@ -237,6 +252,9 @@ class ContainerReplicator(db_replicator.Replicator):
     def run_once(self, *args, **kwargs):
         self.reconciler_containers = {}
         self.reconciler_cleanups = {}
+        self.sync_store = ContainerSyncStore(self.root,
+                                             self.logger,
+                                             self.mount_check)
         rv = super(ContainerReplicator, self).run_once(*args, **kwargs)
         if any([self.reconciler_containers, self.reconciler_cleanups]):
             self.replicate_reconcilers()
swift/container/server.py

@@ -23,6 +23,7 @@ from xml.etree.cElementTree import Element, SubElement, tostring
 from eventlet import Timeout

 import swift.common.db
+from swift.container.sync_store import ContainerSyncStore
 from swift.container.backend import ContainerBroker, DATADIR
 from swift.container.replicator import ContainerReplicatorRpc
 from swift.common.db import DatabaseAlreadyExists
@@ -110,6 +111,9 @@ class ContainerController(BaseStorageServer):
         self.save_headers.append('x-versions-location')
         swift.common.db.DB_PREALLOCATION = \
             config_true_value(conf.get('db_preallocation', 'f'))
+        self.sync_store = ContainerSyncStore(self.root,
+                                             self.logger,
+                                             self.mount_check)

     def _get_container_broker(self, drive, part, account, container, **kwargs):
         """
@@ -242,6 +246,13 @@ class ContainerController(BaseStorageServer):
         else:
             return None

+    def _update_sync_store(self, broker, method):
+        try:
+            self.sync_store.update_sync_store(broker)
+        except Exception:
+            self.logger.exception('Failed to update sync_store %s during %s' %
+                                  (broker.db_file, method))
+
     @public
     @timing_stats()
     def DELETE(self, req):
@@ -276,6 +287,7 @@ class ContainerController(BaseStorageServer):
             broker.delete_db(req_timestamp.internal)
             if not broker.is_deleted():
                 return HTTPConflict(request=req)
+            self._update_sync_store(broker, 'DELETE')
             resp = self.account_update(req, account, container, broker)
             if resp:
                 return resp
@@ -381,6 +393,8 @@ class ContainerController(BaseStorageServer):
                         broker.metadata['X-Container-Sync-To'][0]:
                     broker.set_x_container_sync_points(-1, -1)
                 broker.update_metadata(metadata, validate_metadata=True)
+            if metadata:
+                self._update_sync_store(broker, 'PUT')
             resp = self.account_update(req, account, container, broker)
             if resp:
                 return resp
@@ -564,6 +578,7 @@ class ContainerController(BaseStorageServer):
                         broker.metadata['X-Container-Sync-To'][0]:
                     broker.set_x_container_sync_points(-1, -1)
             broker.update_metadata(metadata, validate_metadata=True)
+        self._update_sync_store(broker, 'POST')
         return HTTPNoContent(request=req)

     def __call__(self, env, start_response):
swift/container/sync.py

@@ -24,7 +24,9 @@ from struct import unpack_from
 from eventlet import sleep, Timeout

 import swift.common.db
-from swift.container.backend import ContainerBroker, DATADIR
+from swift.common.db import DatabaseConnectionError
+from swift.container.backend import ContainerBroker
+from swift.container.sync_store import ContainerSyncStore
 from swift.common.container_sync_realms import ContainerSyncRealms
 from swift.common.internal_client import (
     delete_object, put_object, InternalClient, UnexpectedResponse)
@@ -32,7 +34,7 @@ from swift.common.exceptions import ClientException
 from swift.common.ring import Ring
 from swift.common.ring.utils import is_local_device
 from swift.common.utils import (
-    audit_location_generator, clean_content_type, config_true_value,
+    clean_content_type, config_true_value,
     FileLikeIter, get_logger, hash_path, quote, urlparse, validate_sync_to,
     whataremyips, Timestamp)
 from swift.common.daemon import Daemon
@@ -187,6 +189,10 @@ class ContainerSync(Daemon):
             a.strip()
             for a in conf.get('sync_proxy', '').split(',')
             if a.strip()]
+        #: ContainerSyncStore instance for iterating over synced containers
+        self.sync_store = ContainerSyncStore(self.devices,
+                                             self.logger,
+                                             self.mount_check)
         #: Number of containers with sync turned on that were successfully
         #: synced.
         self.container_syncs = 0
@@ -194,7 +200,8 @@ class ContainerSync(Daemon):
         self.container_deletes = 0
         #: Number of successful PUTs triggered.
         self.container_puts = 0
-        #: Number of containers that didn't have sync turned on.
+        #: Number of containers whose sync has been turned off, but
+        #: are not yet cleared from the sync store.
         self.container_skips = 0
         #: Number of containers that had a failure of some type.
         self.container_failures = 0
@@ -247,10 +254,7 @@ class ContainerSync(Daemon):
             sleep(random() * self.interval)
         while True:
             begin = time()
-            all_locs = audit_location_generator(self.devices, DATADIR, '.db',
-                                                mount_check=self.mount_check,
-                                                logger=self.logger)
-            for path, device, partition in all_locs:
+            for path in self.sync_store.synced_containers_generator():
                 self.container_sync(path)
             if time() - self.reported >= 3600:  # once an hour
                 self.report()
@@ -264,10 +268,7 @@ class ContainerSync(Daemon):
         """
         self.logger.info(_('Begin container sync "once" mode'))
         begin = time()
-        all_locs = audit_location_generator(self.devices, DATADIR, '.db',
-                                            mount_check=self.mount_check,
-                                            logger=self.logger)
-        for path, device, partition in all_locs:
+        for path in self.sync_store.synced_containers_generator():
             self.container_sync(path)
         if time() - self.reported >= 3600:  # once an hour
             self.report()
@@ -308,7 +309,20 @@ class ContainerSync(Daemon):
         broker = None
         try:
             broker = ContainerBroker(path)
-            info = broker.get_info()
+            # The path we pass to the ContainerBroker is a real path of
+            # a container DB. If we get here, however, it means that this
+            # path is linked from the sync_containers dir. In rare cases
+            # of race or process failures the link can be stale and
+            # the get_info below will raise a "DB doesn't exist" exception.
+            # In this case we remove the stale link and raise an error
+            # since in most cases the db should be there.
+            try:
+                info = broker.get_info()
+            except DatabaseConnectionError as db_err:
+                if str(db_err).endswith("DB doesn't exist"):
+                    self.sync_store.remove_synced_container(broker)
+                raise
+
             x, nodes = self.container_ring.get_nodes(info['account'],
                                                      info['container'])
             for ordinal, node in enumerate(nodes):
@@ -388,7 +402,7 @@ class ContainerSync(Daemon):
                 broker.set_x_container_sync_points(sync_point1, None)
             self.container_syncs += 1
             self.logger.increment('syncs')
-        except (Exception, Timeout) as err:
+        except (Exception, Timeout):
             self.container_failures += 1
             self.logger.increment('failures')
             self.logger.exception(_('ERROR Syncing %s'),
swift/container/sync_store.py (new file, 177 lines)

@@ -0,0 +1,177 @@
+# Copyright (c) 2010-2016 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import errno
+
+from swift.common.utils import audit_location_generator, mkdirs
+from swift.container.backend import DATADIR
+
+SYNC_DATADIR = 'sync_containers'
+
+
+class ContainerSyncStore(object):
+    """
+    Filesystem based store for local containers that need to be synced.
+
+    The store holds a list of containers that need to be synced by the
+    container sync daemon. The store is local to the container server node,
+    that is, only containers whose databases are kept locally on the node are
+    listed.
+    """
+    def __init__(self, devices, logger, mount_check):
+        self.devices = os.path.normpath(os.path.join('/', devices)) + '/'
+        self.logger = logger
+        self.mount_check = mount_check
+
+    def _container_to_synced_container_path(self, path):
+        # path is assumed to be of the form:
+        # /srv/node/sdb/containers/part/.../*.db
+        # or more generally:
+        # devices/device/containers/part/.../*.db
+        # Below we split the path to the following parts:
+        # devices, device, rest
+        devices = self.devices
+        path = os.path.normpath(path)
+        device = path[len(devices):path.rfind(DATADIR)]
+        rest = path[path.rfind(DATADIR) + len(DATADIR) + 1:]
+
+        return os.path.join(devices, device, SYNC_DATADIR, rest)
+
+    def _synced_container_to_container_path(self, path):
+        # synced path is assumed to be of the form:
+        # /srv/node/sdb/sync_containers/part/.../*.db
+        # or more generally:
+        # devices/device/sync_containers/part/.../*.db
+        # Below we split the path to the following parts:
+        # devices, device, rest
+        devices = self.devices
+        path = os.path.normpath(path)
+        device = path[len(devices):path.rfind(SYNC_DATADIR)]
+        rest = path[path.rfind(SYNC_DATADIR) + len(SYNC_DATADIR) + 1:]
+
+        return os.path.join(devices, device, DATADIR, rest)
+
+    def add_synced_container(self, broker):
+        """
+        Adds the container db represented by broker to the list of synced
+        containers.
+
+        :param broker: An instance of ContainerBroker representing the
+                       container to add.
+        """
+        sync_file = self._container_to_synced_container_path(broker.db_file)
+        stat = None
+        try:
+            stat = os.stat(sync_file)
+        except OSError as oserr:
+            if oserr.errno != errno.ENOENT:
+                raise oserr
+
+        if stat is not None:
+            return
+
+        sync_path = os.path.dirname(sync_file)
+        mkdirs(sync_path)
+
+        try:
+            os.symlink(broker.db_file, sync_file)
+        except OSError as oserr:
+            if (oserr.errno != errno.EEXIST or
+                    not os.path.islink(sync_file)):
+                raise oserr
+
+    def remove_synced_container(self, broker):
+        """
+        Removes the container db represented by broker from the list of synced
+        containers.
+
+        :param broker: An instance of ContainerBroker representing the
+                       container to remove.
+        """
+        sync_file = broker.db_file
+        sync_file = self._container_to_synced_container_path(sync_file)
+        try:
+            os.unlink(sync_file)
+            os.removedirs(os.path.dirname(sync_file))
+        except OSError as oserr:
+            if oserr.errno != errno.ENOENT:
+                raise oserr
+
+    def update_sync_store(self, broker):
+        """
+        Add or remove a symlink to/from the sync_containers directory
+        according to the broker's metadata.
+
+        Decide according to the broker's x-container-sync-to and
+        x-container-sync-key whether a symlink needs to be added or
+        removed.
+
+        Note that if both metadata items are missing entirely, the container
+        has never been set up for sync within reclaim_age, in which case we
+        do nothing. This is important as this method is called for ALL
+        containers from the container replicator.
+
+        Once we realize that we do need to do something, we check whether
+        the container is marked for delete, in which case we want to
+        remove the symlink.
+
+        For adding a symlink we check that both x-container-sync-to and
+        x-container-sync-key exist and are valid, that is, are not empty.
+
+        At this point we know we need to do something, the container is not
+        marked for delete, and the condition to add a symlink is not met.
+        In conclusion, we need to remove the symlink.
+
+        :param broker: An instance of ContainerBroker
+        """
+        # If the broker metadata does not have both x-container-sync-to
+        # and x-container-sync-key it has *never* been set. Make sure
+        # we do nothing in this case
+        if ('X-Container-Sync-To' not in broker.metadata and
+                'X-Container-Sync-Key' not in broker.metadata):
+            return
+
+        if broker.is_deleted():
+            self.remove_synced_container(broker)
+            return
+
+        # If both x-container-sync-to and x-container-sync-key
+        # exist and are valid, add the symlink
+        sync_to = sync_key = None
+        if 'X-Container-Sync-To' in broker.metadata:
+            sync_to = broker.metadata['X-Container-Sync-To'][0]
+        if 'X-Container-Sync-Key' in broker.metadata:
+            sync_key = broker.metadata['X-Container-Sync-Key'][0]
+        if sync_to and sync_key:
+            self.add_synced_container(broker)
+            return
+
+        self.remove_synced_container(broker)
+
+    def synced_containers_generator(self):
+        """
+        Iterates over the list of synced containers,
+        yielding the path of each container db.
+        """
+        all_locs = audit_location_generator(self.devices, SYNC_DATADIR, '.db',
+                                            mount_check=self.mount_check,
+                                            logger=self.logger)
+        for path, device, partition in all_locs:
+            # What we want to yield is the real path as it is being used for
+            # initiating a container broker. The broker would break if not
+            # given the db real path, as it e.g. assumes the existence of
+            # .pending in the same path
+            yield self._synced_container_to_container_path(path)
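As a rough sketch of how the store is driven, consider the snippet below.
The paths and the FakeBroker stand-in are hypothetical; the real callers
(container server, replicator, sync daemon) pass a ContainerBroker, of which
the store only uses ``db_file``, ``metadata`` and ``is_deleted()``:

    from swift.container.sync_store import ContainerSyncStore

    class FakeBroker(object):
        # Minimal stand-in for ContainerBroker, for illustration only.
        def __init__(self, db_file, metadata):
            self.db_file = db_file
            self.metadata = metadata

        def is_deleted(self):
            return False

    store = ContainerSyncStore('/srv/node', logger=None, mount_check=False)
    broker = FakeBroker(
        '/srv/node/sdb1/containers/1024/ab3/some_hash/some_hash.db',
        {'X-Container-Sync-To': ('//realm1/clustername1/AUTH_a/c2', '1'),
         'X-Container-Sync-Key': ('secret', '1')})

    # Both sync metadata items are set and non-empty, so this creates a
    # symlink under /srv/node/sdb1/sync_containers/1024/ab3/some_hash/
    store.update_sync_store(broker)

    # The sync daemon side then iterates over the real db paths:
    for path in store.synced_containers_generator():
        print(path)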
test/probe/test_container_sync.py

@@ -18,8 +18,9 @@ from nose import SkipTest
 import unittest

 from six.moves.urllib.parse import urlparse
-from swiftclient import client
+from swiftclient import client, ClientException

+from swift.common.http import HTTP_NOT_FOUND
 from swift.common.manager import Manager
 from test.probe.common import ReplProbeTest, ENABLED_POLICIES
@@ -49,25 +50,27 @@ class TestContainerSync(ReplProbeTest):
         super(TestContainerSync, self).setUp()
         self.realm, self.cluster = get_current_realm_cluster(self.url)

-    def test_sync(self):
-        base_headers = {'X-Container-Sync-Key': 'secret'}
-
+    def _setup_synced_containers(self, skey='secret', dkey='secret'):
         # setup dest container
         dest_container = 'dest-container-%s' % uuid.uuid4()
-        dest_headers = base_headers.copy()
+        dest_headers = {}
         dest_policy = None
         if len(ENABLED_POLICIES) > 1:
             dest_policy = random.choice(ENABLED_POLICIES)
             dest_headers['X-Storage-Policy'] = dest_policy.name
+        if dkey is not None:
+            dest_headers['X-Container-Sync-Key'] = dkey
         client.put_container(self.url, self.token, dest_container,
                              headers=dest_headers)

         # setup source container
         source_container = 'source-container-%s' % uuid.uuid4()
-        source_headers = base_headers.copy()
+        source_headers = {}
         sync_to = '//%s/%s/%s/%s' % (self.realm, self.cluster, self.account,
                                      dest_container)
         source_headers['X-Container-Sync-To'] = sync_to
+        if skey is not None:
+            source_headers['X-Container-Sync-Key'] = skey
         if dest_policy:
             source_policy = random.choice([p for p in ENABLED_POLICIES
                                            if p is not dest_policy])
@@ -75,6 +78,11 @@ class TestContainerSync(ReplProbeTest):
         client.put_container(self.url, self.token, source_container,
                              headers=source_headers)

+        return source_container, dest_container
+
+    def test_sync(self):
+        source_container, dest_container = self._setup_synced_containers()
+
         # upload to source
         object_name = 'object-%s' % uuid.uuid4()
         client.put_object(self.url, self.token, source_container, object_name,
@@ -83,11 +91,63 @@ class TestContainerSync(ReplProbeTest):
         # cycle container-sync
         Manager(['container-sync']).once()

-        # retrieve from sync'd container
-        headers, body = client.get_object(self.url, self.token,
-                                          dest_container, object_name)
+        _junk, body = client.get_object(self.url, self.token,
+                                        dest_container, object_name)
         self.assertEqual(body, 'test-body')

+    def test_sync_lazy_skey(self):
+        # Create synced containers, but with no key at source
+        source_container, dest_container =\
+            self._setup_synced_containers(None, 'secret')
+
+        # upload to source
+        object_name = 'object-%s' % uuid.uuid4()
+        client.put_object(self.url, self.token, source_container, object_name,
+                          'test-body')
+
+        # cycle container-sync, nothing should happen
+        Manager(['container-sync']).once()
+        with self.assertRaises(ClientException) as err:
+            _junk, body = client.get_object(self.url, self.token,
+                                            dest_container, object_name)
+        self.assertEqual(err.exception.http_status, HTTP_NOT_FOUND)
+
+        # amend source key
+        source_headers = {'X-Container-Sync-Key': 'secret'}
+        client.put_container(self.url, self.token, source_container,
+                             headers=source_headers)
+        # cycle container-sync, should replicate
+        Manager(['container-sync']).once()
+        _junk, body = client.get_object(self.url, self.token,
+                                        dest_container, object_name)
+        self.assertEqual(body, 'test-body')
+
+    def test_sync_lazy_dkey(self):
+        # Create synced containers, but with no key at dest
+        source_container, dest_container =\
+            self._setup_synced_containers('secret', None)
+
+        # upload to source
+        object_name = 'object-%s' % uuid.uuid4()
+        client.put_object(self.url, self.token, source_container, object_name,
+                          'test-body')
+
+        # cycle container-sync, nothing should happen
+        Manager(['container-sync']).once()
+        with self.assertRaises(ClientException) as err:
+            _junk, body = client.get_object(self.url, self.token,
+                                            dest_container, object_name)
+        self.assertEqual(err.exception.http_status, HTTP_NOT_FOUND)
+
+        # amend dest key
+        dest_headers = {'X-Container-Sync-Key': 'secret'}
+        client.put_container(self.url, self.token, dest_container,
+                             headers=dest_headers)
+        # cycle container-sync, should replicate
+        Manager(['container-sync']).once()
+        _junk, body = client.get_object(self.url, self.token,
+                                        dest_container, object_name)
+        self.assertEqual(body, 'test-body')
+
 if __name__ == "__main__":
     unittest.main()
test/unit/container/test_replicator.py

@@ -23,14 +23,14 @@ import random
 import sqlite3

 from swift.common import db_replicator
-from swift.container import replicator, backend, server
+from swift.container import replicator, backend, server, sync_store
 from swift.container.reconciler import (
     MISPLACED_OBJECTS_ACCOUNT, get_reconciler_container_name)
 from swift.common.utils import Timestamp
 from swift.common.storage_policy import POLICIES

 from test.unit.common import test_db_replicator
-from test.unit import patch_policies, make_timestamp_iter
+from test.unit import patch_policies, make_timestamp_iter, FakeLogger
 from contextlib import contextmanager
@@ -998,6 +998,135 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
         daemon._post_replicate_hook(broker, info, [])
         self.assertEqual(0, len(calls))

+    def test_update_sync_store_exception(self):
+        class FakeContainerSyncStore(object):
+            def update_sync_store(self, broker):
+                raise OSError(1, '1')
+
+        logger = FakeLogger()
+        daemon = replicator.ContainerReplicator({}, logger)
+        daemon.sync_store = FakeContainerSyncStore()
+        ts_iter = make_timestamp_iter()
+        broker = self._get_broker('a', 'c', node_index=0)
+        timestamp = next(ts_iter)
+        broker.initialize(timestamp.internal, POLICIES.default.idx)
+        info = broker.get_replication_info()
+        daemon._post_replicate_hook(broker, info, [])
+        log_lines = logger.get_lines_for_level('error')
+        self.assertEqual(1, len(log_lines))
+        self.assertIn('Failed to update sync_store', log_lines[0])
+
+    def test_update_sync_store(self):
+        klass = 'swift.container.sync_store.ContainerSyncStore'
+        daemon = replicator.ContainerReplicator({})
+        daemon.sync_store = sync_store.ContainerSyncStore(
+            daemon.root, daemon.logger, daemon.mount_check)
+        ts_iter = make_timestamp_iter()
+        broker = self._get_broker('a', 'c', node_index=0)
+        timestamp = next(ts_iter)
+        broker.initialize(timestamp.internal, POLICIES.default.idx)
+        info = broker.get_replication_info()
+        with mock.patch(klass + '.remove_synced_container') as mock_remove:
+            with mock.patch(klass + '.add_synced_container') as mock_add:
+                daemon._post_replicate_hook(broker, info, [])
+        self.assertEqual(0, mock_remove.call_count)
+        self.assertEqual(0, mock_add.call_count)
+
+        timestamp = next(ts_iter)
+        # sync-to and sync-key empty - remove from store
+        broker.update_metadata(
+            {'X-Container-Sync-To': ('', timestamp.internal),
+             'X-Container-Sync-Key': ('', timestamp.internal)})
+        with mock.patch(klass + '.remove_synced_container') as mock_remove:
+            with mock.patch(klass + '.add_synced_container') as mock_add:
+                daemon._post_replicate_hook(broker, info, [])
+        self.assertEqual(0, mock_add.call_count)
+        mock_remove.assert_called_once_with(broker)
+
+        timestamp = next(ts_iter)
+        # sync-to is not empty sync-key is empty - remove from store
+        broker.update_metadata(
+            {'X-Container-Sync-To': ('a', timestamp.internal)})
+        with mock.patch(klass + '.remove_synced_container') as mock_remove:
+            with mock.patch(klass + '.add_synced_container') as mock_add:
+                daemon._post_replicate_hook(broker, info, [])
+        self.assertEqual(0, mock_add.call_count)
+        mock_remove.assert_called_once_with(broker)
+
+        timestamp = next(ts_iter)
+        # sync-to is empty sync-key is not empty - remove from store
+        broker.update_metadata(
+            {'X-Container-Sync-To': ('', timestamp.internal),
+             'X-Container-Sync-Key': ('secret', timestamp.internal)})
+        with mock.patch(klass + '.remove_synced_container') as mock_remove:
+            with mock.patch(klass + '.add_synced_container') as mock_add:
+                daemon._post_replicate_hook(broker, info, [])
+        self.assertEqual(0, mock_add.call_count)
+        mock_remove.assert_called_once_with(broker)
+
+        timestamp = next(ts_iter)
+        # sync-to, sync-key both not empty - add to store
+        broker.update_metadata(
+            {'X-Container-Sync-To': ('a', timestamp.internal),
+             'X-Container-Sync-Key': ('secret', timestamp.internal)})
+        with mock.patch(klass + '.remove_synced_container') as mock_remove:
+            with mock.patch(klass + '.add_synced_container') as mock_add:
+                daemon._post_replicate_hook(broker, info, [])
+        mock_add.assert_called_once_with(broker)
+        self.assertEqual(0, mock_remove.call_count)
+
+        timestamp = next(ts_iter)
+        # container is removed - need to remove from store
+        broker.delete_db(timestamp.internal)
+        broker.update_metadata(
+            {'X-Container-Sync-To': ('a', timestamp.internal),
+             'X-Container-Sync-Key': ('secret', timestamp.internal)})
+        with mock.patch(klass + '.remove_synced_container') as mock_remove:
+            with mock.patch(klass + '.add_synced_container') as mock_add:
+                daemon._post_replicate_hook(broker, info, [])
+        self.assertEqual(0, mock_add.call_count)
+        mock_remove.assert_called_once_with(broker)
+
+    def test_sync_triggers_sync_store_update(self):
+        klass = 'swift.container.sync_store.ContainerSyncStore'
+        ts_iter = make_timestamp_iter()
+        # Create two containers as follows:
+        # broker_1 which is not set for sync
+        # broker_2 which is set for sync and then unset
+        # test that while replicating both we see no activity
+        # for broker_1, and the anticipated activity for broker_2
+        broker_1 = self._get_broker('a', 'c', node_index=0)
+        broker_1.initialize(next(ts_iter).internal, POLICIES.default.idx)
+        broker_2 = self._get_broker('b', 'd', node_index=0)
+        broker_2.initialize(next(ts_iter).internal, POLICIES.default.idx)
+        broker_2.update_metadata(
+            {'X-Container-Sync-To': ('a', next(ts_iter).internal),
+             'X-Container-Sync-Key': ('secret', next(ts_iter).internal)})
+
+        # replicate once according to broker_1
+        # relying on the fact that FakeRing would place both
+        # in the same partition.
+        part, node = self._get_broker_part_node(broker_1)
+        with mock.patch(klass + '.remove_synced_container') as mock_remove:
+            with mock.patch(klass + '.add_synced_container') as mock_add:
+                self._run_once(node)
+        self.assertEqual(1, mock_add.call_count)
+        self.assertEqual(broker_2.db_file, mock_add.call_args[0][0].db_file)
+        self.assertEqual(0, mock_remove.call_count)
+
+        broker_2.update_metadata(
+            {'X-Container-Sync-To': ('', next(ts_iter).internal)})
+        # replicate once this time according to broker_2
+        # relying on the fact that FakeRing would place both
+        # in the same partition.
+        part, node = self._get_broker_part_node(broker_2)
+        with mock.patch(klass + '.remove_synced_container') as mock_remove:
+            with mock.patch(klass + '.add_synced_container') as mock_add:
+                self._run_once(node)
+        self.assertEqual(0, mock_add.call_count)
+        self.assertEqual(1, mock_remove.call_count)
+        self.assertEqual(broker_2.db_file, mock_remove.call_args[0][0].db_file)
+

 if __name__ == '__main__':
     unittest.main()
test/unit/container/test_server.py

@@ -1153,6 +1153,75 @@ class TestContainerController(unittest.TestCase):
         self.assertEqual(info['x_container_sync_point1'], -1)
         self.assertEqual(info['x_container_sync_point2'], -1)

+    def test_update_sync_store_on_PUT(self):
+        # Create a synced container and validate a link is created
+        self._create_synced_container_and_validate_sync_store('PUT')
+        # remove the sync using PUT and validate the link is deleted
+        self._remove_sync_and_validate_sync_store('PUT')
+
+    def test_update_sync_store_on_POST(self):
+        # Create a container and validate a link is not created
+        self._create_container_and_validate_sync_store()
+        # Update the container to be synced and validate a link is created
+        self._create_synced_container_and_validate_sync_store('POST')
+        # remove the sync using POST and validate the link is deleted
+        self._remove_sync_and_validate_sync_store('POST')
+
+    def test_update_sync_store_on_DELETE(self):
+        # Create a synced container and validate a link is created
+        self._create_synced_container_and_validate_sync_store('PUT')
+        # Remove the container and validate the link is deleted
+        self._remove_sync_and_validate_sync_store('DELETE')
+
+    def _create_container_and_validate_sync_store(self):
+        req = Request.blank(
+            '/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
+            headers={'x-timestamp': '0'})
+        req.get_response(self.controller)
+        db = self.controller._get_container_broker('sda1', 'p', 'a', 'c')
+        sync_store = self.controller.sync_store
+        db_path = db.db_file
+        db_link = sync_store._container_to_synced_container_path(db_path)
+        self.assertFalse(os.path.exists(db_link))
+        sync_containers = [c for c in sync_store.synced_containers_generator()]
+        self.assertFalse(sync_containers)
+
+    def _create_synced_container_and_validate_sync_store(self, method):
+        req = Request.blank(
+            '/sda1/p/a/c', environ={'REQUEST_METHOD': method},
+            headers={'x-timestamp': '1',
+                     'x-container-sync-to': 'http://127.0.0.1:12345/v1/a/c',
+                     'x-container-sync-key': '1234'})
+        req.get_response(self.controller)
+        db = self.controller._get_container_broker('sda1', 'p', 'a', 'c')
+        sync_store = self.controller.sync_store
+        db_path = db.db_file
+        db_link = sync_store._container_to_synced_container_path(db_path)
+        self.assertTrue(os.path.exists(db_link))
+        sync_containers = [c for c in sync_store.synced_containers_generator()]
+        self.assertEqual(1, len(sync_containers))
+        self.assertEqual(db_path, sync_containers[0])
+
+    def _remove_sync_and_validate_sync_store(self, method):
+        if method == 'DELETE':
+            headers = {'x-timestamp': '2'}
+        else:
+            headers = {'x-timestamp': '2',
+                       'x-container-sync-to': '',
+                       'x-container-sync-key': '1234'}
+
+        req = Request.blank(
+            '/sda1/p/a/c', environ={'REQUEST_METHOD': method},
+            headers=headers)
+        req.get_response(self.controller)
+        db = self.controller._get_container_broker('sda1', 'p', 'a', 'c')
+        sync_store = self.controller.sync_store
+        db_path = db.db_file
+        db_link = sync_store._container_to_synced_container_path(db_path)
+        self.assertFalse(os.path.exists(db_link))
+        sync_containers = [c for c in sync_store.synced_containers_generator()]
+        self.assertFalse(sync_containers)
+
     def test_REPLICATE_insufficient_storage(self):
         conf = {'devices': self.testdir, 'mount_check': 'true'}
         self.container_controller = container_server.ContainerController(
test/unit/container/test_sync.py

@@ -19,8 +19,10 @@ import unittest
 from textwrap import dedent

 import mock
+import errno
 from test.unit import debug_logger
 from swift.container import sync
+from swift.common.db import DatabaseConnectionError
 from swift.common import utils
 from swift.common.wsgi import ConfigString
 from swift.common.exceptions import ClientException
@@ -47,6 +49,7 @@ class FakeContainerBroker(object):
     def __init__(self, path, metadata=None, info=None, deleted=False,
                  items_since=None):
         self.db_file = path
+        self.db_dir = os.path.dirname(path)
         self.metadata = metadata if metadata else {}
         self.info = info if info else {}
         self.deleted = deleted
@@ -157,7 +160,6 @@ class TestContainerSync(unittest.TestCase):
         # interval sleep.
         time_calls = [0]
         sleep_calls = []
-        audit_location_generator_calls = [0]

         def fake_time():
             time_calls[0] += 1
@@ -176,48 +178,36 @@ class TestContainerSync(unittest.TestCase):
         def fake_sleep(amount):
             sleep_calls.append(amount)

-        def fake_audit_location_generator(*args, **kwargs):
-            audit_location_generator_calls[0] += 1
-            # Makes .container_sync() short-circuit
-            yield 'container.db', 'device', 'partition'
-            return
-
-        orig_time = sync.time
-        orig_sleep = sync.sleep
-        orig_ContainerBroker = sync.ContainerBroker
-        orig_audit_location_generator = sync.audit_location_generator
-        try:
-            sync.ContainerBroker = lambda p: FakeContainerBroker(
-                p, info={'account': 'a', 'container': 'c',
-                         'storage_policy_index': 0})
-            sync.time = fake_time
-            sync.sleep = fake_sleep
-
-            with mock.patch('swift.container.sync.InternalClient'):
-                cs = sync.ContainerSync({}, container_ring=FakeRing())
-            sync.audit_location_generator = fake_audit_location_generator
-            cs.run_forever(1, 2, a=3, b=4, verbose=True)
-        except Exception as err:
-            if str(err) != 'we are now done':
-                raise
-        finally:
-            sync.time = orig_time
-            sync.sleep = orig_sleep
-            sync.audit_location_generator = orig_audit_location_generator
-            sync.ContainerBroker = orig_ContainerBroker
-
-        self.assertEqual(time_calls, [9])
-        self.assertEqual(len(sleep_calls), 2)
-        self.assertTrue(sleep_calls[0] <= cs.interval)
-        self.assertTrue(sleep_calls[1] == cs.interval - 1)
-        self.assertEqual(audit_location_generator_calls, [2])
-        self.assertEqual(cs.reported, 3602)
+        gen_func = ('swift.container.sync_store.'
+                    'ContainerSyncStore.synced_containers_generator')
+        with mock.patch('swift.container.sync.InternalClient'), \
+                mock.patch('swift.container.sync.time', fake_time), \
+                mock.patch('swift.container.sync.sleep', fake_sleep), \
+                mock.patch(gen_func) as fake_generator, \
+                mock.patch('swift.container.sync.ContainerBroker',
+                           lambda p: FakeContainerBroker(p, info={
+                               'account': 'a', 'container': 'c',
+                               'storage_policy_index': 0})):
+            fake_generator.side_effect = [iter(['container.db']),
+                                          iter(['container.db'])]
+            cs = sync.ContainerSync({}, container_ring=FakeRing())
+            try:
+                cs.run_forever()
+            except Exception as err:
+                if str(err) != 'we are now done':
+                    raise
+
+            self.assertEqual(time_calls, [9])
+            self.assertEqual(len(sleep_calls), 2)
+            self.assertLessEqual(sleep_calls[0], cs.interval)
+            self.assertEqual(cs.interval - 1, sleep_calls[1])
+            self.assertEqual(2, fake_generator.call_count)
+            self.assertEqual(cs.reported, 3602)

     def test_run_once(self):
         # This runs run_once with fakes twice, the first causing an interim
         # report, the second with no interim report.
         time_calls = [0]
-        audit_location_generator_calls = [0]

         def fake_time():
             time_calls[0] += 1
@ -235,40 +225,31 @@ class TestContainerSync(unittest.TestCase):
|
|||||||
             raise Exception('we are now done')
         return returns[time_calls[0] - 1]

-        def fake_audit_location_generator(*args, **kwargs):
-            audit_location_generator_calls[0] += 1
-            # Makes .container_sync() short-circuit
-            yield 'container.db', 'device', 'partition'
-            return
-
-        orig_time = sync.time
-        orig_audit_location_generator = sync.audit_location_generator
-        orig_ContainerBroker = sync.ContainerBroker
-        try:
-            sync.ContainerBroker = lambda p: FakeContainerBroker(
-                p, info={'account': 'a', 'container': 'c',
-                         'storage_policy_index': 0})
-            sync.time = fake_time
-
-            with mock.patch('swift.container.sync.InternalClient'):
-                cs = sync.ContainerSync({}, container_ring=FakeRing())
-            sync.audit_location_generator = fake_audit_location_generator
-            cs.run_once(1, 2, a=3, b=4, verbose=True)
-            self.assertEqual(time_calls, [6])
-            self.assertEqual(audit_location_generator_calls, [1])
-            self.assertEqual(cs.reported, 3602)
-            cs.run_once()
-        except Exception as err:
-            if str(err) != 'we are now done':
-                raise
-        finally:
-            sync.time = orig_time
-            sync.audit_location_generator = orig_audit_location_generator
-            sync.ContainerBroker = orig_ContainerBroker
-
-        self.assertEqual(time_calls, [10])
-        self.assertEqual(audit_location_generator_calls, [2])
-        self.assertEqual(cs.reported, 3604)
+        gen_func = ('swift.container.sync_store.'
+                    'ContainerSyncStore.synced_containers_generator')
+        with mock.patch('swift.container.sync.InternalClient'), \
+                mock.patch('swift.container.sync.time', fake_time), \
+                mock.patch(gen_func) as fake_generator, \
+                mock.patch('swift.container.sync.ContainerBroker',
+                           lambda p: FakeContainerBroker(p, info={
+                               'account': 'a', 'container': 'c',
+                               'storage_policy_index': 0})):
+            fake_generator.side_effect = [iter(['container.db']),
+                                          iter(['container.db'])]
+            cs = sync.ContainerSync({}, container_ring=FakeRing())
+            try:
+                cs.run_once()
+                self.assertEqual(time_calls, [6])
+                self.assertEqual(1, fake_generator.call_count)
+                self.assertEqual(cs.reported, 3602)
+                cs.run_once()
+            except Exception as err:
+                if str(err) != 'we are now done':
+                    raise
+
+            self.assertEqual(time_calls, [10])
+            self.assertEqual(2, fake_generator.call_count)
+            self.assertEqual(cs.reported, 3604)

     def test_container_sync_not_db(self):
         cring = FakeRing()
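The gen_func mock above replaces the old audit_location_generator plumbing: run_once is now expected to pull container db paths straight out of the sync store. A minimal sketch of the loop being exercised, with names assumed from the mock targets (the real method lives in swift/container/sync.py and may differ):

    # Sketch only: run_once drives container_sync() once per entry that
    # the sync store yields, instead of auditing every container db.
    def run_once(self, *args, **kwargs):
        for path in self.sync_store.synced_containers_generator():
            self.container_sync(path)       # sync one container db
            if time() - self.reported >= 3600:
                self.report()               # interim report, hourly at most
        self.report()                       # final report for the pass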
@@ -280,8 +261,65 @@ class TestContainerSync(unittest.TestCase):
         cring = FakeRing()
         with mock.patch('swift.container.sync.InternalClient'):
             cs = sync.ContainerSync({}, container_ring=cring)
-        cs.container_sync('isa.db')
-        self.assertEqual(cs.container_failures, 1)
+        broker = 'swift.container.backend.ContainerBroker'
+        store = 'swift.container.sync_store.ContainerSyncStore'
+
+        # In this test we call the container_sync instance several
+        # times with a missing db in various combinations.
+        # Since we use the same ContainerSync instance for all the calls,
+        # its failures counter increases by one with each call.
+
+        # Test the case where get_info raises DatabaseConnectionError
+        # with "DB doesn't exist", and we succeed in deleting it.
+        with mock.patch(broker + '.get_info') as fake_get_info:
+            with mock.patch(store + '.remove_synced_container') as fake_remove:
+                fake_get_info.side_effect = DatabaseConnectionError(
+                    'a',
+                    "DB doesn't exist")
+                cs.container_sync('isa.db')
+                self.assertEqual(cs.container_failures, 1)
+                self.assertEqual(cs.container_skips, 0)
+                self.assertEqual(1, fake_remove.call_count)
+                self.assertEqual('isa.db', fake_remove.call_args[0][0].db_file)
+
+        # Test the case where get_info raises DatabaseConnectionError
+        # with "DB doesn't exist", and we fail to delete it.
+        with mock.patch(broker + '.get_info') as fake_get_info:
+            with mock.patch(store + '.remove_synced_container') as fake_remove:
+                fake_get_info.side_effect = DatabaseConnectionError(
+                    'a',
+                    "DB doesn't exist")
+                fake_remove.side_effect = OSError('1')
+                cs.container_sync('isa.db')
+                self.assertEqual(cs.container_failures, 2)
+                self.assertEqual(cs.container_skips, 0)
+                self.assertEqual(1, fake_remove.call_count)
+                self.assertEqual('isa.db', fake_remove.call_args[0][0].db_file)
+
+        # Test the case where get_info raises DatabaseConnectionError
+        # with "DB doesn't exist", and removal fails with an errno
+        # other than ENOENT.
+        with mock.patch(broker + '.get_info') as fake_get_info:
+            with mock.patch(store + '.remove_synced_container') as fake_remove:
+                fake_get_info.side_effect = DatabaseConnectionError(
+                    'a',
+                    "DB doesn't exist")
+                fake_remove.side_effect = OSError(errno.EPERM, 'a')
+                cs.container_sync('isa.db')
+                self.assertEqual(cs.container_failures, 3)
+                self.assertEqual(cs.container_skips, 0)
+                self.assertEqual(1, fake_remove.call_count)
+                self.assertEqual('isa.db', fake_remove.call_args[0][0].db_file)
+
+        # Test the case where get_info raises DatabaseConnectionError
+        # with an error other than "DB doesn't exist".
+        with mock.patch(broker + '.get_info') as fake_get_info:
+            with mock.patch(store + '.remove_synced_container') as fake_remove:
+                fake_get_info.side_effect = DatabaseConnectionError('a', 'a')
+                cs.container_sync('isa.db')
+                self.assertEqual(cs.container_failures, 4)
+                self.assertEqual(cs.container_skips, 0)
+                self.assertEqual(0, fake_remove.call_count)

     def test_container_sync_not_my_db(self):
         # Db could be there due to handoff replication so test that we ignore
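Taken together, the four missing-db cases above pin down the recovery path: container_sync counts one failure per attempt, tries to drop the stale sync-store entry only when the broker reports that the db no longer exists, and tolerates a failed removal without extra failures. A sketch of that handling, inferred purely from the assertions (the real code is in swift/container/sync.py; names here are assumptions):

    # Sketch, inferred from the test assertions; not the actual source.
    try:
        broker.get_info()
    except DatabaseConnectionError as db_err:
        if str(db_err).endswith("DB doesn't exist"):
            try:
                # the db is gone, so drop its stale sync-store entry
                self.sync_store.remove_synced_container(broker)
            except OSError:
                pass  # a failed cleanup is logged, not re-raised
        self.container_failures += 1
        return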
test/unit/container/test_sync_store.py (new file, 367 lines)
@@ -0,0 +1,367 @@
# Copyright (c) 2010-2016 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import errno
import mock
import random
import logging
import unittest
import tempfile
from shutil import rmtree
from test.unit import debug_logger

from swift.container.backend import DATADIR
from swift.container import sync_store


class FakeContainerBroker(object):

    def __init__(self, path):
        self.db_file = path
        self.db_dir = os.path.dirname(path)
        self.metadata = dict()
        self._is_deleted = False

    def is_deleted(self):
        return self._is_deleted

class TestContainerSyncStore(unittest.TestCase):

    def setUp(self):
        self.logger = debug_logger('test-container-sync-store')
        self.logger.level = logging.DEBUG
        self.test_dir_prefix = tempfile.mkdtemp()
        self.devices_dir = os.path.join(self.test_dir_prefix, 'srv/node/')
        os.makedirs(self.devices_dir)
        # Create dummy container dbs; each db file holds its own path as
        # content, which the add/remove tests below rely on.
        self.devices = ['sdax', 'sdb', 'sdc']
        self.partitions = ['21765', '38965', '13234']
        self.suffixes = ['312', '435']
        self.hashes = ['f19ed', '53ef', '0ab5', '9c3a']
        for device in self.devices:
            data_dir_path = os.path.join(self.devices_dir,
                                         device,
                                         DATADIR)
            os.makedirs(data_dir_path)
            for part in self.partitions:
                for suffix in self.suffixes:
                    for hsh in self.hashes:
                        db_dir = os.path.join(data_dir_path,
                                              part,
                                              suffix,
                                              hsh)
                        os.makedirs(db_dir)
                        db_file = os.path.join(db_dir, '%s.db' % hsh)
                        with open(db_file, 'w') as outfile:
                            outfile.write('%s' % db_file)

    def tearDown(self):
        # Note: must be spelled tearDown, or unittest never calls it
        # and the temp tree leaks.
        rmtree(self.test_dir_prefix)

    def pick_dbfile(self):
        hsh = random.choice(self.hashes)
        return os.path.join(self.devices_dir,
                            random.choice(self.devices),
                            DATADIR,
                            random.choice(self.partitions),
                            random.choice(self.suffixes),
                            hsh,
                            '%s.db' % hsh)

    # Path conversion tests.
    # A container path is of the form:
    # /srv/node/sdb/containers/part/.../*.db
    # or more generally:
    # devices/device/DATADIR/part/.../*.db
    # A synced container path is assumed to be of the form:
    # /srv/node/sdb/sync_containers/part/.../*.db
    # or more generally:
    # devices/device/SYNC_DATADIR/part/.../*.db
    # Indeed the ONLY difference is DATADIR <-> SYNC_DATADIR.
    # Since, however, the strings represented by the constants
    # DATADIR or SYNC_DATADIR can also appear in the devices or the
    # device part of the path, the conversion between the two is a bit
    # more subtle than a mere string replacement.

    # This test covers the conversion from a container path
    # to a synced container path.
    def test_container_to_synced_container_path_conversion(self):
        # The conversion functions are oblivious to the suffix,
        # so we just pick a constant one.
        db_path_suffix = self._db_path_suffix()

        # We build various container paths, placing the DATADIR and
        # SYNC_DATADIR strings in both the device and devices parts.
        for devices, device in self._container_path_elements_generator():
            path = os.path.join(devices, device, DATADIR, db_path_suffix)
            # Call the conversion function
            sds = sync_store.ContainerSyncStore(devices, self.logger, False)
            path = sds._container_to_synced_container_path(path)
            # Validate that ONLY the DATADIR part was replaced with
            # sync_store.SYNC_DATADIR
            self._validate_container_path_parts(path, devices, device,
                                                sync_store.SYNC_DATADIR,
                                                db_path_suffix)

    # This test covers the conversion from a synced container path
    # back to a container path.
    def test_synced_container_to_container_path_conversion(self):
        # The conversion functions are oblivious to the suffix,
        # so we just pick a constant one.
        db_path_suffix = ('133791/625/82a7f5a2c43281b0eab3597e35bb9625/'
                          '82a7f5a2c43281b0eab3597e35bb9625.db')

        # We build various synced container paths, placing the DATADIR
        # and SYNC_DATADIR strings in both the device and devices parts.
        for devices, device in self._container_path_elements_generator():
            path = os.path.join(devices, device,
                                sync_store.SYNC_DATADIR, db_path_suffix)
            # Call the conversion function
            sds = sync_store.ContainerSyncStore(devices, self.logger, False)
            path = sds._synced_container_to_container_path(path)
            # Validate that ONLY the SYNC_DATADIR part was replaced with
            # DATADIR
            self._validate_container_path_parts(path, devices, device,
                                                DATADIR,
                                                db_path_suffix)

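    # For orientation, a minimal sketch of one way such a conversion could
    # be written; the real implementation lives in
    # swift/container/sync_store.py and may differ.  The db path suffix
    # (part/suffix/hash/hash.db) is hex digits and slashes only, so it can
    # never contain DATADIR; the LAST occurrence of DATADIR in the path is
    # therefore always the element to swap:
    #
    #     def _container_to_synced_container_path(self, path):
    #         idx = path.rfind(DATADIR)
    #         return path[:idx] + sync_store.SYNC_DATADIR \
    #             + path[idx + len(DATADIR):]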
    # Constructs a db path suffix of the form:
    # 133791/625/82...25/82...25.db
    def _db_path_suffix(self):
        def random_hexa_string(length):
            # zero-pad to the requested width so the suffix keeps its shape
            return '%0*x' % (length, random.randrange(16 ** length))

        db = random_hexa_string(32)
        return '%s/%s/%s/%s.db' % (random_hexa_string(5),
                                   random_hexa_string(3),
                                   db, db)

    def _container_path_elements_generator(self):
        # We generate various devices and device values, placing the
        # DATADIR and SYNC_DATADIR strings in both parts.
        for devices in ['/srv/node', '/srv/node/',
                        '/srv/node/dev',
                        '/srv/node/%s' % DATADIR,
                        '/srv/node/%s' % sync_store.SYNC_DATADIR]:
            for device in ['sdf1', 'sdf1/sdf2',
                           'sdf1/%s' % DATADIR,
                           'sdf1/%s' % sync_store.SYNC_DATADIR,
                           '%s/sda' % DATADIR,
                           '%s/sda' % sync_store.SYNC_DATADIR]:
                yield devices, device

    def _validate_container_path_parts(self, path, devices,
                                       device, target, suffix):
        # Recall that the path is of the form:
        # devices/device/target/suffix
        # where each of the sub-path elements (e.g. devices)
        # may itself contain several path elements separated by '/'.
        # We therefore validate by splitting the path on '/' and
        # traversing its elements in order, making sure the first
        # elements are those of devices, the next are those of device,
        # then target, then suffix.
        spath = path.split('/')
        spath.reverse()
        self.assertEqual(spath.pop(), '')
        # Validate path against 'devices'
        for p in [p for p in devices.split('/') if p]:
            self.assertEqual(spath.pop(), p)
        # Validate path against 'device'
        for p in [p for p in device.split('/') if p]:
            self.assertEqual(spath.pop(), p)
        # Validate path against target
        self.assertEqual(spath.pop(), target)
        # Validate path against suffix
        for p in [p for p in suffix.split('/') if p]:
            self.assertEqual(spath.pop(), p)

    def test_add_synced_container(self):
        # Add non-existing and existing synced containers
        sds = sync_store.ContainerSyncStore(self.devices_dir,
                                            self.logger,
                                            False)
        cfile = self.pick_dbfile()
        broker = FakeContainerBroker(cfile)
        for i in range(2):
            sds.add_synced_container(broker)
            scpath = sds._container_to_synced_container_path(cfile)
            with open(scpath, 'r') as infile:
                self.assertEqual(infile.read(), cfile)

        iterated_synced_containers = list()
        for db_path in sds.synced_containers_generator():
            iterated_synced_containers.append(db_path)

        self.assertEqual(len(iterated_synced_containers), 1)

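    # A note on the read-back assertion above: setUp wrote each dummy db
    # file's own path as its content, and the store entry is apparently a
    # symlink to the db file (the os.symlink mock in
    # test_unhandled_exceptions_in_add_remove points the same way), so
    # opening the entry and reading back cfile checks both that the entry
    # exists and that it resolves to the right database.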
    def test_remove_synced_container(self):
        # Add a synced container to remove
        sds = sync_store.ContainerSyncStore(self.devices_dir,
                                            self.logger,
                                            False)
        cfile = self.pick_dbfile()
        # We keep the link file path here so as to validate its deletion
        # later
        lfile = sds._container_to_synced_container_path(cfile)
        broker = FakeContainerBroker(cfile)
        sds.add_synced_container(broker)

        # Remove existing and non-existing synced containers
        for i in range(2):
            sds.remove_synced_container(broker)

        iterated_synced_containers = list()
        for db_path in sds.synced_containers_generator():
            iterated_synced_containers.append(db_path)

        self.assertEqual(len(iterated_synced_containers), 0)

        # Make sure the whole link path gets deleted.
        # Recall that the path ends with the suffix:
        # part/suffix/hash/hash.db
        # and we expect the .db file as well as all of the directories
        # created for it to get deleted.
        self.assertFalse(os.path.exists(lfile))
        lfile = os.path.dirname(lfile)
        for i in range(3):
            self.assertFalse(os.path.exists(os.path.dirname(lfile)))
            lfile = os.path.dirname(lfile)

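    # Removal thus appears to unlink the store entry and then prune the
    # now-empty directories created for it, up to and including the
    # device's sync-containers directory.  A rough sketch, under that
    # assumption (the real code is in swift/container/sync_store.py):
    #
    #     os.unlink(sync_path)
    #     for _ in range(4):
    #         sync_path = os.path.dirname(sync_path)
    #         os.rmdir(sync_path)  # would stop once a level is shared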
    def test_iterate_synced_containers(self):
        # Populate the sync store
        sds = sync_store.ContainerSyncStore(self.devices_dir,
                                            self.logger,
                                            False)
        containers = list()
        for i in range(10):
            cfile = self.pick_dbfile()
            broker = FakeContainerBroker(cfile)
            sds.add_synced_container(broker)
            containers.append(cfile)

        iterated_synced_containers = list()
        for db_path in sds.synced_containers_generator():
            iterated_synced_containers.append(db_path)

        # pick_dbfile may repeat itself, so compare as sets
        self.assertEqual(
            set(containers), set(iterated_synced_containers))

    def test_unhandled_exceptions_in_add_remove(self):
        sds = sync_store.ContainerSyncStore(self.devices_dir,
                                            self.logger,
                                            False)
        cfile = self.pick_dbfile()
        broker = FakeContainerBroker(cfile)

        with mock.patch(
                'swift.container.sync_store.os.stat',
                side_effect=OSError(errno.EPERM, 'permission denied')):
            with self.assertRaises(OSError) as cm:
                sds.add_synced_container(broker)
            self.assertEqual(errno.EPERM, cm.exception.errno)

        with mock.patch(
                'swift.container.sync_store.os.makedirs',
                side_effect=OSError(errno.EPERM, 'permission denied')):
            with self.assertRaises(OSError) as cm:
                sds.add_synced_container(broker)
            self.assertEqual(errno.EPERM, cm.exception.errno)

        with mock.patch(
                'swift.container.sync_store.os.symlink',
                side_effect=OSError(errno.EPERM, 'permission denied')):
            with self.assertRaises(OSError) as cm:
                sds.add_synced_container(broker)
            self.assertEqual(errno.EPERM, cm.exception.errno)

        with mock.patch(
                'swift.container.sync_store.os.unlink',
                side_effect=OSError(errno.EPERM, 'permission denied')):
            with self.assertRaises(OSError) as cm:
                sds.remove_synced_container(broker)
            self.assertEqual(errno.EPERM, cm.exception.errno)

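    # The patched targets outline the store's mechanics: adding an entry
    # stats the container db, creates the partition/suffix/hash directory
    # chain and symlinks the db file into it, while removal unlinks the
    # entry.  Any OSError beyond the benign "already there"/"already gone"
    # cases is expected to propagate to the caller rather than be
    # swallowed.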
    def test_update_sync_store_according_to_metadata_and_deleted(self):
        # This test covers the update_sync_store logic
        # with respect to the various combinations of the
        # sync-to and sync-key metadata items and whether
        # the database is marked for delete.
        # The table below summarizes the expected result
        # for the various combinations, e.g.:
        # if both metadata items exist and the database
        # is not marked for delete, then add should be called.

        # Each row is: [deleted, sync-to, sync-key, expected op]
        results_list = [
            [False, 'a', 'b', 'add'],
            [False, 'a', '', 'remove'],
            [False, 'a', None, 'remove'],
            [False, '', 'b', 'remove'],
            [False, '', '', 'remove'],
            [False, '', None, 'remove'],
            [False, None, 'b', 'remove'],
            [False, None, '', 'remove'],
            [False, None, None, 'none'],
            [True, 'a', 'b', 'remove'],
            [True, 'a', '', 'remove'],
            [True, 'a', None, 'remove'],
            [True, '', 'b', 'remove'],
            [True, '', '', 'remove'],
            [True, '', None, 'remove'],
            [True, None, 'b', 'remove'],
            [True, None, '', 'remove'],
            [True, None, None, 'none'],
        ]

        store = 'swift.container.sync_store.ContainerSyncStore'
        with mock.patch(store + '.add_synced_container') as add_container:
            with mock.patch(
                    store + '.remove_synced_container') as remove_container:
                sds = sync_store.ContainerSyncStore(self.devices_dir,
                                                    self.logger,
                                                    False)
                add_calls = 0
                remove_calls = 0
                # We now iterate over the list of combinations,
                # validating that add and remove are called as
                # expected.
                for deleted, sync_to, sync_key, expected_op in results_list:
                    cfile = self.pick_dbfile()
                    broker = FakeContainerBroker(cfile)
                    broker._is_deleted = deleted
                    if sync_to is not None:
                        broker.metadata['X-Container-Sync-To'] = [
                            sync_to, 1]
                    if sync_key is not None:
                        broker.metadata['X-Container-Sync-Key'] = [
                            sync_key, 1]
                    sds.update_sync_store(broker)
                    if expected_op == 'add':
                        add_calls += 1
                    if expected_op == 'remove':
                        remove_calls += 1
                    self.assertEqual(add_container.call_count,
                                     add_calls)
                    self.assertEqual(remove_container.call_count,
                                     remove_calls)

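# The table reduces to a compact rule; a sketch of the decision
# update_sync_store presumably implements (the real code is in
# swift/container/sync_store.py and may phrase it differently):
#
#     sync_to = broker.metadata.get('X-Container-Sync-To', (None,))[0]
#     sync_key = broker.metadata.get('X-Container-Sync-Key', (None,))[0]
#     if sync_to is None and sync_key is None:
#         pass                          # nothing was ever set: no-op
#     elif sync_to and sync_key and not broker.is_deleted():
#         self.add_synced_container(broker)
#     else:
#         self.remove_synced_container(broker)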

if __name__ == '__main__':
    unittest.main()