diff --git a/bin/swift-object-server b/bin/swift-object-server index 74119d61da..d879684b1c 100755 --- a/bin/swift-object-server +++ b/bin/swift-object-server @@ -16,7 +16,10 @@ from swift.common.utils import parse_options from swift.common.wsgi import run_wsgi +from swift.obj import server + if __name__ == '__main__': conf_file, options = parse_options() - run_wsgi(conf_file, 'object-server', default_port=6000, **options) + run_wsgi(conf_file, 'object-server', default_port=6000, + global_conf_callback=server.global_conf_callback, **options) diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst index c10143349a..90470671cc 100644 --- a/doc/source/deployment_guide.rst +++ b/doc/source/deployment_guide.rst @@ -357,43 +357,67 @@ fallocate_reserve 0 You can set fallocate_reserve to the number of when they completely run out of space; you can make the services pretend they're out of space early. +conn_timeout 0.5 Time to wait while attempting to connect to + another backend node. +node_timeout 3 Time to wait while sending each chunk of data + to another backend node. +client_timeout 60 Time to wait while receiving each chunk of + data from a client or another backend node. +network_chunk_size 65536 Size of chunks to read/write over the network +disk_chunk_size 65536 Size of chunks to read/write to disk =================== ========== ============================================= .. _object-server-options: [object-server] -================== ============= =========================================== -Option Default Description ------------------- ------------- ------------------------------------------- -use paste.deploy entry point for the object - server. For most cases, this should be - `egg:swift#object`. -set log_name object-server Label used when logging -set log_facility LOG_LOCAL0 Syslog log facility -set log_level INFO Logging level -set log_requests True Whether or not to log each request -user swift User to run as -node_timeout 3 Request timeout to external services -conn_timeout 0.5 Connection timeout to external services -network_chunk_size 65536 Size of chunks to read/write over the - network -disk_chunk_size 65536 Size of chunks to read/write to disk -max_upload_time 86400 Maximum time allowed to upload an object -slow 0 If > 0, Minimum time in seconds for a PUT - or DELETE request to complete -mb_per_sync 512 On PUT requests, sync file every n MB -keep_cache_size 5242880 Largest object size to keep in buffer cache -keep_cache_private false Allow non-public objects to stay in - kernel's buffer cache -threads_per_disk 0 Size of the per-disk thread pool used for - performing disk I/O. The default of 0 means - to not use a per-disk thread pool. It is - recommended to keep this value small, as - large values can result in high read - latencies due to large queue depths. A good - starting point is 4 threads per disk. -================== ============= =========================================== +============================= ============= ================================= +Option Default Description +----------------------------- ------------- --------------------------------- +use paste.deploy entry point for the + object server. For most cases, + this should be + `egg:swift#object`. +set log_name object-server Label used when logging +set log_facility LOG_LOCAL0 Syslog log facility +set log_level INFO Logging level +set log_requests True Whether or not to log each + request +user swift User to run as +max_upload_time 86400 Maximum time allowed to upload an + object +slow 0 If > 0, Minimum time in seconds + for a PUT or DELETE request to + complete +mb_per_sync 512 On PUT requests, sync file every + n MB +keep_cache_size 5242880 Largest object size to keep in + buffer cache +keep_cache_private false Allow non-public objects to stay + in kernel's buffer cache +threads_per_disk 0 Size of the per-disk thread pool + used for performing disk I/O. The + default of 0 means to not use a + per-disk thread pool. It is + recommended to keep this value + small, as large values can result + in high read latencies due to + large queue depths. A good + starting point is 4 threads per + disk. +replication_concurrency 4 Set to restrict the number of + concurrent incoming REPLICATION + requests; set to 0 for unlimited +replication_failure_threshold 100 The number of subrequest failures + before the + replication_failure_ratio is + checked +replication_failure_ratio 1.0 If the value of failures / + successes of REPLICATION + subrequests exceeds this ratio, + the overall REPLICATION request + will be aborted +============================= ============= ================================= [object-replicator] @@ -427,6 +451,11 @@ handoff_delete auto By default handoff partitions will be replicated to n nodes. The default setting should not be changed, except for extremem situations. +node_timeout DEFAULT or 10 Request timeout to external services. + This uses what's set here, or what's set + in the DEFAULT section, or 10 (though + other sections use 3 as the final + default). ================== ================= ======================================= [object-updater] @@ -439,8 +468,10 @@ log_facility LOG_LOCAL0 Syslog log facility log_level INFO Logging level interval 300 Minimum time for a pass to take concurrency 1 Number of updater workers to spawn -node_timeout 10 Request timeout to external services -conn_timeout 0.5 Connection timeout to external services +node_timeout DEFAULT or 10 Request timeout to external services. This + uses what's set here, or what's set in the + DEFAULT section, or 10 (though other + sections use 3 as the final default). slowdown 0.01 Time in seconds to wait between objects ================== ============== ========================================== diff --git a/doc/source/object.rst b/doc/source/object.rst index 6da9b9a18b..8b802f5631 100644 --- a/doc/source/object.rst +++ b/doc/source/object.rst @@ -24,6 +24,16 @@ Object Replicator :undoc-members: :show-inheritance: +.. automodule:: swift.obj.ssync_sender + :members: + :undoc-members: + :show-inheritance: + +.. automodule:: swift.obj.ssync_receiver + :members: + :undoc-members: + :show-inheritance: + .. _object-updater: Object Updater diff --git a/doc/source/overview_replication.rst b/doc/source/overview_replication.rst index 1b3b227012..cd343299f3 100644 --- a/doc/source/overview_replication.rst +++ b/doc/source/overview_replication.rst @@ -93,6 +93,28 @@ systems, it was designed so that around 2% of the hash space on a normal node will be invalidated per day, which has experimentally given us acceptable replication speeds. +Work continues with a new ssync method where rsync is not used at all and +instead all-Swift code is used to transfer the objects. At first, this ssync +will just strive to emulate the rsync behavior. Once deemed stable it will open +the way for future improvements in replication since we'll be able to easily +add code in the replication path instead of trying to alter the rsync code +base and distributing such modifications. + +One of the first improvements planned is an "index.db" that will replace the +hashes.pkl. This will allow quicker updates to that data as well as more +streamlined queries. Quite likely we'll implement a better scheme than the +current one hashes.pkl uses (hash-trees, that sort of thing). + +Another improvement planned all along the way is separating the local disk +structure from the protocol path structure. This separation will allow ring +resizing at some point, or at least ring-doubling. + +FOR NOW, IT IS NOT RECOMMENDED TO USE SSYNC ON PRODUCTION CLUSTERS. Some of us +will be in a limited fashion to look for any subtle issues, tuning, etc. but +generally ssync is an experimental feature. In its current implementation it is +probably going to be a bit slower than RSync, but if all goes according to plan +it will end up much faster. + ----------------------------- Dedicated replication network diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index 714279c9bc..015b816a10 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -44,6 +44,17 @@ # You can set fallocate_reserve to the number of bytes you'd like fallocate to # reserve, whether there is space for the given file size or not. # fallocate_reserve = 0 +# +# Time to wait while attempting to connect to another backend node. +# conn_timeout = 0.5 +# Time to wait while sending each chunk of data to another backend node. +# node_timeout = 3 +# Time to wait while receiving each chunk of data from a client or another +# backend node. +# client_timeout = 60 +# +# network_chunk_size = 65536 +# disk_chunk_size = 65536 [pipeline:main] pipeline = healthcheck recon object-server @@ -57,10 +68,6 @@ use = egg:swift#object # set log_requests = true # set log_address = /dev/log # -# node_timeout = 3 -# conn_timeout = 0.5 -# network_chunk_size = 65536 -# disk_chunk_size = 65536 # max_upload_time = 86400 # slow = 0 # @@ -81,6 +88,10 @@ use = egg:swift#object # # auto_create_account_prefix = . # +# A value of 0 means "don't use thread pools". A reasonable starting point is +# 4. +# threads_per_disk = 0 +# # Configure parameter for creating specific server # To handle all verbs, including replication verbs, do not specify # "replication_server" (this is the default). To only handle replication, @@ -88,8 +99,20 @@ use = egg:swift#object # verbs, set to "False". Unless you have a separate replication network, you # should not specify any value for "replication_server". # replication_server = false -# A value of 0 means "don't use thread pools". A reasonable starting point is 4. -# threads_per_disk = 0 +# +# Set to restrict the number of concurrent incoming REPLICATION requests +# Set to 0 for unlimited +# Note that REPLICATION is currently an ssync only item +# replication_concurrency = 4 +# +# These next two settings control when the REPLICATION subrequest handler will +# abort an incoming REPLICATION attempt. An abort will occur if there are at +# least threshold number of failures and the value of failures / successes +# exceeds the ratio. The defaults of 100 and 1.0 means that at least 100 +# failures have to occur and there have to be more failures than successes for +# an abort to occur. +# replication_failure_threshold = 100 +# replication_failure_ratio = 1.0 [filter:healthcheck] use = egg:swift#healthcheck @@ -115,6 +138,12 @@ use = egg:swift#recon # concurrency = 1 # stats_interval = 300 # +# The sync method to use; default is rsync but you can use ssync to try the +# EXPERIMENTAL all-swift-code-no-rsync-callouts method. Once verified as stable +# and nearly as efficient (or moreso) than rsync, we plan to deprecate rsync so +# we can move on with more features for replication. +# sync_method = rsync +# # max duration of a partition rsync # rsync_timeout = 900 # @@ -124,7 +153,9 @@ use = egg:swift#recon # passed to rsync for io op timeout # rsync_io_timeout = 30 # -# max duration of an http request +# node_timeout = +# max duration of an http request; this is for REPLICATE finalization calls and +# so should be longer than node_timeout # http_timeout = 60 # # attempts to kill all workers if nothing replicates for lockup_timeout seconds @@ -149,9 +180,7 @@ use = egg:swift#recon # # interval = 300 # concurrency = 1 -# node_timeout = 10 -# conn_timeout = 0.5 -# +# node_timeout = # slowdown will sleep that amount between objects # slowdown = 0.01 # diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py index 3b30b014ec..985088c7fa 100644 --- a/swift/common/exceptions.py +++ b/swift/common/exceptions.py @@ -118,3 +118,7 @@ class ListingIterNotAuthorized(ListingIterError): class SegmentError(SwiftException): pass + + +class ReplicationException(Exception): + pass diff --git a/swift/common/wsgi.py b/swift/common/wsgi.py index d5e07d2b49..193322f2fa 100644 --- a/swift/common/wsgi.py +++ b/swift/common/wsgi.py @@ -196,7 +196,7 @@ class RestrictedGreenPool(GreenPool): self.waitall() -def run_server(conf, logger, sock): +def run_server(conf, logger, sock, global_conf=None): # Ensure TZ environment variable exists to avoid stat('/etc/localtime') on # some platforms. This locks in reported times to the timezone in which # the server first starts running in locations that periodically change @@ -216,11 +216,13 @@ def run_server(conf, logger, sock): eventlet_debug = config_true_value(conf.get('eventlet_debug', 'no')) eventlet.debug.hub_exceptions(eventlet_debug) # utils.LogAdapter stashes name in server; fallback on unadapted loggers - if hasattr(logger, 'server'): - log_name = logger.server - else: - log_name = logger.name - app = loadapp(conf['__file__'], global_conf={'log_name': log_name}) + if not global_conf: + if hasattr(logger, 'server'): + log_name = logger.server + else: + log_name = logger.name + global_conf = {'log_name': log_name} + app = loadapp(conf['__file__'], global_conf=global_conf) max_clients = int(conf.get('max_clients', '1024')) pool = RestrictedGreenPool(size=max_clients) try: @@ -252,8 +254,11 @@ def run_wsgi(conf_path, app_section, *args, **kwargs): # remaining tasks should not require elevated privileges drop_privileges(conf.get('user', 'swift')) - # Ensure the application can be loaded before proceeding. - loadapp(conf_path, global_conf={'log_name': log_name}) + # Ensure the configuration and application can be loaded before proceeding. + global_conf = {'log_name': log_name} + if 'global_conf_callback' in kwargs: + kwargs['global_conf_callback'](conf, global_conf) + loadapp(conf_path, global_conf=global_conf) # set utils.FALLOCATE_RESERVE if desired reserve = int(conf.get('fallocate_reserve', 0)) @@ -266,7 +271,7 @@ def run_wsgi(conf_path, app_section, *args, **kwargs): # Useful for profiling [no forks]. if worker_count == 0: - run_server(conf, logger, sock) + run_server(conf, logger, sock, global_conf=global_conf) return def kill_children(*args): diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index 351e63ad34..d19446075c 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -53,7 +53,7 @@ from swift.common.constraints import check_mount from swift.common.utils import mkdirs, normalize_timestamp, \ storage_directory, hash_path, renamer, fallocate, fsync, \ fdatasync, drop_buffer_cache, ThreadPool, lock_path, write_pickle, \ - config_true_value, listdir + config_true_value, listdir, split_path from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \ DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \ DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir @@ -381,6 +381,7 @@ class DiskFileManager(object): self.keep_cache_size = int(conf.get('keep_cache_size', 5242880)) self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024 self.mount_check = config_true_value(conf.get('mount_check', 'true')) + self.reclaim_age = int(conf.get('reclaim_age', ONE_WEEK)) threads_per_disk = int(conf.get('threads_per_disk', '0')) self.threadpools = defaultdict( lambda: ThreadPool(nthreads=threads_per_disk)) @@ -443,6 +444,47 @@ class DiskFileManager(object): self, audit_location.path, dev_path, audit_location.partition) + def get_diskfile_from_hash(self, device, partition, object_hash, **kwargs): + """ + Returns a DiskFile instance for an object at the given + object_hash. Just in case someone thinks of refactoring, be + sure DiskFileDeleted is *not* raised, but the DiskFile + instance representing the tombstoned object is returned + instead. + + :raises DiskFileNotExist: if the object does not exist + """ + dev_path = self.get_dev_path(device) + if not dev_path: + raise DiskFileDeviceUnavailable() + object_path = os.path.join( + dev_path, DATADIR, partition, object_hash[-3:], object_hash) + try: + filenames = hash_cleanup_listdir(object_path, self.reclaim_age) + except OSError as err: + if err.errno == errno.ENOTDIR: + quar_path = quarantine_renamer(dev_path, object_path) + logging.exception( + _('Quarantined %s to %s because it is not a ' + 'directory') % (object_path, quar_path)) + raise DiskFileNotExist() + if err.errno != errno.ENOENT: + raise + raise DiskFileNotExist() + if not filenames: + raise DiskFileNotExist() + try: + metadata = read_metadata(os.path.join(object_path, filenames[-1])) + except EOFError: + raise DiskFileNotExist() + try: + account, container, obj = split_path( + metadata.get('name', ''), 3, 3, True) + except ValueError: + raise DiskFileNotExist() + return DiskFile(self, dev_path, self.threadpools[device], + partition, account, container, obj, **kwargs) + def get_hashes(self, device, partition, suffix): dev_path = self.get_dev_path(device) if not dev_path: @@ -455,6 +497,62 @@ class DiskFileManager(object): get_hashes, partition_path, recalculate=suffixes) return hashes + def _listdir(self, path): + try: + return os.listdir(path) + except OSError as err: + if err.errno != errno.ENOENT: + self.logger.error( + 'ERROR: Skipping %r due to error with listdir attempt: %s', + path, err) + return [] + + def yield_suffixes(self, device, partition): + """ + Yields tuples of (full_path, suffix_only) for suffixes stored + on the given device and partition. + """ + dev_path = self.get_dev_path(device) + if not dev_path: + raise DiskFileDeviceUnavailable() + partition_path = os.path.join(dev_path, DATADIR, partition) + for suffix in self._listdir(partition_path): + if len(suffix) != 3: + continue + try: + int(suffix, 16) + except ValueError: + continue + yield (os.path.join(partition_path, suffix), suffix) + + def yield_hashes(self, device, partition, suffixes=None): + """ + Yields tuples of (full_path, hash_only, timestamp) for object + information stored for the given device, partition, and + (optionally) suffixes. If suffixes is None, all stored + suffixes will be searched for object hashes. Note that if + suffixes is not None but empty, such as [], then nothing will + be yielded. + """ + dev_path = self.get_dev_path(device) + if not dev_path: + raise DiskFileDeviceUnavailable() + if suffixes is None: + suffixes = self.yield_suffixes(device, partition) + else: + partition_path = os.path.join(dev_path, DATADIR, partition) + suffixes = ( + (os.path.join(partition_path, suffix), suffix) + for suffix in suffixes) + for suffix_path, suffix in suffixes: + for object_hash in self._listdir(suffix_path): + object_path = os.path.join(suffix_path, object_hash) + for name in hash_cleanup_listdir( + object_path, self.reclaim_age): + ts, ext = name.rsplit('.', 1) + yield (object_path, object_hash, ts) + break + class DiskFileWriter(object): """ @@ -775,15 +873,25 @@ class DiskFile(object): self._bytes_per_sync = mgr.bytes_per_sync if account and container and obj: self._name = '/' + '/'.join((account, container, obj)) + self._account = account + self._container = container + self._obj = obj + name_hash = hash_path(account, container, obj) + self._datadir = join( + device_path, storage_directory(DATADIR, partition, name_hash)) else: # gets populated when we read the metadata self._name = None + self._account = None + self._container = None + self._obj = None + self._datadir = None self._tmpdir = join(device_path, 'tmp') self._metadata = None self._data_file = None self._fp = None self._quarantined_dir = None - + self._content_length = None if _datadir: self._datadir = _datadir else: @@ -791,6 +899,30 @@ class DiskFile(object): self._datadir = join( device_path, storage_directory(DATADIR, partition, name_hash)) + @property + def account(self): + return self._account + + @property + def container(self): + return self._container + + @property + def obj(self): + return self._obj + + @property + def content_length(self): + if self._metadata is None: + raise DiskFileNotOpen() + return self._content_length + + @property + def timestamp(self): + if self._metadata is None: + raise DiskFileNotOpen() + return self._metadata.get('X-Timestamp') + @classmethod def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition): return cls(mgr, device_path, None, partition, _datadir=hash_dir_path) @@ -1036,6 +1168,7 @@ class DiskFile(object): data_file, "metadata content-length %s does" " not match actual object size %s" % ( metadata_size, statbuf.st_size)) + self._content_length = obj_size return obj_size def _failsafe_read_metadata(self, source, quarantine_filename=None): diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index 066ca7b521..01b7a29bb0 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -35,7 +35,8 @@ from swift.common.utils import whataremyips, unlink_older_than, \ from swift.common.bufferedhttp import http_connect from swift.common.daemon import Daemon from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE -from swift.obj.diskfile import get_hashes +from swift.obj import ssync_sender +from swift.obj.diskfile import DiskFileManager, get_hashes hubs.use_hub(get_hub()) @@ -78,6 +79,11 @@ class ObjectReplicator(Daemon): self.recon_cache_path = conf.get('recon_cache_path', '/var/cache/swift') self.rcache = os.path.join(self.recon_cache_path, "object.recon") + self.conn_timeout = float(conf.get('conn_timeout', 0.5)) + self.node_timeout = float(conf.get('node_timeout', 10)) + self.sync_method = getattr(self, conf.get('sync_method') or 'rsync') + self.network_chunk_size = int(conf.get('network_chunk_size', 65536)) + self.disk_chunk_size = int(conf.get('disk_chunk_size', 65536)) self.headers = { 'Content-Length': '0', 'user-agent': 'obj-replicator %s' % os.getpid()} @@ -87,6 +93,20 @@ class ObjectReplicator(Daemon): False)) self.handoff_delete = config_auto_int_value( conf.get('handoff_delete', 'auto'), 0) + self._diskfile_mgr = DiskFileManager(conf, self.logger) + + def sync(self, node, job, suffixes): # Just exists for doc anchor point + """ + Synchronize local suffix directories from a partition with a remote + node. + + :param node: the "dev" entry for the remote node to sync with + :param job: information about the partition being synced + :param suffixes: a list of suffixes which need to be pushed + + :returns: boolean indicating success or failure + """ + return self.sync_method(node, job, suffixes) def _rsync(self, args): """ @@ -135,14 +155,8 @@ class ObjectReplicator(Daemon): def rsync(self, node, job, suffixes): """ - Synchronize local suffix directories from a partition with a remote - node. - - :param node: the "dev" entry for the remote node to sync with - :param job: information about the partition being synced - :param suffixes: a list of suffixes which need to be pushed - - :returns: boolean indicating success or failure + Uses rsync to implement the sync method. This was the first + sync method in Swift. """ if not os.path.exists(job['path']): return False @@ -175,6 +189,9 @@ class ObjectReplicator(Daemon): 'objects', job['partition'])) return self._rsync(args) == 0 + def ssync(self, node, job, suffixes): + return ssync_sender.Sender(self, node, job, suffixes)() + def check_ring(self): """ Check to see if the ring has been updated @@ -206,7 +223,7 @@ class ObjectReplicator(Daemon): suffixes = tpool.execute(tpool_get_suffixes, job['path']) if suffixes: for node in job['nodes']: - success = self.rsync(node, job, suffixes) + success = self.sync(node, job, suffixes) if success: with Timeout(self.http_timeout): conn = http_connect( @@ -290,7 +307,7 @@ class ObjectReplicator(Daemon): suffixes = [suffix for suffix in local_hash if local_hash[suffix] != remote_hash.get(suffix, -1)] - self.rsync(node, job, suffixes) + self.sync(node, job, suffixes) with Timeout(self.http_timeout): conn = http_connect( node['replication_ip'], node['replication_port'], @@ -380,7 +397,7 @@ class ObjectReplicator(Daemon): def collect_jobs(self): """ Returns a sorted list of jobs (dictionaries) that specify the - partitions, nodes, etc to be rsynced. + partitions, nodes, etc to be synced. """ jobs = [] ips = whataremyips() diff --git a/swift/obj/server.py b/swift/obj/server.py index 6756fef6be..293909704c 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -18,6 +18,7 @@ from __future__ import with_statement import cPickle as pickle import os +import multiprocessing import time import traceback import socket @@ -35,6 +36,7 @@ from swift.common.constraints import check_object_creation, \ from swift.common.exceptions import ConnectionTimeout, DiskFileQuarantined, \ DiskFileNotExist, DiskFileCollision, DiskFileNoSpace, DiskFileDeleted, \ DiskFileDeviceUnavailable +from swift.obj import ssync_receiver from swift.common.http import is_success from swift.common.request_helpers import split_and_validate_path from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \ @@ -59,6 +61,8 @@ class ObjectController(object): self.logger = get_logger(conf, log_route='object-server') self.node_timeout = int(conf.get('node_timeout', 3)) self.conn_timeout = float(conf.get('conn_timeout', 0.5)) + self.client_timeout = int(conf.get('client_timeout', 60)) + self.disk_chunk_size = int(conf.get('disk_chunk_size', 65536)) self.network_chunk_size = int(conf.get('network_chunk_size', 65536)) self.log_requests = config_true_value(conf.get('log_requests', 'true')) self.max_upload_time = int(conf.get('max_upload_time', 86400)) @@ -122,6 +126,17 @@ class ObjectController(object): # Common on-disk hierarchy shared across account, container and object # servers. self._diskfile_mgr = DiskFileManager(conf, self.logger) + # This is populated by global_conf_callback way below as the semaphore + # is shared by all workers. + if 'replication_semaphore' in conf: + # The value was put in a list so it could get past paste + self.replication_semaphore = conf['replication_semaphore'][0] + else: + self.replication_semaphore = None + self.replication_failure_threshold = int( + conf.get('replication_failure_threshold') or 100) + self.replication_failure_ratio = float( + conf.get('replication_failure_ratio') or 1.0) def get_diskfile(self, device, partition, account, container, obj, **kwargs): @@ -414,7 +429,9 @@ class ObjectController(object): metadata.update(val for val in request.headers.iteritems() if val[0].lower().startswith('x-object-meta-') and len(val[0]) > 14) - for header_key in self.allowed_headers: + for header_key in ( + request.headers.get('X-Backend-Replication-Headers') or + self.allowed_headers): if header_key in request.headers: header_caps = header_key.title() metadata[header_caps] = request.headers[header_key] @@ -619,6 +636,12 @@ class ObjectController(object): resp = Response(body=pickle.dumps(hashes)) return resp + @public + @replication + @timing_stats(sample_rate=0.1) + def REPLICATION(self, request): + return Response(app_iter=ssync_receiver.Receiver(self, request)()) + def __call__(self, env, start_response): """WSGI Application entry point for the Swift Object Server.""" start_time = time.time() @@ -661,7 +684,8 @@ class ObjectController(object): req.headers.get('x-trans-id', '-'), req.user_agent or '-', trans_time) - if req.method == 'REPLICATE': + if req.method in ('REPLICATE', 'REPLICATION') or \ + 'X-Backend-Replication' in req.headers: self.logger.debug(log_line) else: self.logger.info(log_line) @@ -672,6 +696,30 @@ class ObjectController(object): return res(env, start_response) +def global_conf_callback(preloaded_app_conf, global_conf): + """ + Callback for swift.common.wsgi.run_wsgi during the global_conf + creation so that we can add our replication_semaphore, used to + limit the number of concurrent REPLICATION_REQUESTS across all + workers. + + :param preloaded_app_conf: The preloaded conf for the WSGI app. + This conf instance will go away, so + just read from it, don't write. + :param global_conf: The global conf that will eventually be + passed to the app_factory function later. + This conf is created before the worker + subprocesses are forked, so can be useful to + set up semaphores, shared memory, etc. + """ + replication_concurrency = int( + preloaded_app_conf.get('replication_concurrency') or 4) + if replication_concurrency: + # Have to put the value in a list so it can get past paste + global_conf['replication_semaphore'] = [ + multiprocessing.BoundedSemaphore(replication_concurrency)] + + def app_factory(global_conf, **local_conf): """paste.deploy app factory for creating WSGI object server apps""" conf = global_conf.copy() diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py new file mode 100644 index 0000000000..88b555188a --- /dev/null +++ b/swift/obj/ssync_receiver.py @@ -0,0 +1,379 @@ +# Copyright (c) 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import with_statement + +import urllib + +import eventlet +import eventlet.wsgi +import eventlet.greenio + +from swift.common import constraints +from swift.common import exceptions +from swift.common import http +from swift.common import swob +from swift.common import utils + + +class Receiver(object): + """ + Handles incoming REPLICATION requests to the object server. + + These requests come from the object-replicator daemon that uses + :py:mod:`.ssync_sender`. + + The number of concurrent REPLICATION requests is restricted by + use of a replication_semaphore and can be configured with the + object-server.conf [object-server] replication_concurrency + setting. + + A REPLICATION request is really just an HTTP conduit for + sender/receiver replication communication. The overall + REPLICATION request should always succeed, but it will contain + multiple requests within its request and response bodies. This + "hack" is done so that replication concurrency can be managed. + + The general process inside a REPLICATION request is: + + 1. Initialize the request: Basic request validation, mount check, + acquire semaphore lock, etc.. + + 2. Missing check: Sender sends the hashes and timestamps of + the object information it can send, receiver sends back + the hashes it wants (doesn't have or has an older + timestamp). + + 3. Updates: Sender sends the object information requested. + + 4. Close down: Release semaphore lock, etc. + """ + + def __init__(self, app, request): + self.app = app + self.request = request + self.device = None + self.partition = None + self.fp = None + self.disconnect = False + + def __call__(self): + """ + Processes a REPLICATION request. + + Acquires a semaphore lock and then proceeds through the steps + of the REPLICATION process. + """ + # The general theme for functions __call__ calls is that they should + # raise exceptions.MessageTimeout for client timeouts (logged locally), + # swob.HTTPException classes for exceptions to return to the caller but + # not log locally (unmounted, for example), and any other Exceptions + # will be logged with a full stack trace. + # This is because the client is never just some random user but + # is instead also our code and we definitely want to know if our code + # is broken or doing something unexpected. + try: + # Double try blocks in case our main error handlers fail. + try: + # intialize_request is for preamble items that can be done + # outside a replication semaphore lock. + for data in self.initialize_request(): + yield data + # If semaphore is in use, try to acquire it, non-blocking, and + # return a 503 if it fails. + if self.app.replication_semaphore: + if not self.app.replication_semaphore.acquire(False): + raise swob.HTTPServiceUnavailable() + try: + for data in self.missing_check(): + yield data + for data in self.updates(): + yield data + finally: + if self.app.replication_semaphore: + self.app.replication_semaphore.release() + except exceptions.MessageTimeout as err: + self.app.logger.error( + '%s/%s/%s TIMEOUT in replication.Receiver: %s' % ( + self.request.remote_addr, self.device, self.partition, + err)) + yield ':ERROR: %d %r\n' % (408, str(err)) + except swob.HTTPException as err: + body = ''.join(err({}, lambda *args: None)) + yield ':ERROR: %d %r\n' % (err.status_int, body) + except Exception as err: + self.app.logger.exception( + '%s/%s/%s EXCEPTION in replication.Receiver' % + (self.request.remote_addr, self.device, self.partition)) + yield ':ERROR: %d %r\n' % (0, str(err)) + except Exception: + self.app.logger.exception('EXCEPTION in replication.Receiver') + if self.disconnect: + # This makes the socket close early so the remote side doesn't have + # to send its whole request while the lower Eventlet-level just + # reads it and throws it away. Instead, the connection is dropped + # and the remote side will get a broken-pipe exception. + try: + socket = self.request.environ['wsgi.input'].get_socket() + eventlet.greenio.shutdown_safe(socket) + socket.close() + except Exception: + pass # We're okay with the above failing. + + def _ensure_flush(self): + """ + Sends a blank line sufficient to flush buffers. + + This is to ensure Eventlet versions that don't support + eventlet.minimum_write_chunk_size will send any previous data + buffered. + + If https://bitbucket.org/eventlet/eventlet/pull-request/37 + ever gets released in an Eventlet version, we should make + this yield only for versions older than that. + """ + yield ' ' * eventlet.wsgi.MINIMUM_CHUNK_SIZE + '\r\n' + + def initialize_request(self): + """ + Basic validation of request and mount check. + + This function will be called before attempting to acquire a + replication semaphore lock, so contains only quick checks. + """ + # The following is the setting we talk about above in _ensure_flush. + self.request.environ['eventlet.minimum_write_chunk_size'] = 0 + self.device, self.partition = utils.split_path( + urllib.unquote(self.request.path), 2, 2, False) + utils.validate_device_partition(self.device, self.partition) + if self.app._diskfile_mgr.mount_check and \ + not constraints.check_mount( + self.app._diskfile_mgr.devices, self.device): + raise swob.HTTPInsufficientStorage(drive=self.device) + self.fp = self.request.environ['wsgi.input'] + for data in self._ensure_flush(): + yield data + + def missing_check(self): + """ + Handles the receiver-side of the MISSING_CHECK step of a + REPLICATION request. + + Receives a list of hashes and timestamps of object + information the sender can provide and responds with a list + of hashes desired, either because they're missing or have an + older timestamp locally. + + The process is generally: + + 1. Sender sends `:MISSING_CHECK: START` and begins + sending `hash timestamp` lines. + + 2. Receiver gets `:MISSING_CHECK: START` and begins + reading the `hash timestamp` lines, collecting the + hashes of those it desires. + + 3. Sender sends `:MISSING_CHECK: END`. + + 4. Receiver gets `:MISSING_CHECK: END`, responds with + `:MISSING_CHECK: START`, followed by the list of + hashes it collected as being wanted (one per line), + `:MISSING_CHECK: END`, and flushes any buffers. + + 5. Sender gets `:MISSING_CHECK: START` and reads the list + of hashes desired by the receiver until reading + `:MISSING_CHECK: END`. + + The collection and then response is so the sender doesn't + have to read while it writes to ensure network buffers don't + fill up and block everything. + """ + with exceptions.MessageTimeout( + self.app.client_timeout, 'missing_check start'): + line = self.fp.readline(self.app.network_chunk_size) + if line.strip() != ':MISSING_CHECK: START': + raise Exception( + 'Looking for :MISSING_CHECK: START got %r' % line[:1024]) + object_hashes = [] + while True: + with exceptions.MessageTimeout( + self.app.client_timeout, 'missing_check line'): + line = self.fp.readline(self.app.network_chunk_size) + if not line or line.strip() == ':MISSING_CHECK: END': + break + object_hash, timestamp = [urllib.unquote(v) for v in line.split()] + want = False + try: + df = self.app._diskfile_mgr.get_diskfile_from_hash( + self.device, self.partition, object_hash) + except exceptions.DiskFileNotExist: + want = True + else: + try: + df.open() + except exceptions.DiskFileDeleted as err: + want = err.timestamp < timestamp + except exceptions.DiskFileError, err: + want = True + else: + want = df.timestamp < timestamp + if want: + object_hashes.append(object_hash) + yield ':MISSING_CHECK: START\r\n' + yield '\r\n'.join(object_hashes) + yield '\r\n' + yield ':MISSING_CHECK: END\r\n' + for data in self._ensure_flush(): + yield data + + def updates(self): + """ + Handles the UPDATES step of a REPLICATION request. + + Receives a set of PUT and DELETE subrequests that will be + routed to the object server itself for processing. These + contain the information requested by the MISSING_CHECK step. + + The PUT and DELETE subrequests are formatted pretty much + exactly like regular HTTP requests, excepting the HTTP + version on the first request line. + + The process is generally: + + 1. Sender sends `:UPDATES: START` and begins sending the + PUT and DELETE subrequests. + + 2. Receiver gets `:UPDATES: START` and begins routing the + subrequests to the object server. + + 3. Sender sends `:UPDATES: END`. + + 4. Receiver gets `:UPDATES: END` and sends `:UPDATES: + START` and `:UPDATES: END` (assuming no errors). + + 5. Sender gets `:UPDATES: START` and `:UPDATES: END`. + + If too many subrequests fail, as configured by + replication_failure_threshold and replication_failure_ratio, + the receiver will hang up the request early so as to not + waste any more time. + + At step 4, the receiver will send back an error if there were + any failures (that didn't cause a hangup due to the above + thresholds) so the sender knows the whole was not entirely a + success. This is so the sender knows if it can remove an out + of place partition, for example. + """ + with exceptions.MessageTimeout( + self.app.client_timeout, 'updates start'): + line = self.fp.readline(self.app.network_chunk_size) + if line.strip() != ':UPDATES: START': + raise Exception('Looking for :UPDATES: START got %r' % line[:1024]) + successes = 0 + failures = 0 + # We default to dropping the connection in case there is any exception + # raised during processing because otherwise the sender could send for + # quite some time before realizing it was all in vain. + self.disconnect = True + while True: + with exceptions.MessageTimeout( + self.app.client_timeout, 'updates line'): + line = self.fp.readline(self.app.network_chunk_size) + if not line or line.strip() == ':UPDATES: END': + break + # Read first line METHOD PATH of subrequest. + method, path = line.strip().split(' ', 1) + subreq = swob.Request.blank( + '/%s/%s%s' % (self.device, self.partition, path), + environ={'REQUEST_METHOD': method}) + # Read header lines. + content_length = None + replication_headers = [] + while True: + with exceptions.MessageTimeout(self.app.client_timeout): + line = self.fp.readline(self.app.network_chunk_size) + if not line: + raise Exception( + 'Got no headers for %s %s' % (method, path)) + line = line.strip() + if not line: + break + header, value = line.split(':', 1) + header = header.strip().lower() + value = value.strip() + subreq.headers[header] = value + replication_headers.append(header) + if header == 'content-length': + content_length = int(value) + # Establish subrequest body, if needed. + if method == 'DELETE': + if content_length not in (None, 0): + raise Exception( + 'DELETE subrequest with content-length %s' % path) + elif method == 'PUT': + if content_length is None: + raise Exception( + 'No content-length sent for %s %s' % (method, path)) + + def subreq_iter(): + left = content_length + while left > 0: + with exceptions.MessageTimeout( + self.app.client_timeout, + 'updates content'): + chunk = self.fp.read( + min(left, self.app.network_chunk_size)) + if not chunk: + raise Exception( + 'Early termination for %s %s' % (method, path)) + left -= len(chunk) + yield chunk + subreq.environ['wsgi.input'] = utils.FileLikeIter( + subreq_iter()) + else: + raise Exception('Invalid subrequest method %s' % method) + subreq.headers['X-Backend-Replication'] = 'True' + if replication_headers: + subreq.headers['X-Backend-Replication-Headers'] = \ + ' '.join(replication_headers) + # Route subrequest and translate response. + resp = subreq.get_response(self.app) + if http.is_success(resp.status_int) or \ + resp.status_int == http.HTTP_NOT_FOUND: + successes += 1 + else: + failures += 1 + if failures >= self.app.replication_failure_threshold and ( + not successes or + float(failures) / successes > + self.app.replication_failure_ratio): + raise Exception( + 'Too many %d failures to %d successes' % + (failures, successes)) + # The subreq may have failed, but we want to read the rest of the + # body from the remote side so we can continue on with the next + # subreq. + for junk in subreq.environ['wsgi.input']: + pass + if failures: + raise swob.HTTPInternalServerError( + 'ERROR: With :UPDATES: %d failures to %d successes' % + (failures, successes)) + # We didn't raise an exception, so end the request normally. + self.disconnect = False + yield ':UPDATES: START\r\n' + yield ':UPDATES: END\r\n' + for data in self._ensure_flush(): + yield data diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py new file mode 100644 index 0000000000..b90c039ff0 --- /dev/null +++ b/swift/obj/ssync_sender.py @@ -0,0 +1,309 @@ +# Copyright (c) 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import with_statement + +import urllib + +import eventlet +import eventlet.wsgi +import eventlet.greenio + +from swift.common import bufferedhttp +from swift.common import exceptions +from swift.common import http + + +class Sender(object): + """ + Sends REPLICATION requests to the object server. + + These requests are eventually handled by + :py:mod:`.ssync_receiver` and full documentation about the + process is there. + """ + + def __init__(self, daemon, node, job, suffixes): + self.daemon = daemon + self.node = node + self.job = job + self.suffixes = suffixes + self.connection = None + self.response = None + self.response_buffer = '' + self.response_chunk_left = 0 + self.send_list = None + self.failures = 0 + + def __call__(self): + if not self.suffixes: + return True + try: + # Double try blocks in case our main error handler fails. + try: + # The general theme for these functions is that they should + # raise exceptions.MessageTimeout for client timeouts and + # exceptions.ReplicationException for common issues that will + # abort the replication attempt and log a simple error. All + # other exceptions will be logged with a full stack trace. + self.connect() + self.missing_check() + self.updates() + self.disconnect() + return self.failures == 0 + except (exceptions.MessageTimeout, + exceptions.ReplicationException) as err: + self.daemon.logger.error( + '%s:%s/%s/%s %s', self.node.get('ip'), + self.node.get('port'), self.node.get('device'), + self.job.get('partition'), err) + except Exception: + # We don't want any exceptions to escape our code and possibly + # mess up the original replicator code that called us since it + # was originally written to shell out to rsync which would do + # no such thing. + self.daemon.logger.exception( + '%s:%s/%s/%s EXCEPTION in replication.Sender', + self.node.get('ip'), self.node.get('port'), + self.node.get('device'), self.job.get('partition')) + except Exception: + # We don't want any exceptions to escape our code and possibly + # mess up the original replicator code that called us since it + # was originally written to shell out to rsync which would do + # no such thing. + # This particular exception handler does the minimal amount as it + # would only get called if the above except Exception handler + # failed (bad node or job data). + self.daemon.logger.exception('EXCEPTION in replication.Sender') + return False + + def connect(self): + """ + Establishes a connection and starts a REPLICATION request + with the object server. + """ + with exceptions.MessageTimeout( + self.daemon.conn_timeout, 'connect send'): + self.connection = bufferedhttp.BufferedHTTPConnection( + '%s:%s' % (self.node['ip'], self.node['port'])) + self.connection.putrequest('REPLICATION', '/%s/%s' % ( + self.node['device'], self.job['partition'])) + self.connection.putheader('Transfer-Encoding', 'chunked') + self.connection.endheaders() + with exceptions.MessageTimeout( + self.daemon.node_timeout, 'connect receive'): + self.response = self.connection.getresponse() + if self.response.status != http.HTTP_OK: + raise exceptions.ReplicationException( + 'Expected status %s; got %s' % + (http.HTTP_OK, self.response.status)) + + def readline(self): + """ + Reads a line from the REPLICATION response body. + + httplib has no readline and will block on read(x) until x is + read, so we have to do the work ourselves. A bit of this is + taken from Python's httplib itself. + """ + data = self.response_buffer + self.response_buffer = '' + while '\n' not in data and len(data) < self.daemon.network_chunk_size: + if self.response_chunk_left == -1: # EOF-already indicator + break + if self.response_chunk_left == 0: + line = self.response.fp.readline() + i = line.find(';') + if i >= 0: + line = line[:i] # strip chunk-extensions + try: + self.response_chunk_left = int(line.strip(), 16) + except ValueError: + # close the connection as protocol synchronisation is + # probably lost + self.response.close() + raise exceptions.ReplicationException('Early disconnect') + if self.response_chunk_left == 0: + self.response_chunk_left = -1 + break + chunk = self.response.fp.read(min( + self.response_chunk_left, + self.daemon.network_chunk_size - len(data))) + if not chunk: + # close the connection as protocol synchronisation is + # probably lost + self.response.close() + raise exceptions.ReplicationException('Early disconnect') + self.response_chunk_left -= len(chunk) + if self.response_chunk_left == 0: + self.response.fp.read(2) # discard the trailing \r\n + data += chunk + if '\n' in data: + data, self.response_buffer = data.split('\n', 1) + data += '\n' + return data + + def missing_check(self): + """ + Handles the sender-side of the MISSING_CHECK step of a + REPLICATION request. + + Full documentation of this can be found at + :py:meth:`.Receiver.missing_check`. + """ + # First, send our list. + with exceptions.MessageTimeout( + self.daemon.node_timeout, 'missing_check start'): + msg = ':MISSING_CHECK: START\r\n' + self.connection.send('%x\r\n%s\r\n' % (len(msg), msg)) + for path, object_hash, timestamp in \ + self.daemon._diskfile_mgr.yield_hashes( + self.job['device'], self.job['partition'], self.suffixes): + with exceptions.MessageTimeout( + self.daemon.node_timeout, + 'missing_check send line'): + msg = '%s %s\r\n' % ( + urllib.quote(object_hash), + urllib.quote(timestamp)) + self.connection.send('%x\r\n%s\r\n' % (len(msg), msg)) + with exceptions.MessageTimeout( + self.daemon.node_timeout, 'missing_check end'): + msg = ':MISSING_CHECK: END\r\n' + self.connection.send('%x\r\n%s\r\n' % (len(msg), msg)) + # Now, retrieve the list of what they want. + while True: + with exceptions.MessageTimeout( + self.daemon.http_timeout, 'missing_check start wait'): + line = self.readline() + if not line: + raise exceptions.ReplicationException('Early disconnect') + line = line.strip() + if line == ':MISSING_CHECK: START': + break + elif line: + raise exceptions.ReplicationException( + 'Unexpected response: %r' % line[:1024]) + self.send_list = [] + while True: + with exceptions.MessageTimeout( + self.daemon.http_timeout, 'missing_check line wait'): + line = self.readline() + if not line: + raise exceptions.ReplicationException('Early disconnect') + line = line.strip() + if line == ':MISSING_CHECK: END': + break + if line: + self.send_list.append(line) + + def updates(self): + """ + Handles the sender-side of the UPDATES step of a REPLICATION + request. + + Full documentation of this can be found at + :py:meth:`.Receiver.updates`. + """ + # First, send all our subrequests based on the send_list. + with exceptions.MessageTimeout( + self.daemon.node_timeout, 'updates start'): + msg = ':UPDATES: START\r\n' + self.connection.send('%x\r\n%s\r\n' % (len(msg), msg)) + for object_hash in self.send_list: + try: + df = self.daemon._diskfile_mgr.get_diskfile_from_hash( + self.job['device'], self.job['partition'], object_hash) + except exceptions.DiskFileNotExist: + continue + url_path = urllib.quote( + '/%s/%s/%s' % (df.account, df.container, df.obj)) + try: + df.open() + except exceptions.DiskFileDeleted as err: + self.send_delete(url_path, err.timestamp) + except exceptions.DiskFileError: + pass + else: + self.send_put(url_path, df) + with exceptions.MessageTimeout( + self.daemon.node_timeout, 'updates end'): + msg = ':UPDATES: END\r\n' + self.connection.send('%x\r\n%s\r\n' % (len(msg), msg)) + # Now, read their response for any issues. + while True: + with exceptions.MessageTimeout( + self.daemon.http_timeout, 'updates start wait'): + line = self.readline() + if not line: + raise exceptions.ReplicationException('Early disconnect') + line = line.strip() + if line == ':UPDATES: START': + break + elif line: + raise exceptions.ReplicationException( + 'Unexpected response: %r' % line[:1024]) + while True: + with exceptions.MessageTimeout( + self.daemon.http_timeout, 'updates line wait'): + line = self.readline() + if not line: + raise exceptions.ReplicationException('Early disconnect') + line = line.strip() + if line == ':UPDATES: END': + break + elif line: + raise exceptions.ReplicationException( + 'Unexpected response: %r' % line[:1024]) + + def send_delete(self, url_path, timestamp): + """ + Sends a DELETE subrequest with the given information. + """ + msg = ['DELETE ' + url_path, 'X-Timestamp: ' + timestamp] + msg = '\r\n'.join(msg) + '\r\n\r\n' + with exceptions.MessageTimeout( + self.daemon.node_timeout, 'send_delete'): + self.connection.send('%x\r\n%s\r\n' % (len(msg), msg)) + + def send_put(self, url_path, df): + """ + Sends a PUT subrequest for the url_path using the source df + (DiskFile) and content_length. + """ + msg = ['PUT ' + url_path, 'Content-Length: ' + str(df.content_length)] + # Sorted to make it easier to test. + for key, value in sorted(df.get_metadata().iteritems()): + if key not in ('name', 'Content-Length'): + msg.append('%s: %s' % (key, value)) + msg = '\r\n'.join(msg) + '\r\n\r\n' + with exceptions.MessageTimeout(self.daemon.node_timeout, 'send_put'): + self.connection.send('%x\r\n%s\r\n' % (len(msg), msg)) + for chunk in df.reader(iter_hook=eventlet.sleep): + with exceptions.MessageTimeout( + self.daemon.node_timeout, 'send_put chunk'): + self.connection.send('%x\r\n%s\r\n' % (len(chunk), chunk)) + + def disconnect(self): + """ + Closes down the connection to the object server once done + with the REPLICATION request. + """ + try: + with exceptions.MessageTimeout( + self.daemon.node_timeout, 'disconnect'): + self.connection.send('0\r\n\r\n') + except (Exception, exceptions.Timeout): + pass # We're okay with the above failing. + self.connection.close() diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 6ee229a849..691e2d005d 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -196,6 +196,11 @@ class Application(object): try: if self.memcache is None: self.memcache = cache_from_env(env) + # Remove any x-backend-* headers since those are reserved for use + # by backends communicating with each other; no end user should be + # able to send those into the cluster. + for key in list(k for k in env if k.startswith('HTTP_X_BACKEND_')): + del env[key] req = self.update_request(Request(env)) return self.handle_request(req)(env, start_response) except UnicodeError: diff --git a/test/unit/common/test_exceptions.py b/test/unit/common/test_exceptions.py index 6bd4d9a277..83dd25acbd 100644 --- a/test/unit/common/test_exceptions.py +++ b/test/unit/common/test_exceptions.py @@ -16,12 +16,14 @@ # TODO(creiht): Tests import unittest +from swift.common import exceptions class TestExceptions(unittest.TestCase): - def test_placeholder(self): - pass + def test_replication_exception(self): + self.assertEqual(str(exceptions.ReplicationException()), '') + self.assertEqual(str(exceptions.ReplicationException('test')), 'test') if __name__ == '__main__': diff --git a/test/unit/common/test_wsgi.py b/test/unit/common/test_wsgi.py index 294ad611d6..ab4b3bf672 100644 --- a/test/unit/common/test_wsgi.py +++ b/test/unit/common/test_wsgi.py @@ -25,6 +25,7 @@ import os import pickle from textwrap import dedent from gzip import GzipFile +from contextlib import nested from StringIO import StringIO from collections import defaultdict from contextlib import closing @@ -33,6 +34,8 @@ from urllib import quote from eventlet import listen import swift +import mock + from swift.common.swob import Request from swift.common import wsgi, utils, ring @@ -498,6 +501,40 @@ class TestWSGI(unittest.TestCase): self.assertEquals(r.body, 'the body') self.assertEquals(r.environ['swift.source'], 'UT') + def test_run_server_global_conf_callback(self): + calls = defaultdict(lambda: 0) + + def _initrp(conf_file, app_section, *args, **kwargs): + return ( + {'__file__': 'test', 'workers': 0}, + 'logger', + 'log_name') + + def _global_conf_callback(preloaded_app_conf, global_conf): + calls['_global_conf_callback'] += 1 + self.assertEqual( + preloaded_app_conf, {'__file__': 'test', 'workers': 0}) + self.assertEqual(global_conf, {'log_name': 'log_name'}) + global_conf['test1'] = 'one' + + def _loadapp(uri, name=None, **kwargs): + calls['_loadapp'] += 1 + self.assertTrue('global_conf' in kwargs) + self.assertEqual(kwargs['global_conf'], + {'log_name': 'log_name', 'test1': 'one'}) + + with nested( + mock.patch.object(wsgi, '_initrp', _initrp), + mock.patch.object(wsgi, 'get_socket'), + mock.patch.object(wsgi, 'drop_privileges'), + mock.patch.object(wsgi, 'loadapp', _loadapp), + mock.patch.object(wsgi, 'capture_stdio'), + mock.patch.object(wsgi, 'run_server')): + wsgi.run_wsgi('conf_file', 'app_section', + global_conf_callback=_global_conf_callback) + self.assertEqual(calls['_global_conf_callback'], 1) + self.assertEqual(calls['_loadapp'], 1) + def test_pre_auth_req_with_empty_env_no_path(self): r = wsgi.make_pre_authed_request( {}, 'GET') diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index 804d989523..e527569116 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -30,7 +30,7 @@ from shutil import rmtree from time import time from tempfile import mkdtemp from hashlib import md5 -from contextlib import closing +from contextlib import closing, nested from gzip import GzipFile from eventlet import tpool @@ -1160,3 +1160,371 @@ class TestDiskFile(unittest.TestCase): reader.close() log_lines = df._logger.get_lines_for_level('error') self.assert_('a very special error' in log_lines[-1]) + + def test_get_diskfile_from_hash_dev_path_fail(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value=None) + with nested( + mock.patch('swift.obj.diskfile.DiskFile'), + mock.patch('swift.obj.diskfile.hash_cleanup_listdir'), + mock.patch('swift.obj.diskfile.read_metadata')) as \ + (dfclass, hclistdir, readmeta): + hclistdir.return_value = ['1381679759.90941.data'] + readmeta.return_value = {'name': '/a/c/o'} + self.assertRaises( + DiskFileDeviceUnavailable, + self.df_mgr.get_diskfile_from_hash, + 'dev', '9', '9a7175077c01a23ade5956b8a2bba900') + + def test_get_diskfile_from_hash_not_dir(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/') + with nested( + mock.patch('swift.obj.diskfile.DiskFile'), + mock.patch('swift.obj.diskfile.hash_cleanup_listdir'), + mock.patch('swift.obj.diskfile.read_metadata'), + mock.patch('swift.obj.diskfile.quarantine_renamer')) as \ + (dfclass, hclistdir, readmeta, quarantine_renamer): + osexc = OSError() + osexc.errno = errno.ENOTDIR + hclistdir.side_effect = osexc + readmeta.return_value = {'name': '/a/c/o'} + self.assertRaises( + DiskFileNotExist, + self.df_mgr.get_diskfile_from_hash, + 'dev', '9', '9a7175077c01a23ade5956b8a2bba900') + quarantine_renamer.assert_called_once_with( + '/srv/dev/', + '/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900') + + def test_get_diskfile_from_hash_no_dir(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/') + with nested( + mock.patch('swift.obj.diskfile.DiskFile'), + mock.patch('swift.obj.diskfile.hash_cleanup_listdir'), + mock.patch('swift.obj.diskfile.read_metadata')) as \ + (dfclass, hclistdir, readmeta): + osexc = OSError() + osexc.errno = errno.ENOENT + hclistdir.side_effect = osexc + readmeta.return_value = {'name': '/a/c/o'} + self.assertRaises( + DiskFileNotExist, + self.df_mgr.get_diskfile_from_hash, + 'dev', '9', '9a7175077c01a23ade5956b8a2bba900') + + def test_get_diskfile_from_hash_other_oserror(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/') + with nested( + mock.patch('swift.obj.diskfile.DiskFile'), + mock.patch('swift.obj.diskfile.hash_cleanup_listdir'), + mock.patch('swift.obj.diskfile.read_metadata')) as \ + (dfclass, hclistdir, readmeta): + osexc = OSError() + hclistdir.side_effect = osexc + readmeta.return_value = {'name': '/a/c/o'} + self.assertRaises( + OSError, + self.df_mgr.get_diskfile_from_hash, + 'dev', '9', '9a7175077c01a23ade5956b8a2bba900') + + def test_get_diskfile_from_hash_no_actual_files(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/') + with nested( + mock.patch('swift.obj.diskfile.DiskFile'), + mock.patch('swift.obj.diskfile.hash_cleanup_listdir'), + mock.patch('swift.obj.diskfile.read_metadata')) as \ + (dfclass, hclistdir, readmeta): + hclistdir.return_value = [] + readmeta.return_value = {'name': '/a/c/o'} + self.assertRaises( + DiskFileNotExist, + self.df_mgr.get_diskfile_from_hash, + 'dev', '9', '9a7175077c01a23ade5956b8a2bba900') + + def test_get_diskfile_from_hash_read_metadata_problem(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/') + with nested( + mock.patch('swift.obj.diskfile.DiskFile'), + mock.patch('swift.obj.diskfile.hash_cleanup_listdir'), + mock.patch('swift.obj.diskfile.read_metadata')) as \ + (dfclass, hclistdir, readmeta): + hclistdir.return_value = ['1381679759.90941.data'] + readmeta.side_effect = EOFError() + self.assertRaises( + DiskFileNotExist, + self.df_mgr.get_diskfile_from_hash, + 'dev', '9', '9a7175077c01a23ade5956b8a2bba900') + + def test_get_diskfile_from_hash_no_meta_name(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/') + with nested( + mock.patch('swift.obj.diskfile.DiskFile'), + mock.patch('swift.obj.diskfile.hash_cleanup_listdir'), + mock.patch('swift.obj.diskfile.read_metadata')) as \ + (dfclass, hclistdir, readmeta): + hclistdir.return_value = ['1381679759.90941.data'] + readmeta.return_value = {} + try: + self.df_mgr.get_diskfile_from_hash( + 'dev', '9', '9a7175077c01a23ade5956b8a2bba900') + except DiskFileNotExist as err: + exc = err + self.assertEqual(str(exc), '') + + def test_get_diskfile_from_hash_bad_meta_name(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/') + with nested( + mock.patch('swift.obj.diskfile.DiskFile'), + mock.patch('swift.obj.diskfile.hash_cleanup_listdir'), + mock.patch('swift.obj.diskfile.read_metadata')) as \ + (dfclass, hclistdir, readmeta): + hclistdir.return_value = ['1381679759.90941.data'] + readmeta.return_value = {'name': 'bad'} + try: + self.df_mgr.get_diskfile_from_hash( + 'dev', '9', '9a7175077c01a23ade5956b8a2bba900') + except DiskFileNotExist as err: + exc = err + self.assertEqual(str(exc), '') + + def test_get_diskfile_from_hash(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/') + with nested( + mock.patch('swift.obj.diskfile.DiskFile'), + mock.patch('swift.obj.diskfile.hash_cleanup_listdir'), + mock.patch('swift.obj.diskfile.read_metadata')) as \ + (dfclass, hclistdir, readmeta): + hclistdir.return_value = ['1381679759.90941.data'] + readmeta.return_value = {'name': '/a/c/o'} + self.df_mgr.get_diskfile_from_hash( + 'dev', '9', '9a7175077c01a23ade5956b8a2bba900') + dfclass.assert_called_once_with( + self.df_mgr, '/srv/dev/', self.df_mgr.threadpools['dev'], '9', + 'a', 'c', 'o') + hclistdir.assert_called_once_with( + '/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900', + 604800) + readmeta.assert_called_once_with( + '/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900/' + '1381679759.90941.data') + + def test_listdir_enoent(self): + oserror = OSError() + oserror.errno = errno.ENOENT + self.df_mgr.logger.error = mock.MagicMock() + with mock.patch('os.listdir', side_effect=oserror): + self.assertEqual(self.df_mgr._listdir('path'), []) + self.assertEqual(self.df_mgr.logger.error.mock_calls, []) + + def test_listdir_other_oserror(self): + oserror = OSError() + self.df_mgr.logger.error = mock.MagicMock() + with mock.patch('os.listdir', side_effect=oserror): + self.assertEqual(self.df_mgr._listdir('path'), []) + self.df_mgr.logger.error.assert_called_once_with( + 'ERROR: Skipping %r due to error with listdir attempt: %s', + 'path', oserror) + + def test_listdir(self): + self.df_mgr.logger.error = mock.MagicMock() + with mock.patch('os.listdir', return_value=['abc', 'def']): + self.assertEqual(self.df_mgr._listdir('path'), ['abc', 'def']) + self.assertEqual(self.df_mgr.logger.error.mock_calls, []) + + def test_yield_suffixes_dev_path_fail(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value=None) + exc = None + try: + list(self.df_mgr.yield_suffixes('dev', '9')) + except DiskFileDeviceUnavailable as err: + exc = err + self.assertEqual(str(exc), '') + + def test_yield_suffixes(self): + self.df_mgr._listdir = mock.MagicMock(return_value=[ + 'abc', 'def', 'ghi', 'abcd', '012']) + self.assertEqual( + list(self.df_mgr.yield_suffixes('dev', '9')), + [(self.testdir + '/dev/objects/9/abc', 'abc'), + (self.testdir + '/dev/objects/9/def', 'def'), + (self.testdir + '/dev/objects/9/012', '012')]) + + def test_yield_hashes_dev_path_fail(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value=None) + exc = None + try: + list(self.df_mgr.yield_hashes('dev', '9')) + except DiskFileDeviceUnavailable as err: + exc = err + self.assertEqual(str(exc), '') + + def test_yield_hashes_empty(self): + def _listdir(path): + return [] + + with mock.patch('os.listdir', _listdir): + self.assertEqual(list(self.df_mgr.yield_hashes('dev', '9')), []) + + def test_yield_hashes_empty_suffixes(self): + def _listdir(path): + return [] + + with mock.patch('os.listdir', _listdir): + self.assertEqual( + list(self.df_mgr.yield_hashes('dev', '9', suffixes=['456'])), + []) + + def test_yield_hashes(self): + fresh_ts = normalize_timestamp(time() - 10) + fresher_ts = normalize_timestamp(time() - 1) + + def _listdir(path): + if path.endswith('/dev/objects/9'): + return ['abc', '456', 'def'] + elif path.endswith('/dev/objects/9/abc'): + return ['9373a92d072897b136b3fc06595b4abc'] + elif path.endswith( + '/dev/objects/9/abc/9373a92d072897b136b3fc06595b4abc'): + return [fresh_ts + '.ts'] + elif path.endswith('/dev/objects/9/456'): + return ['9373a92d072897b136b3fc06595b0456', + '9373a92d072897b136b3fc06595b7456'] + elif path.endswith( + '/dev/objects/9/456/9373a92d072897b136b3fc06595b0456'): + return ['1383180000.12345.data'] + elif path.endswith( + '/dev/objects/9/456/9373a92d072897b136b3fc06595b7456'): + return [fresh_ts + '.ts', + fresher_ts + '.data'] + elif path.endswith('/dev/objects/9/def'): + return [] + else: + raise Exception('Unexpected listdir of %r' % path) + + with nested( + mock.patch('os.listdir', _listdir), + mock.patch('os.unlink')): + self.assertEqual( + list(self.df_mgr.yield_hashes('dev', '9')), + [(self.testdir + + '/dev/objects/9/abc/9373a92d072897b136b3fc06595b4abc', + '9373a92d072897b136b3fc06595b4abc', fresh_ts), + (self.testdir + + '/dev/objects/9/456/9373a92d072897b136b3fc06595b0456', + '9373a92d072897b136b3fc06595b0456', '1383180000.12345'), + (self.testdir + + '/dev/objects/9/456/9373a92d072897b136b3fc06595b7456', + '9373a92d072897b136b3fc06595b7456', fresher_ts)]) + + def test_yield_hashes_suffixes(self): + fresh_ts = normalize_timestamp(time() - 10) + fresher_ts = normalize_timestamp(time() - 1) + + def _listdir(path): + if path.endswith('/dev/objects/9'): + return ['abc', '456', 'def'] + elif path.endswith('/dev/objects/9/abc'): + return ['9373a92d072897b136b3fc06595b4abc'] + elif path.endswith( + '/dev/objects/9/abc/9373a92d072897b136b3fc06595b4abc'): + return [fresh_ts + '.ts'] + elif path.endswith('/dev/objects/9/456'): + return ['9373a92d072897b136b3fc06595b0456', + '9373a92d072897b136b3fc06595b7456'] + elif path.endswith( + '/dev/objects/9/456/9373a92d072897b136b3fc06595b0456'): + return ['1383180000.12345.data'] + elif path.endswith( + '/dev/objects/9/456/9373a92d072897b136b3fc06595b7456'): + return [fresh_ts + '.ts', + fresher_ts + '.data'] + elif path.endswith('/dev/objects/9/def'): + return [] + else: + raise Exception('Unexpected listdir of %r' % path) + + with nested( + mock.patch('os.listdir', _listdir), + mock.patch('os.unlink')): + self.assertEqual( + list(self.df_mgr.yield_hashes( + 'dev', '9', suffixes=['456'])), + [(self.testdir + + '/dev/objects/9/456/9373a92d072897b136b3fc06595b0456', + '9373a92d072897b136b3fc06595b0456', '1383180000.12345'), + (self.testdir + + '/dev/objects/9/456/9373a92d072897b136b3fc06595b7456', + '9373a92d072897b136b3fc06595b7456', fresher_ts)]) + + def test_diskfile_names(self): + df = self.df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o') + self.assertEqual(df.account, 'a') + self.assertEqual(df.container, 'c') + self.assertEqual(df.obj, 'o') + + def test_diskfile_content_length_not_open(self): + df = self.df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o') + exc = None + try: + df.content_length + except DiskFileNotOpen as err: + exc = err + self.assertEqual(str(exc), '') + + def test_diskfile_content_length_deleted(self): + df = self._get_open_disk_file() + ts = time() + df.delete(ts) + exp_name = '%s.ts' % str(normalize_timestamp(ts)) + dl = os.listdir(df._datadir) + self.assertEquals(len(dl), 1) + self.assertTrue(exp_name in set(dl)) + df = self.df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o') + exc = None + try: + with df.open(): + df.content_length + except DiskFileDeleted as err: + exc = err + self.assertEqual(str(exc), '') + + def test_diskfile_content_length(self): + self._get_open_disk_file() + df = self.df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o') + with df.open(): + self.assertEqual(df.content_length, 1024) + + def test_diskfile_timestamp_not_open(self): + df = self.df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o') + exc = None + try: + df.timestamp + except DiskFileNotOpen as err: + exc = err + self.assertEqual(str(exc), '') + + def test_diskfile_timestamp_deleted(self): + df = self._get_open_disk_file() + ts = time() + df.delete(ts) + exp_name = '%s.ts' % str(normalize_timestamp(ts)) + dl = os.listdir(df._datadir) + self.assertEquals(len(dl), 1) + self.assertTrue(exp_name in set(dl)) + df = self.df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o') + exc = None + try: + with df.open(): + df.timestamp + except DiskFileDeleted as err: + exc = err + self.assertEqual(str(exc), '') + + def test_diskfile_timestamp(self): + self._get_open_disk_file(ts='1383181759.12345') + df = self.df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o') + with df.open(): + self.assertEqual(df.timestamp, '1383181759.12345') + + +if __name__ == '__main__': + unittest.main() diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py index e8fcb30f45..46d90f6678 100644 --- a/test/unit/obj/test_replicator.py +++ b/test/unit/obj/test_replicator.py @@ -554,6 +554,12 @@ class TestObjectReplicator(unittest.TestCase): mock_http_connect(200)): self.replicator.replicate() + def test_sync_just_calls_sync_method(self): + self.replicator.sync_method = mock.MagicMock() + self.replicator.sync('node', 'job', 'suffixes') + self.replicator.sync_method.assert_called_once_with( + 'node', 'job', 'suffixes') + @mock.patch('swift.obj.replicator.tpool_reraise', autospec=True) @mock.patch('swift.obj.replicator.http_connect', autospec=True) def test_update(self, mock_http, mock_tpool_reraise): @@ -638,13 +644,13 @@ class TestObjectReplicator(unittest.TestCase): self.assertEquals(self.replicator.suffix_count, 0) mock_logger.reset_mock() - # Check seccesfull http_connect and rsync for local node + # Check successful http_connect and sync for local node mock_tpool_reraise.return_value = (1, {'a83': 'ba47fd314242ec8c' '7efb91f5d57336e4'}) resp.read.return_value = pickle.dumps({'a83': 'c130a2c17ed45102a' 'ada0f4eee69494ff'}) set_default(self) - self.replicator.rsync = fake_func = mock.MagicMock() + self.replicator.sync = fake_func = mock.MagicMock() self.replicator.update(local_job) reqs = [] for node in local_job['nodes']: diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index bc8b491245..56c97917a6 100755 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -2819,6 +2819,26 @@ class TestObjectController(unittest.TestCase): finally: diskfile.fallocate = orig_fallocate + def test_global_conf_callback_does_nothing(self): + preloaded_app_conf = {} + global_conf = {} + object_server.global_conf_callback(preloaded_app_conf, global_conf) + self.assertEqual(preloaded_app_conf, {}) + self.assertEqual(global_conf.keys(), ['replication_semaphore']) + self.assertEqual( + global_conf['replication_semaphore'][0].get_value(), 4) + + def test_global_conf_callback_replication_semaphore(self): + preloaded_app_conf = {'replication_concurrency': 123} + global_conf = {} + with mock.patch.object( + object_server.multiprocessing, 'BoundedSemaphore', + return_value='test1') as mocked_Semaphore: + object_server.global_conf_callback(preloaded_app_conf, global_conf) + self.assertEqual(preloaded_app_conf, {'replication_concurrency': 123}) + self.assertEqual(global_conf, {'replication_semaphore': ['test1']}) + mocked_Semaphore.assert_called_once_with(123) + def test_serv_reserv(self): # Test replication_server flag was set from configuration file. conf = {'devices': self.testdir, 'mount_check': 'false'} @@ -2836,7 +2856,7 @@ class TestObjectController(unittest.TestCase): def test_list_allowed_methods(self): # Test list of allowed_methods obj_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST'] - repl_methods = ['REPLICATE'] + repl_methods = ['REPLICATE', 'REPLICATION'] for method_name in obj_methods: method = getattr(self.object_controller, method_name) self.assertFalse(hasattr(method, 'replication')) diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py new file mode 100644 index 0000000000..b3b6b83214 --- /dev/null +++ b/test/unit/obj/test_ssync_receiver.py @@ -0,0 +1,1227 @@ +# Copyright (c) 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import os +import shutil +import StringIO +import tempfile +import unittest + +import eventlet +import mock + +from swift.common import constraints +from swift.common import swob +from swift.common import utils +from swift.obj import diskfile +from swift.obj import server +from swift.obj import ssync_receiver + +from test import unit + + +class TestReceiver(unittest.TestCase): + + def setUp(self): + utils.HASH_PATH_SUFFIX = 'endcap' + utils.HASH_PATH_PREFIX = 'startcap' + # Not sure why the test.unit stuff isn't taking effect here; so I'm + # reenforcing it. + diskfile.getxattr = unit._getxattr + diskfile.setxattr = unit._setxattr + self.testdir = os.path.join( + tempfile.mkdtemp(), 'tmp_test_ssync_receiver') + utils.mkdirs(os.path.join(self.testdir, 'sda1', 'tmp')) + conf = {'devices': self.testdir, 'mount_check': 'false'} + self.controller = server.ObjectController(conf) + self.controller.bytes_per_sync = 1 + + self.account1 = 'a' + self.container1 = 'c' + self.object1 = 'o1' + self.name1 = '/' + '/'.join(( + self.account1, self.container1, self.object1)) + self.hash1 = utils.hash_path( + self.account1, self.container1, self.object1) + self.ts1 = '1372800001.00000' + self.metadata1 = { + 'name': self.name1, + 'X-Timestamp': self.ts1, + 'Content-Length': '0'} + + self.account2 = 'a' + self.container2 = 'c' + self.object2 = 'o2' + self.name2 = '/' + '/'.join(( + self.account2, self.container2, self.object2)) + self.hash2 = utils.hash_path( + self.account2, self.container2, self.object2) + self.ts2 = '1372800002.00000' + self.metadata2 = { + 'name': self.name2, + 'X-Timestamp': self.ts2, + 'Content-Length': '0'} + + def tearDown(self): + shutil.rmtree(os.path.dirname(self.testdir)) + + def body_lines(self, body): + lines = [] + for line in body.split('\n'): + line = line.strip() + if line: + lines.append(line) + return lines + + def test_REPLICATION_semaphore_locked(self): + with mock.patch.object( + self.controller, 'replication_semaphore') as \ + mocked_replication_semaphore: + self.controller.logger = mock.MagicMock() + mocked_replication_semaphore.acquire.return_value = False + req = swob.Request.blank( + '/device/partition', environ={'REQUEST_METHOD': 'REPLICATION'}) + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [":ERROR: 503 '

Service Unavailable

The " + "server is currently unavailable. Please try again at a " + "later time.

'"]) + self.assertEqual(resp.status_int, 200) + self.assertFalse(self.controller.logger.error.called) + self.assertFalse(self.controller.logger.exception.called) + + def test_REPLICATION_initial_path(self): + with mock.patch.object( + self.controller, 'replication_semaphore') as \ + mocked_replication_semaphore: + req = swob.Request.blank( + '/device', environ={'REQUEST_METHOD': 'REPLICATION'}) + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [":ERROR: 0 'Invalid path: /device'"]) + self.assertEqual(resp.status_int, 200) + self.assertFalse(mocked_replication_semaphore.acquire.called) + self.assertFalse(mocked_replication_semaphore.release.called) + + with mock.patch.object( + self.controller, 'replication_semaphore') as \ + mocked_replication_semaphore: + req = swob.Request.blank( + '/device/', environ={'REQUEST_METHOD': 'REPLICATION'}) + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [":ERROR: 0 'Invalid path: /device/'"]) + self.assertEqual(resp.status_int, 200) + self.assertFalse(mocked_replication_semaphore.acquire.called) + self.assertFalse(mocked_replication_semaphore.release.called) + + with mock.patch.object( + self.controller, 'replication_semaphore') as \ + mocked_replication_semaphore: + req = swob.Request.blank( + '/device/partition', environ={'REQUEST_METHOD': 'REPLICATION'}) + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':ERROR: 0 "Looking for :MISSING_CHECK: START got \'\'"']) + self.assertEqual(resp.status_int, 200) + mocked_replication_semaphore.acquire.assert_called_once_with(0) + mocked_replication_semaphore.release.assert_called_once_with() + + with mock.patch.object( + self.controller, 'replication_semaphore') as \ + mocked_replication_semaphore: + req = swob.Request.blank( + '/device/partition/junk', + environ={'REQUEST_METHOD': 'REPLICATION'}) + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [":ERROR: 0 'Invalid path: /device/partition/junk'"]) + self.assertEqual(resp.status_int, 200) + self.assertFalse(mocked_replication_semaphore.acquire.called) + self.assertFalse(mocked_replication_semaphore.release.called) + + def test_REPLICATION_mount_check(self): + with contextlib.nested( + mock.patch.object( + self.controller, 'replication_semaphore'), + mock.patch.object( + self.controller._diskfile_mgr, 'mount_check', False), + mock.patch.object( + constraints, 'check_mount', return_value=False)) as ( + mocked_replication_semaphore, + mocked_mount_check, + mocked_check_mount): + req = swob.Request.blank( + '/device/partition', environ={'REQUEST_METHOD': 'REPLICATION'}) + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':ERROR: 0 "Looking for :MISSING_CHECK: START got \'\'"']) + self.assertEqual(resp.status_int, 200) + self.assertFalse(mocked_check_mount.called) + + with contextlib.nested( + mock.patch.object( + self.controller, 'replication_semaphore'), + mock.patch.object( + self.controller._diskfile_mgr, 'mount_check', True), + mock.patch.object( + constraints, 'check_mount', return_value=False)) as ( + mocked_replication_semaphore, + mocked_mount_check, + mocked_check_mount): + req = swob.Request.blank( + '/device/partition', environ={'REQUEST_METHOD': 'REPLICATION'}) + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [":ERROR: 507 '

Insufficient Storage

There " + "was not enough space to save the resource. Drive: " + "device

'"]) + self.assertEqual(resp.status_int, 200) + mocked_check_mount.assert_called_once_with( + self.controller._diskfile_mgr.devices, 'device') + + mocked_check_mount.reset_mock() + mocked_check_mount.return_value = True + req = swob.Request.blank( + '/device/partition', environ={'REQUEST_METHOD': 'REPLICATION'}) + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':ERROR: 0 "Looking for :MISSING_CHECK: START got \'\'"']) + self.assertEqual(resp.status_int, 200) + mocked_check_mount.assert_called_once_with( + self.controller._diskfile_mgr.devices, 'device') + + def test_REPLICATION_Exception(self): + + class _Wrapper(StringIO.StringIO): + + def __init__(self, value): + StringIO.StringIO.__init__(self, value) + self.mock_socket = mock.MagicMock() + + def get_socket(self): + return self.mock_socket + + with mock.patch.object( + ssync_receiver.eventlet.greenio, 'shutdown_safe') as \ + mock_shutdown_safe: + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\nBad content is here') + req.remote_addr = '1.2.3.4' + mock_wsgi_input = _Wrapper(req.body) + req.environ['wsgi.input'] = mock_wsgi_input + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ":ERROR: 0 'Got no headers for Bad content is here'"]) + self.assertEqual(resp.status_int, 200) + mock_shutdown_safe.assert_called_once_with( + mock_wsgi_input.mock_socket) + mock_wsgi_input.mock_socket.close.assert_called_once_with() + self.controller.logger.exception.assert_called_once_with( + '1.2.3.4/device/partition EXCEPTION in replication.Receiver') + + def test_REPLICATION_Exception_Exception(self): + + class _Wrapper(StringIO.StringIO): + + def __init__(self, value): + StringIO.StringIO.__init__(self, value) + self.mock_socket = mock.MagicMock() + + def get_socket(self): + return self.mock_socket + + with mock.patch.object( + ssync_receiver.eventlet.greenio, 'shutdown_safe') as \ + mock_shutdown_safe: + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\nBad content is here') + req.remote_addr = mock.MagicMock() + req.remote_addr.__str__ = None + mock_wsgi_input = _Wrapper(req.body) + req.environ['wsgi.input'] = mock_wsgi_input + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END']) + self.assertEqual(resp.status_int, 200) + mock_shutdown_safe.assert_called_once_with( + mock_wsgi_input.mock_socket) + mock_wsgi_input.mock_socket.close.assert_called_once_with() + self.controller.logger.exception.assert_called_once_with( + 'EXCEPTION in replication.Receiver') + + def test_MISSING_CHECK_timeout(self): + + class _Wrapper(StringIO.StringIO): + + def __init__(self, value): + StringIO.StringIO.__init__(self, value) + self.mock_socket = mock.MagicMock() + + def readline(self, sizehint=-1): + line = StringIO.StringIO.readline(self) + if line.startswith('hash'): + eventlet.sleep(0.1) + return line + + def get_socket(self): + return self.mock_socket + + self.controller.client_timeout = 0.01 + with mock.patch.object( + ssync_receiver.eventlet.greenio, 'shutdown_safe') as \ + mock_shutdown_safe: + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n' + 'hash ts\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + req.remote_addr = '2.3.4.5' + mock_wsgi_input = _Wrapper(req.body) + req.environ['wsgi.input'] = mock_wsgi_input + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [":ERROR: 408 '0.01 seconds: missing_check line'"]) + self.assertEqual(resp.status_int, 200) + self.assertFalse(mock_shutdown_safe.called) + self.controller.logger.error.assert_called_once_with( + '2.3.4.5/sda1/1 TIMEOUT in replication.Receiver: ' + '0.01 seconds: missing_check line') + + def test_MISSING_CHECK_other_exception(self): + + class _Wrapper(StringIO.StringIO): + + def __init__(self, value): + StringIO.StringIO.__init__(self, value) + self.mock_socket = mock.MagicMock() + + def readline(self, sizehint=-1): + line = StringIO.StringIO.readline(self) + if line.startswith('hash'): + raise Exception('test exception') + return line + + def get_socket(self): + return self.mock_socket + + self.controller.client_timeout = 0.01 + with mock.patch.object( + ssync_receiver.eventlet.greenio, 'shutdown_safe') as \ + mock_shutdown_safe: + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n' + 'hash ts\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + req.remote_addr = '3.4.5.6' + mock_wsgi_input = _Wrapper(req.body) + req.environ['wsgi.input'] = mock_wsgi_input + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [":ERROR: 0 'test exception'"]) + self.assertEqual(resp.status_int, 200) + self.assertFalse(mock_shutdown_safe.called) + self.controller.logger.exception.assert_called_once_with( + '3.4.5.6/sda1/1 EXCEPTION in replication.Receiver') + + def test_MISSING_CHECK_empty_list(self): + + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ':UPDATES: START', ':UPDATES: END']) + self.assertEqual(resp.status_int, 200) + self.assertFalse(self.controller.logger.error.called) + self.assertFalse(self.controller.logger.exception.called) + + def test_MISSING_CHECK_have_none(self): + + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n' + + self.hash1 + ' ' + self.ts1 + '\r\n' + + self.hash2 + ' ' + self.ts2 + '\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', + self.hash1, + self.hash2, + ':MISSING_CHECK: END', + ':UPDATES: START', ':UPDATES: END']) + self.assertEqual(resp.status_int, 200) + self.assertFalse(self.controller.logger.error.called) + self.assertFalse(self.controller.logger.exception.called) + + def test_MISSING_CHECK_have_one_exact(self): + object_dir = utils.storage_directory( + os.path.join(self.testdir, 'sda1', diskfile.DATADIR), + '1', self.hash1) + utils.mkdirs(object_dir) + fp = open(os.path.join(object_dir, self.ts1 + '.data'), 'w+') + fp.write('1') + fp.flush() + self.metadata1['Content-Length'] = '1' + diskfile.write_metadata(fp, self.metadata1) + + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n' + + self.hash1 + ' ' + self.ts1 + '\r\n' + + self.hash2 + ' ' + self.ts2 + '\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', + self.hash2, + ':MISSING_CHECK: END', + ':UPDATES: START', ':UPDATES: END']) + self.assertEqual(resp.status_int, 200) + self.assertFalse(self.controller.logger.error.called) + self.assertFalse(self.controller.logger.exception.called) + + def test_MISSING_CHECK_have_one_newer(self): + object_dir = utils.storage_directory( + os.path.join(self.testdir, 'sda1', diskfile.DATADIR), + '1', self.hash1) + utils.mkdirs(object_dir) + newer_ts1 = utils.normalize_timestamp(float(self.ts1) + 1) + self.metadata1['X-Timestamp'] = newer_ts1 + fp = open(os.path.join(object_dir, newer_ts1 + '.data'), 'w+') + fp.write('1') + fp.flush() + self.metadata1['Content-Length'] = '1' + diskfile.write_metadata(fp, self.metadata1) + + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n' + + self.hash1 + ' ' + self.ts1 + '\r\n' + + self.hash2 + ' ' + self.ts2 + '\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', + self.hash2, + ':MISSING_CHECK: END', + ':UPDATES: START', ':UPDATES: END']) + self.assertEqual(resp.status_int, 200) + self.assertFalse(self.controller.logger.error.called) + self.assertFalse(self.controller.logger.exception.called) + + def test_MISSING_CHECK_have_one_older(self): + object_dir = utils.storage_directory( + os.path.join(self.testdir, 'sda1', diskfile.DATADIR), + '1', self.hash1) + utils.mkdirs(object_dir) + older_ts1 = utils.normalize_timestamp(float(self.ts1) - 1) + self.metadata1['X-Timestamp'] = older_ts1 + fp = open(os.path.join(object_dir, older_ts1 + '.data'), 'w+') + fp.write('1') + fp.flush() + self.metadata1['Content-Length'] = '1' + diskfile.write_metadata(fp, self.metadata1) + + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n' + + self.hash1 + ' ' + self.ts1 + '\r\n' + + self.hash2 + ' ' + self.ts2 + '\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', + self.hash1, + self.hash2, + ':MISSING_CHECK: END', + ':UPDATES: START', ':UPDATES: END']) + self.assertEqual(resp.status_int, 200) + self.assertFalse(self.controller.logger.error.called) + self.assertFalse(self.controller.logger.exception.called) + + def test_UPDATES_timeout(self): + + class _Wrapper(StringIO.StringIO): + + def __init__(self, value): + StringIO.StringIO.__init__(self, value) + self.mock_socket = mock.MagicMock() + + def readline(self, sizehint=-1): + line = StringIO.StringIO.readline(self) + if line.startswith('DELETE'): + eventlet.sleep(0.1) + return line + + def get_socket(self): + return self.mock_socket + + self.controller.client_timeout = 0.01 + with mock.patch.object( + ssync_receiver.eventlet.greenio, 'shutdown_safe') as \ + mock_shutdown_safe: + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'DELETE /a/c/o\r\n' + 'X-Timestamp: 1364456113.76334\r\n' + '\r\n' + ':UPDATES: END\r\n') + req.remote_addr = '2.3.4.5' + mock_wsgi_input = _Wrapper(req.body) + req.environ['wsgi.input'] = mock_wsgi_input + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ":ERROR: 408 '0.01 seconds: updates line'"]) + self.assertEqual(resp.status_int, 200) + mock_shutdown_safe.assert_called_once_with( + mock_wsgi_input.mock_socket) + mock_wsgi_input.mock_socket.close.assert_called_once_with() + self.controller.logger.error.assert_called_once_with( + '2.3.4.5/device/partition TIMEOUT in replication.Receiver: ' + '0.01 seconds: updates line') + + def test_UPDATES_other_exception(self): + + class _Wrapper(StringIO.StringIO): + + def __init__(self, value): + StringIO.StringIO.__init__(self, value) + self.mock_socket = mock.MagicMock() + + def readline(self, sizehint=-1): + line = StringIO.StringIO.readline(self) + if line.startswith('DELETE'): + raise Exception('test exception') + return line + + def get_socket(self): + return self.mock_socket + + self.controller.client_timeout = 0.01 + with mock.patch.object( + ssync_receiver.eventlet.greenio, 'shutdown_safe') as \ + mock_shutdown_safe: + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'DELETE /a/c/o\r\n' + 'X-Timestamp: 1364456113.76334\r\n' + '\r\n' + ':UPDATES: END\r\n') + req.remote_addr = '3.4.5.6' + mock_wsgi_input = _Wrapper(req.body) + req.environ['wsgi.input'] = mock_wsgi_input + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ":ERROR: 0 'test exception'"]) + self.assertEqual(resp.status_int, 200) + mock_shutdown_safe.assert_called_once_with( + mock_wsgi_input.mock_socket) + mock_wsgi_input.mock_socket.close.assert_called_once_with() + self.controller.logger.exception.assert_called_once_with( + '3.4.5.6/device/partition EXCEPTION in replication.Receiver') + + def test_UPDATES_no_problems_no_hard_disconnect(self): + + class _Wrapper(StringIO.StringIO): + + def __init__(self, value): + StringIO.StringIO.__init__(self, value) + self.mock_socket = mock.MagicMock() + + def get_socket(self): + return self.mock_socket + + self.controller.client_timeout = 0.01 + with contextlib.nested( + mock.patch.object( + ssync_receiver.eventlet.greenio, 'shutdown_safe'), + mock.patch.object( + self.controller, 'DELETE', + return_value=swob.HTTPNoContent())) as ( + mock_shutdown_safe, mock_delete): + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'DELETE /a/c/o\r\n' + 'X-Timestamp: 1364456113.76334\r\n' + '\r\n' + ':UPDATES: END\r\n') + mock_wsgi_input = _Wrapper(req.body) + req.environ['wsgi.input'] = mock_wsgi_input + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ':UPDATES: START', ':UPDATES: END']) + self.assertEqual(resp.status_int, 200) + self.assertFalse(mock_shutdown_safe.called) + self.assertFalse(mock_wsgi_input.mock_socket.close.called) + + def test_UPDATES_bad_subrequest_line(self): + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'bad_subrequest_line\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ":ERROR: 0 'need more than 1 value to unpack'"]) + self.assertEqual(resp.status_int, 200) + self.controller.logger.exception.assert_called_once_with( + 'None/device/partition EXCEPTION in replication.Receiver') + + with mock.patch.object( + self.controller, 'DELETE', + return_value=swob.HTTPNoContent()): + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'DELETE /a/c/o\r\n' + 'X-Timestamp: 1364456113.76334\r\n' + '\r\n' + 'bad_subrequest_line2') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ":ERROR: 0 'need more than 1 value to unpack'"]) + self.assertEqual(resp.status_int, 200) + self.controller.logger.exception.assert_called_once_with( + 'None/device/partition EXCEPTION in replication.Receiver') + + def test_UPDATES_no_headers(self): + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'DELETE /a/c/o\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ":ERROR: 0 'Got no headers for DELETE /a/c/o'"]) + self.assertEqual(resp.status_int, 200) + self.controller.logger.exception.assert_called_once_with( + 'None/device/partition EXCEPTION in replication.Receiver') + + def test_UPDATES_bad_headers(self): + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'DELETE /a/c/o\r\n' + 'Bad-Header Test\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ":ERROR: 0 'need more than 1 value to unpack'"]) + self.assertEqual(resp.status_int, 200) + self.controller.logger.exception.assert_called_once_with( + 'None/device/partition EXCEPTION in replication.Receiver') + + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'DELETE /a/c/o\r\n' + 'Good-Header: Test\r\n' + 'Bad-Header Test\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ":ERROR: 0 'need more than 1 value to unpack'"]) + self.assertEqual(resp.status_int, 200) + self.controller.logger.exception.assert_called_once_with( + 'None/device/partition EXCEPTION in replication.Receiver') + + def test_UPDATES_bad_content_length(self): + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'PUT /a/c/o\r\n' + 'Content-Length: a\r\n\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ':ERROR: 0 "invalid literal for int() with base 10: \'a\'"']) + self.assertEqual(resp.status_int, 200) + self.controller.logger.exception.assert_called_once_with( + 'None/device/partition EXCEPTION in replication.Receiver') + + def test_UPDATES_content_length_with_DELETE(self): + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'DELETE /a/c/o\r\n' + 'Content-Length: 1\r\n\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ":ERROR: 0 'DELETE subrequest with content-length /a/c/o'"]) + self.assertEqual(resp.status_int, 200) + self.controller.logger.exception.assert_called_once_with( + 'None/device/partition EXCEPTION in replication.Receiver') + + def test_UPDATES_no_content_length_with_PUT(self): + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'PUT /a/c/o\r\n\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ":ERROR: 0 'No content-length sent for PUT /a/c/o'"]) + self.assertEqual(resp.status_int, 200) + self.controller.logger.exception.assert_called_once_with( + 'None/device/partition EXCEPTION in replication.Receiver') + + def test_UPDATES_early_termination(self): + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'PUT /a/c/o\r\n' + 'Content-Length: 1\r\n\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ":ERROR: 0 'Early termination for PUT /a/c/o'"]) + self.assertEqual(resp.status_int, 200) + self.controller.logger.exception.assert_called_once_with( + 'None/device/partition EXCEPTION in replication.Receiver') + + def test_UPDATES_failures(self): + + @server.public + def _DELETE(request): + if request.path == '/device/partition/a/c/works': + return swob.HTTPOk() + else: + return swob.HTTPInternalServerError() + + # failures never hit threshold + with mock.patch.object(self.controller, 'DELETE', _DELETE): + self.controller.replication_failure_threshold = 4 + self.controller.replication_failure_ratio = 1.5 + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'DELETE /a/c/o\r\n\r\n' + 'DELETE /a/c/o\r\n\r\n' + 'DELETE /a/c/o\r\n\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ":ERROR: 500 'ERROR: With :UPDATES: 3 failures to 0 " + "successes'"]) + self.assertEqual(resp.status_int, 200) + self.assertFalse(self.controller.logger.exception.called) + self.assertFalse(self.controller.logger.error.called) + + # failures hit threshold and no successes, so ratio is like infinity + with mock.patch.object(self.controller, 'DELETE', _DELETE): + self.controller.replication_failure_threshold = 4 + self.controller.replication_failure_ratio = 1.5 + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'DELETE /a/c/o\r\n\r\n' + 'DELETE /a/c/o\r\n\r\n' + 'DELETE /a/c/o\r\n\r\n' + 'DELETE /a/c/o\r\n\r\n' + 'DELETE /a/c/o\r\n\r\n' + ':UPDATES: END\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ":ERROR: 0 'Too many 4 failures to 0 successes'"]) + self.assertEqual(resp.status_int, 200) + self.controller.logger.exception.assert_called_once_with( + 'None/device/partition EXCEPTION in replication.Receiver') + self.assertFalse(self.controller.logger.error.called) + + # failures hit threshold and ratio hits 1.33333333333 + with mock.patch.object(self.controller, 'DELETE', _DELETE): + self.controller.replication_failure_threshold = 4 + self.controller.replication_failure_ratio = 1.5 + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'DELETE /a/c/o\r\n\r\n' + 'DELETE /a/c/o\r\n\r\n' + 'DELETE /a/c/works\r\n\r\n' + 'DELETE /a/c/works\r\n\r\n' + 'DELETE /a/c/works\r\n\r\n' + 'DELETE /a/c/o\r\n\r\n' + 'DELETE /a/c/o\r\n\r\n' + ':UPDATES: END\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ":ERROR: 500 'ERROR: With :UPDATES: 4 failures to 3 " + "successes'"]) + self.assertEqual(resp.status_int, 200) + self.assertFalse(self.controller.logger.exception.called) + self.assertFalse(self.controller.logger.error.called) + + # failures hit threshold and ratio hits 2.0 + with mock.patch.object(self.controller, 'DELETE', _DELETE): + self.controller.replication_failure_threshold = 4 + self.controller.replication_failure_ratio = 1.5 + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'DELETE /a/c/o\r\n\r\n' + 'DELETE /a/c/o\r\n\r\n' + 'DELETE /a/c/works\r\n\r\n' + 'DELETE /a/c/works\r\n\r\n' + 'DELETE /a/c/o\r\n\r\n' + 'DELETE /a/c/o\r\n\r\n' + ':UPDATES: END\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ":ERROR: 0 'Too many 4 failures to 2 successes'"]) + self.assertEqual(resp.status_int, 200) + self.controller.logger.exception.assert_called_once_with( + 'None/device/partition EXCEPTION in replication.Receiver') + self.assertFalse(self.controller.logger.error.called) + + def test_UPDATES_PUT(self): + _PUT_request = [None] + + @server.public + def _PUT(request): + _PUT_request[0] = request + request.read_body = request.environ['wsgi.input'].read() + return swob.HTTPOk() + + with mock.patch.object(self.controller, 'PUT', _PUT): + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'PUT /a/c/o\r\n' + 'Content-Length: 1\r\n' + 'X-Timestamp: 1364456113.12344\r\n' + 'X-Object-Meta-Test1: one\r\n' + 'Content-Encoding: gzip\r\n' + 'Specialty-Header: value\r\n' + '\r\n' + '1') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ':UPDATES: START', ':UPDATES: END']) + self.assertEqual(resp.status_int, 200) + self.assertFalse(self.controller.logger.exception.called) + self.assertFalse(self.controller.logger.error.called) + req = _PUT_request[0] + self.assertEqual(req.path, '/device/partition/a/c/o') + self.assertEqual(req.content_length, 1) + self.assertEqual(req.headers, { + 'Content-Length': '1', + 'X-Timestamp': '1364456113.12344', + 'X-Object-Meta-Test1': 'one', + 'Content-Encoding': 'gzip', + 'Specialty-Header': 'value', + 'Host': 'localhost:80', + 'X-Backend-Replication': 'True', + 'X-Backend-Replication-Headers': ( + 'content-length x-timestamp x-object-meta-test1 ' + 'content-encoding specialty-header')}) + self.assertEqual(req.read_body, '1') + + def test_UPDATES_DELETE(self): + _DELETE_request = [None] + + @server.public + def _DELETE(request): + _DELETE_request[0] = request + return swob.HTTPOk() + + with mock.patch.object(self.controller, 'DELETE', _DELETE): + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'DELETE /a/c/o\r\n' + 'X-Timestamp: 1364456113.76334\r\n' + '\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ':UPDATES: START', ':UPDATES: END']) + self.assertEqual(resp.status_int, 200) + self.assertFalse(self.controller.logger.exception.called) + self.assertFalse(self.controller.logger.error.called) + req = _DELETE_request[0] + self.assertEqual(req.path, '/device/partition/a/c/o') + self.assertEqual(req.headers, { + 'X-Timestamp': '1364456113.76334', + 'Host': 'localhost:80', + 'X-Backend-Replication': 'True', + 'X-Backend-Replication-Headers': 'x-timestamp'}) + + def test_UPDATES_BONK(self): + _BONK_request = [None] + + @server.public + def _BONK(request): + _BONK_request[0] = request + return swob.HTTPOk() + + self.controller.BONK = _BONK + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'BONK /a/c/o\r\n' + 'X-Timestamp: 1364456113.76334\r\n' + '\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ":ERROR: 0 'Invalid subrequest method BONK'"]) + self.assertEqual(resp.status_int, 200) + self.controller.logger.exception.assert_called_once_with( + 'None/device/partition EXCEPTION in replication.Receiver') + self.assertEqual(_BONK_request[0], None) + + def test_UPDATES_multiple(self): + _requests = [] + + @server.public + def _PUT(request): + _requests.append(request) + request.read_body = request.environ['wsgi.input'].read() + return swob.HTTPOk() + + @server.public + def _DELETE(request): + _requests.append(request) + return swob.HTTPOk() + + with contextlib.nested( + mock.patch.object(self.controller, 'PUT', _PUT), + mock.patch.object(self.controller, 'DELETE', _DELETE)): + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'PUT /a/c/o1\r\n' + 'Content-Length: 1\r\n' + 'X-Timestamp: 1364456113.00001\r\n' + 'X-Object-Meta-Test1: one\r\n' + 'Content-Encoding: gzip\r\n' + 'Specialty-Header: value\r\n' + '\r\n' + '1' + 'DELETE /a/c/o2\r\n' + 'X-Timestamp: 1364456113.00002\r\n' + '\r\n' + 'PUT /a/c/o3\r\n' + 'Content-Length: 3\r\n' + 'X-Timestamp: 1364456113.00003\r\n' + '\r\n' + '123' + 'PUT /a/c/o4\r\n' + 'Content-Length: 4\r\n' + 'X-Timestamp: 1364456113.00004\r\n' + '\r\n' + '1\r\n4' + 'DELETE /a/c/o5\r\n' + 'X-Timestamp: 1364456113.00005\r\n' + '\r\n' + 'DELETE /a/c/o6\r\n' + 'X-Timestamp: 1364456113.00006\r\n' + '\r\n') + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ':UPDATES: START', ':UPDATES: END']) + self.assertEqual(resp.status_int, 200) + self.assertFalse(self.controller.logger.exception.called) + self.assertFalse(self.controller.logger.error.called) + req = _requests.pop(0) + self.assertEqual(req.method, 'PUT') + self.assertEqual(req.path, '/device/partition/a/c/o1') + self.assertEqual(req.content_length, 1) + self.assertEqual(req.headers, { + 'Content-Length': '1', + 'X-Timestamp': '1364456113.00001', + 'X-Object-Meta-Test1': 'one', + 'Content-Encoding': 'gzip', + 'Specialty-Header': 'value', + 'Host': 'localhost:80', + 'X-Backend-Replication': 'True', + 'X-Backend-Replication-Headers': ( + 'content-length x-timestamp x-object-meta-test1 ' + 'content-encoding specialty-header')}) + self.assertEqual(req.read_body, '1') + req = _requests.pop(0) + self.assertEqual(req.method, 'DELETE') + self.assertEqual(req.path, '/device/partition/a/c/o2') + self.assertEqual(req.headers, { + 'X-Timestamp': '1364456113.00002', + 'Host': 'localhost:80', + 'X-Backend-Replication': 'True', + 'X-Backend-Replication-Headers': 'x-timestamp'}) + req = _requests.pop(0) + self.assertEqual(req.method, 'PUT') + self.assertEqual(req.path, '/device/partition/a/c/o3') + self.assertEqual(req.content_length, 3) + self.assertEqual(req.headers, { + 'Content-Length': '3', + 'X-Timestamp': '1364456113.00003', + 'Host': 'localhost:80', + 'X-Backend-Replication': 'True', + 'X-Backend-Replication-Headers': ( + 'content-length x-timestamp')}) + self.assertEqual(req.read_body, '123') + req = _requests.pop(0) + self.assertEqual(req.method, 'PUT') + self.assertEqual(req.path, '/device/partition/a/c/o4') + self.assertEqual(req.content_length, 4) + self.assertEqual(req.headers, { + 'Content-Length': '4', + 'X-Timestamp': '1364456113.00004', + 'Host': 'localhost:80', + 'X-Backend-Replication': 'True', + 'X-Backend-Replication-Headers': ( + 'content-length x-timestamp')}) + self.assertEqual(req.read_body, '1\r\n4') + req = _requests.pop(0) + self.assertEqual(req.method, 'DELETE') + self.assertEqual(req.path, '/device/partition/a/c/o5') + self.assertEqual(req.headers, { + 'X-Timestamp': '1364456113.00005', + 'Host': 'localhost:80', + 'X-Backend-Replication': 'True', + 'X-Backend-Replication-Headers': 'x-timestamp'}) + req = _requests.pop(0) + self.assertEqual(req.method, 'DELETE') + self.assertEqual(req.path, '/device/partition/a/c/o6') + self.assertEqual(req.headers, { + 'X-Timestamp': '1364456113.00006', + 'Host': 'localhost:80', + 'X-Backend-Replication': 'True', + 'X-Backend-Replication-Headers': 'x-timestamp'}) + self.assertEqual(_requests, []) + + def test_UPDATES_subreq_does_not_read_all(self): + # This tests that if a REPLICATION subrequest fails and doesn't read + # all the subrequest body that it will read and throw away the rest of + # the body before moving on to the next subrequest. + # If you comment out the part in ssync_receiver where it does: + # for junk in subreq.environ['wsgi.input']: + # pass + # You can then see this test fail. + _requests = [] + + @server.public + def _PUT(request): + _requests.append(request) + # Deliberately just reading up to first 2 bytes. + request.read_body = request.environ['wsgi.input'].read(2) + return swob.HTTPInternalServerError() + + class _IgnoreReadlineHint(StringIO.StringIO): + + def __init__(self, value): + StringIO.StringIO.__init__(self, value) + + def readline(self, hint=-1): + return StringIO.StringIO.readline(self) + + self.controller.PUT = _PUT + self.controller.network_chunk_size = 2 + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'REPLICATION'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n' + 'PUT /a/c/o1\r\n' + 'Content-Length: 3\r\n' + 'X-Timestamp: 1364456113.00001\r\n' + '\r\n' + '123' + 'PUT /a/c/o2\r\n' + 'Content-Length: 1\r\n' + 'X-Timestamp: 1364456113.00002\r\n' + '\r\n' + '1') + req.environ['wsgi.input'] = _IgnoreReadlineHint(req.body) + resp = self.controller.REPLICATION(req) + self.assertEqual( + self.body_lines(resp.body), + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ":ERROR: 500 'ERROR: With :UPDATES: 2 failures to 0 successes'"]) + self.assertEqual(resp.status_int, 200) + self.assertFalse(self.controller.logger.exception.called) + self.assertFalse(self.controller.logger.error.called) + req = _requests.pop(0) + self.assertEqual(req.path, '/device/partition/a/c/o1') + self.assertEqual(req.content_length, 3) + self.assertEqual(req.headers, { + 'Content-Length': '3', + 'X-Timestamp': '1364456113.00001', + 'Host': 'localhost:80', + 'X-Backend-Replication': 'True', + 'X-Backend-Replication-Headers': ( + 'content-length x-timestamp')}) + self.assertEqual(req.read_body, '12') + req = _requests.pop(0) + self.assertEqual(req.path, '/device/partition/a/c/o2') + self.assertEqual(req.content_length, 1) + self.assertEqual(req.headers, { + 'Content-Length': '1', + 'X-Timestamp': '1364456113.00002', + 'Host': 'localhost:80', + 'X-Backend-Replication': 'True', + 'X-Backend-Replication-Headers': ( + 'content-length x-timestamp')}) + self.assertEqual(req.read_body, '1') + self.assertEqual(_requests, []) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py new file mode 100644 index 0000000000..9b401b93ba --- /dev/null +++ b/test/unit/obj/test_ssync_sender.py @@ -0,0 +1,800 @@ +# Copyright (c) 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import StringIO +import unittest + +import eventlet +import mock + +from swift.common import exceptions +from swift.obj import ssync_sender + + +class FakeReplicator(object): + + def __init__(self): + self.logger = mock.MagicMock() + self.conn_timeout = 1 + self.node_timeout = 2 + self.http_timeout = 3 + self.network_chunk_size = 65536 + self.disk_chunk_size = 4096 + self._diskfile_mgr = mock.MagicMock() + + +class NullBufferedHTTPConnection(object): + + def __init__(*args, **kwargs): + pass + + def putrequest(*args, **kwargs): + pass + + def putheader(*args, **kwargs): + pass + + def endheaders(*args, **kwargs): + pass + + def getresponse(*args, **kwargs): + pass + + +class FakeResponse(object): + + def __init__(self, chunk_body=''): + self.status = 200 + self.close_called = False + if chunk_body: + self.fp = StringIO.StringIO( + '%x\r\n%s\r\n0\r\n\r\n' % (len(chunk_body), chunk_body)) + + def close(self): + self.close_called = True + + +class FakeConnection(object): + + def __init__(self): + self.sent = [] + self.closed = False + + def send(self, data): + self.sent.append(data) + + def close(self): + self.closed = True + + +class TestSender(unittest.TestCase): + + def setUp(self): + self.replicator = FakeReplicator() + self.sender = ssync_sender.Sender(self.replicator, None, None, None) + + def test_call_catches_MessageTimeout(self): + + def connect(self): + exc = exceptions.MessageTimeout(1, 'test connect') + # Cancels Eventlet's raising of this since we're about to do it. + exc.cancel() + raise exc + + with mock.patch.object(ssync_sender.Sender, 'connect', connect): + node = dict(ip='1.2.3.4', port=5678, device='sda1') + job = dict(partition='9') + self.sender = ssync_sender.Sender(self.replicator, node, job, None) + self.sender.suffixes = ['abc'] + self.assertFalse(self.sender()) + call = self.replicator.logger.error.mock_calls[0] + self.assertEqual( + call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9')) + self.assertEqual(str(call[1][-1]), '1 second: test connect') + + def test_call_catches_ReplicationException(self): + + def connect(self): + raise exceptions.ReplicationException('test connect') + + with mock.patch.object(ssync_sender.Sender, 'connect', connect): + node = dict(ip='1.2.3.4', port=5678, device='sda1') + job = dict(partition='9') + self.sender = ssync_sender.Sender(self.replicator, node, job, None) + self.sender.suffixes = ['abc'] + self.assertFalse(self.sender()) + call = self.replicator.logger.error.mock_calls[0] + self.assertEqual( + call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9')) + self.assertEqual(str(call[1][-1]), 'test connect') + + def test_call_catches_other_exceptions(self): + node = dict(ip='1.2.3.4', port=5678, device='sda1') + job = dict(partition='9') + self.sender = ssync_sender.Sender(self.replicator, node, job, None) + self.sender.suffixes = ['abc'] + self.sender.connect = 'cause exception' + self.assertFalse(self.sender()) + call = self.replicator.logger.exception.mock_calls[0] + self.assertEqual( + call[1], + ('%s:%s/%s/%s EXCEPTION in replication.Sender', '1.2.3.4', 5678, + 'sda1', '9')) + + def test_call_catches_exception_handling_exception(self): + node = dict(ip='1.2.3.4', port=5678, device='sda1') + job = None # Will cause inside exception handler to fail + self.sender = ssync_sender.Sender(self.replicator, node, job, None) + self.sender.suffixes = ['abc'] + self.sender.connect = 'cause exception' + self.assertFalse(self.sender()) + self.replicator.logger.exception.assert_called_once_with( + 'EXCEPTION in replication.Sender') + + def test_call_calls_others(self): + self.sender.suffixes = ['abc'] + self.sender.connect = mock.MagicMock() + self.sender.missing_check = mock.MagicMock() + self.sender.updates = mock.MagicMock() + self.sender.disconnect = mock.MagicMock() + self.assertTrue(self.sender()) + self.sender.connect.assert_called_once_with() + self.sender.missing_check.assert_called_once_with() + self.sender.updates.assert_called_once_with() + self.sender.disconnect.assert_called_once_with() + + def test_call_calls_others_returns_failure(self): + self.sender.suffixes = ['abc'] + self.sender.connect = mock.MagicMock() + self.sender.missing_check = mock.MagicMock() + self.sender.updates = mock.MagicMock() + self.sender.disconnect = mock.MagicMock() + self.sender.failures = 1 + self.assertFalse(self.sender()) + self.sender.connect.assert_called_once_with() + self.sender.missing_check.assert_called_once_with() + self.sender.updates.assert_called_once_with() + self.sender.disconnect.assert_called_once_with() + + def test_connect_send_timeout(self): + self.replicator.conn_timeout = 0.01 + node = dict(ip='1.2.3.4', port=5678, device='sda1') + job = dict(partition='9') + self.sender = ssync_sender.Sender(self.replicator, node, job, None) + self.sender.suffixes = ['abc'] + + def putrequest(*args, **kwargs): + eventlet.sleep(0.1) + + with mock.patch.object( + ssync_sender.bufferedhttp.BufferedHTTPConnection, + 'putrequest', putrequest): + self.assertFalse(self.sender()) + call = self.replicator.logger.error.mock_calls[0] + self.assertEqual( + call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9')) + self.assertEqual(str(call[1][-1]), '0.01 seconds: connect send') + + def test_connect_receive_timeout(self): + self.replicator.node_timeout = 0.02 + node = dict(ip='1.2.3.4', port=5678, device='sda1') + job = dict(partition='9') + self.sender = ssync_sender.Sender(self.replicator, node, job, None) + self.sender.suffixes = ['abc'] + + class FakeBufferedHTTPConnection(NullBufferedHTTPConnection): + + def getresponse(*args, **kwargs): + eventlet.sleep(0.1) + + with mock.patch.object( + ssync_sender.bufferedhttp, 'BufferedHTTPConnection', + FakeBufferedHTTPConnection): + self.assertFalse(self.sender()) + call = self.replicator.logger.error.mock_calls[0] + self.assertEqual( + call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9')) + self.assertEqual(str(call[1][-1]), '0.02 seconds: connect receive') + + def test_connect_bad_status(self): + self.replicator.node_timeout = 0.02 + node = dict(ip='1.2.3.4', port=5678, device='sda1') + job = dict(partition='9') + self.sender = ssync_sender.Sender(self.replicator, node, job, None) + self.sender.suffixes = ['abc'] + + class FakeBufferedHTTPConnection(NullBufferedHTTPConnection): + def getresponse(*args, **kwargs): + response = FakeResponse() + response.status = 503 + return response + + with mock.patch.object( + ssync_sender.bufferedhttp, 'BufferedHTTPConnection', + FakeBufferedHTTPConnection): + self.assertFalse(self.sender()) + call = self.replicator.logger.error.mock_calls[0] + self.assertEqual( + call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9')) + self.assertEqual(str(call[1][-1]), 'Expected status 200; got 503') + + def test_readline_newline_in_buffer(self): + self.sender.response_buffer = 'Has a newline already.\r\nOkay.' + self.assertEqual(self.sender.readline(), 'Has a newline already.\r\n') + self.assertEqual(self.sender.response_buffer, 'Okay.') + + def test_readline_buffer_exceeds_network_chunk_size_somehow(self): + self.replicator.network_chunk_size = 2 + self.sender.response_buffer = '1234567890' + self.assertEqual(self.sender.readline(), '1234567890') + self.assertEqual(self.sender.response_buffer, '') + + def test_readline_at_start_of_chunk(self): + self.sender.response = FakeResponse() + self.sender.response.fp = StringIO.StringIO('2\r\nx\n\r\n') + self.assertEqual(self.sender.readline(), 'x\n') + + def test_readline_chunk_with_extension(self): + self.sender.response = FakeResponse() + self.sender.response.fp = StringIO.StringIO( + '2 ; chunk=extension\r\nx\n\r\n') + self.assertEqual(self.sender.readline(), 'x\n') + + def test_readline_broken_chunk(self): + self.sender.response = FakeResponse() + self.sender.response.fp = StringIO.StringIO('q\r\nx\n\r\n') + self.assertRaises( + exceptions.ReplicationException, self.sender.readline) + self.assertTrue(self.sender.response.close_called) + + def test_readline_terminated_chunk(self): + self.sender.response = FakeResponse() + self.sender.response.fp = StringIO.StringIO('b\r\nnot enough') + self.assertRaises( + exceptions.ReplicationException, self.sender.readline) + self.assertTrue(self.sender.response.close_called) + + def test_readline_all(self): + self.sender.response = FakeResponse() + self.sender.response.fp = StringIO.StringIO('2\r\nx\n\r\n0\r\n\r\n') + self.assertEqual(self.sender.readline(), 'x\n') + self.assertEqual(self.sender.readline(), '') + self.assertEqual(self.sender.readline(), '') + + def test_readline_all_trailing_not_newline_termed(self): + self.sender.response = FakeResponse() + self.sender.response.fp = StringIO.StringIO( + '2\r\nx\n\r\n3\r\n123\r\n0\r\n\r\n') + self.assertEqual(self.sender.readline(), 'x\n') + self.assertEqual(self.sender.readline(), '123') + self.assertEqual(self.sender.readline(), '') + self.assertEqual(self.sender.readline(), '') + + def test_missing_check_timeout(self): + self.sender.connection = FakeConnection() + self.sender.connection.send = lambda d: eventlet.sleep(1) + self.sender.daemon.node_timeout = 0.01 + self.assertRaises(exceptions.MessageTimeout, self.sender.missing_check) + + def test_missing_check_has_empty_suffixes(self): + def yield_hashes(device, partition, suffixes=None): + if device != 'dev' or partition != '9' or suffixes != [ + 'abc', 'def']: + yield # Just here to make this a generator + raise Exception( + 'No match for %r %r %r' % (device, partition, suffixes)) + + self.sender.connection = FakeConnection() + self.sender.job = {'device': 'dev', 'partition': '9'} + self.sender.suffixes = ['abc', 'def'] + self.sender.response = FakeResponse( + chunk_body=( + ':MISSING_CHECK: START\r\n' + ':MISSING_CHECK: END\r\n')) + self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.missing_check() + self.assertEqual( + ''.join(self.sender.connection.sent), + '17\r\n:MISSING_CHECK: START\r\n\r\n' + '15\r\n:MISSING_CHECK: END\r\n\r\n') + self.assertEqual(self.sender.send_list, []) + + def test_missing_check_has_suffixes(self): + def yield_hashes(device, partition, suffixes=None): + if device == 'dev' and partition == '9' and suffixes == [ + 'abc', 'def']: + yield ( + '/srv/node/dev/objects/9/abc/' + '9d41d8cd98f00b204e9800998ecf0abc', + '9d41d8cd98f00b204e9800998ecf0abc', + '1380144470.00000') + yield ( + '/srv/node/dev/objects/9/def/' + '9d41d8cd98f00b204e9800998ecf0def', + '9d41d8cd98f00b204e9800998ecf0def', + '1380144472.22222') + yield ( + '/srv/node/dev/objects/9/def/' + '9d41d8cd98f00b204e9800998ecf1def', + '9d41d8cd98f00b204e9800998ecf1def', + '1380144474.44444') + else: + raise Exception( + 'No match for %r %r %r' % (device, partition, suffixes)) + + self.sender.connection = FakeConnection() + self.sender.job = {'device': 'dev', 'partition': '9'} + self.sender.suffixes = ['abc', 'def'] + self.sender.response = FakeResponse( + chunk_body=( + ':MISSING_CHECK: START\r\n' + ':MISSING_CHECK: END\r\n')) + self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.missing_check() + self.assertEqual( + ''.join(self.sender.connection.sent), + '17\r\n:MISSING_CHECK: START\r\n\r\n' + '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' + '33\r\n9d41d8cd98f00b204e9800998ecf0def 1380144472.22222\r\n\r\n' + '33\r\n9d41d8cd98f00b204e9800998ecf1def 1380144474.44444\r\n\r\n' + '15\r\n:MISSING_CHECK: END\r\n\r\n') + self.assertEqual(self.sender.send_list, []) + + def test_missing_check_far_end_disconnect(self): + def yield_hashes(device, partition, suffixes=None): + if device == 'dev' and partition == '9' and suffixes == ['abc']: + yield ( + '/srv/node/dev/objects/9/abc/' + '9d41d8cd98f00b204e9800998ecf0abc', + '9d41d8cd98f00b204e9800998ecf0abc', + '1380144470.00000') + else: + raise Exception( + 'No match for %r %r %r' % (device, partition, suffixes)) + + self.sender.connection = FakeConnection() + self.sender.job = {'device': 'dev', 'partition': '9'} + self.sender.suffixes = ['abc'] + self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.response = FakeResponse(chunk_body='\r\n') + exc = None + try: + self.sender.missing_check() + except exceptions.ReplicationException as err: + exc = err + self.assertEqual(str(exc), 'Early disconnect') + self.assertEqual( + ''.join(self.sender.connection.sent), + '17\r\n:MISSING_CHECK: START\r\n\r\n' + '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' + '15\r\n:MISSING_CHECK: END\r\n\r\n') + + def test_missing_check_far_end_disconnect2(self): + def yield_hashes(device, partition, suffixes=None): + if device == 'dev' and partition == '9' and suffixes == ['abc']: + yield ( + '/srv/node/dev/objects/9/abc/' + '9d41d8cd98f00b204e9800998ecf0abc', + '9d41d8cd98f00b204e9800998ecf0abc', + '1380144470.00000') + else: + raise Exception( + 'No match for %r %r %r' % (device, partition, suffixes)) + + self.sender.connection = FakeConnection() + self.sender.job = {'device': 'dev', 'partition': '9'} + self.sender.suffixes = ['abc'] + self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.response = FakeResponse( + chunk_body=':MISSING_CHECK: START\r\n') + exc = None + try: + self.sender.missing_check() + except exceptions.ReplicationException as err: + exc = err + self.assertEqual(str(exc), 'Early disconnect') + self.assertEqual( + ''.join(self.sender.connection.sent), + '17\r\n:MISSING_CHECK: START\r\n\r\n' + '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' + '15\r\n:MISSING_CHECK: END\r\n\r\n') + + def test_missing_check_far_end_unexpected(self): + def yield_hashes(device, partition, suffixes=None): + if device == 'dev' and partition == '9' and suffixes == ['abc']: + yield ( + '/srv/node/dev/objects/9/abc/' + '9d41d8cd98f00b204e9800998ecf0abc', + '9d41d8cd98f00b204e9800998ecf0abc', + '1380144470.00000') + else: + raise Exception( + 'No match for %r %r %r' % (device, partition, suffixes)) + + self.sender.connection = FakeConnection() + self.sender.job = {'device': 'dev', 'partition': '9'} + self.sender.suffixes = ['abc'] + self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.response = FakeResponse(chunk_body='OH HAI\r\n') + exc = None + try: + self.sender.missing_check() + except exceptions.ReplicationException as err: + exc = err + self.assertEqual(str(exc), "Unexpected response: 'OH HAI'") + self.assertEqual( + ''.join(self.sender.connection.sent), + '17\r\n:MISSING_CHECK: START\r\n\r\n' + '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' + '15\r\n:MISSING_CHECK: END\r\n\r\n') + + def test_missing_check_send_list(self): + def yield_hashes(device, partition, suffixes=None): + if device == 'dev' and partition == '9' and suffixes == ['abc']: + yield ( + '/srv/node/dev/objects/9/abc/' + '9d41d8cd98f00b204e9800998ecf0abc', + '9d41d8cd98f00b204e9800998ecf0abc', + '1380144470.00000') + else: + raise Exception( + 'No match for %r %r %r' % (device, partition, suffixes)) + + self.sender.connection = FakeConnection() + self.sender.job = {'device': 'dev', 'partition': '9'} + self.sender.suffixes = ['abc'] + self.sender.response = FakeResponse( + chunk_body=( + ':MISSING_CHECK: START\r\n' + '0123abc\r\n' + ':MISSING_CHECK: END\r\n')) + self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes + self.sender.missing_check() + self.assertEqual( + ''.join(self.sender.connection.sent), + '17\r\n:MISSING_CHECK: START\r\n\r\n' + '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' + '15\r\n:MISSING_CHECK: END\r\n\r\n') + self.assertEqual(self.sender.send_list, ['0123abc']) + + def test_updates_timeout(self): + self.sender.connection = FakeConnection() + self.sender.connection.send = lambda d: eventlet.sleep(1) + self.sender.daemon.node_timeout = 0.01 + self.assertRaises(exceptions.MessageTimeout, self.sender.updates) + + def test_updates_empty_send_list(self): + self.sender.connection = FakeConnection() + self.sender.send_list = [] + self.sender.response = FakeResponse( + chunk_body=( + ':UPDATES: START\r\n' + ':UPDATES: END\r\n')) + self.sender.updates() + self.assertEqual( + ''.join(self.sender.connection.sent), + '11\r\n:UPDATES: START\r\n\r\n' + 'f\r\n:UPDATES: END\r\n\r\n') + + def test_updates_unexpected_response_lines1(self): + self.sender.connection = FakeConnection() + self.sender.send_list = [] + self.sender.response = FakeResponse( + chunk_body=( + 'abc\r\n' + ':UPDATES: START\r\n' + ':UPDATES: END\r\n')) + exc = None + try: + self.sender.updates() + except exceptions.ReplicationException as err: + exc = err + self.assertEqual(str(exc), "Unexpected response: 'abc'") + self.assertEqual( + ''.join(self.sender.connection.sent), + '11\r\n:UPDATES: START\r\n\r\n' + 'f\r\n:UPDATES: END\r\n\r\n') + + def test_updates_unexpected_response_lines2(self): + self.sender.connection = FakeConnection() + self.sender.send_list = [] + self.sender.response = FakeResponse( + chunk_body=( + ':UPDATES: START\r\n' + 'abc\r\n' + ':UPDATES: END\r\n')) + exc = None + try: + self.sender.updates() + except exceptions.ReplicationException as err: + exc = err + self.assertEqual(str(exc), "Unexpected response: 'abc'") + self.assertEqual( + ''.join(self.sender.connection.sent), + '11\r\n:UPDATES: START\r\n\r\n' + 'f\r\n:UPDATES: END\r\n\r\n') + + def test_updates_is_deleted(self): + self.sender.connection = FakeConnection() + self.sender.job = {'device': 'dev', 'partition': '9'} + self.sender.node = {} + self.sender.send_list = ['0123abc'] + self.sender.send_delete = mock.MagicMock() + self.sender.send_put = mock.MagicMock() + self.sender.response = FakeResponse( + chunk_body=( + ':UPDATES: START\r\n' + ':UPDATES: END\r\n')) + df = self.sender.daemon._diskfile_mgr.get_diskfile_from_hash() + df.account = 'a' + df.container = 'c' + df.obj = 'o' + dfdel = exceptions.DiskFileDeleted() + dfdel.timestamp = '1381679759.90941' + df.open.side_effect = dfdel + self.sender.updates() + self.sender.send_delete.assert_called_once_with( + '/a/c/o', '1381679759.90941') + self.assertEqual(self.sender.send_put.mock_calls, []) + # note that the delete line isn't actually sent since we mock + # send_delete; send_delete is tested separately. + self.assertEqual( + ''.join(self.sender.connection.sent), + '11\r\n:UPDATES: START\r\n\r\n' + 'f\r\n:UPDATES: END\r\n\r\n') + + def test_updates_put(self): + self.sender.connection = FakeConnection() + self.sender.job = {'device': 'dev', 'partition': '9'} + self.sender.node = {} + self.sender.send_list = ['0123abc'] + df = mock.MagicMock() + df.get_metadata.return_value = {'Content-Length': 123} + self.sender.send_delete = mock.MagicMock() + self.sender.send_put = mock.MagicMock() + self.sender.response = FakeResponse( + chunk_body=( + ':UPDATES: START\r\n' + ':UPDATES: END\r\n')) + df = self.sender.daemon._diskfile_mgr.get_diskfile_from_hash() + df.account = 'a' + df.container = 'c' + df.obj = 'o' + self.sender.updates() + self.assertEqual(self.sender.send_delete.mock_calls, []) + self.sender.send_put.assert_called_once_with('/a/c/o', df) + # note that the put line isn't actually sent since we mock send_put; + # send_put is tested separately. + self.assertEqual( + ''.join(self.sender.connection.sent), + '11\r\n:UPDATES: START\r\n\r\n' + 'f\r\n:UPDATES: END\r\n\r\n') + + def test_updates_read_response_timeout_start(self): + self.sender.connection = FakeConnection() + self.sender.send_list = [] + self.sender.response = FakeResponse( + chunk_body=( + ':UPDATES: START\r\n' + ':UPDATES: END\r\n')) + orig_readline = self.sender.readline + + def delayed_readline(): + eventlet.sleep(1) + return orig_readline() + + self.sender.readline = delayed_readline + self.sender.daemon.http_timeout = 0.01 + self.assertRaises(exceptions.MessageTimeout, self.sender.updates) + + def test_updates_read_response_disconnect_start(self): + self.sender.connection = FakeConnection() + self.sender.send_list = [] + self.sender.response = FakeResponse(chunk_body='\r\n') + exc = None + try: + self.sender.updates() + except exceptions.ReplicationException as err: + exc = err + self.assertEqual(str(exc), 'Early disconnect') + self.assertEqual( + ''.join(self.sender.connection.sent), + '11\r\n:UPDATES: START\r\n\r\n' + 'f\r\n:UPDATES: END\r\n\r\n') + + def test_updates_read_response_unexp_start(self): + self.sender.connection = FakeConnection() + self.sender.send_list = [] + self.sender.response = FakeResponse( + chunk_body=( + 'anything else\r\n' + ':UPDATES: START\r\n' + ':UPDATES: END\r\n')) + exc = None + try: + self.sender.updates() + except exceptions.ReplicationException as err: + exc = err + self.assertEqual(str(exc), "Unexpected response: 'anything else'") + self.assertEqual( + ''.join(self.sender.connection.sent), + '11\r\n:UPDATES: START\r\n\r\n' + 'f\r\n:UPDATES: END\r\n\r\n') + + def test_updates_read_response_timeout_end(self): + self.sender.connection = FakeConnection() + self.sender.send_list = [] + self.sender.response = FakeResponse( + chunk_body=( + ':UPDATES: START\r\n' + ':UPDATES: END\r\n')) + orig_readline = self.sender.readline + + def delayed_readline(): + rv = orig_readline() + if rv == ':UPDATES: END\r\n': + eventlet.sleep(1) + return rv + + self.sender.readline = delayed_readline + self.sender.daemon.http_timeout = 0.01 + self.assertRaises(exceptions.MessageTimeout, self.sender.updates) + + def test_updates_read_response_disconnect_end(self): + self.sender.connection = FakeConnection() + self.sender.send_list = [] + self.sender.response = FakeResponse( + chunk_body=( + ':UPDATES: START\r\n' + '\r\n')) + exc = None + try: + self.sender.updates() + except exceptions.ReplicationException as err: + exc = err + self.assertEqual(str(exc), 'Early disconnect') + self.assertEqual( + ''.join(self.sender.connection.sent), + '11\r\n:UPDATES: START\r\n\r\n' + 'f\r\n:UPDATES: END\r\n\r\n') + + def test_updates_read_response_unexp_end(self): + self.sender.connection = FakeConnection() + self.sender.send_list = [] + self.sender.response = FakeResponse( + chunk_body=( + ':UPDATES: START\r\n' + 'anything else\r\n' + ':UPDATES: END\r\n')) + exc = None + try: + self.sender.updates() + except exceptions.ReplicationException as err: + exc = err + self.assertEqual(str(exc), "Unexpected response: 'anything else'") + self.assertEqual( + ''.join(self.sender.connection.sent), + '11\r\n:UPDATES: START\r\n\r\n' + 'f\r\n:UPDATES: END\r\n\r\n') + + def test_send_delete_timeout(self): + self.sender.connection = FakeConnection() + self.sender.connection.send = lambda d: eventlet.sleep(1) + self.sender.daemon.node_timeout = 0.01 + exc = None + try: + self.sender.send_delete('/a/c/o', '1381679759.90941') + except exceptions.MessageTimeout as err: + exc = err + self.assertEqual(str(exc), '0.01 seconds: send_delete') + + def test_send_delete(self): + self.sender.connection = FakeConnection() + self.sender.send_delete('/a/c/o', '1381679759.90941') + self.assertEqual( + ''.join(self.sender.connection.sent), + '30\r\n' + 'DELETE /a/c/o\r\n' + 'X-Timestamp: 1381679759.90941\r\n' + '\r\n\r\n') + + def test_send_put_initial_timeout(self): + self.sender.connection = FakeConnection() + df = mock.MagicMock() + self.sender.connection.send = lambda d: eventlet.sleep(1) + self.sender.daemon.node_timeout = 0.01 + df.get_metadata.return_value = { + 'name': '/a/c/o', + 'X-Timestamp': '1381679759.90941', + 'Content-Length': '3', + 'Etag': '900150983cd24fb0d6963f7d28e17f72', + 'Some-Other-Header': 'value'} + df.content_length = 3 + df.__iter__ = lambda x: iter(['ab', 'c']) + exc = None + try: + self.sender.send_put('/a/c/o', df) + except exceptions.MessageTimeout as err: + exc = err + self.assertEqual(str(exc), '0.01 seconds: send_put') + + def test_send_put_chunk_timeout(self): + self.sender.connection = FakeConnection() + df = mock.MagicMock() + self.sender.daemon.node_timeout = 0.01 + df.get_metadata.return_value = { + 'name': '/a/c/o', + 'X-Timestamp': '1381679759.90941', + 'Content-Length': '3', + 'Etag': '900150983cd24fb0d6963f7d28e17f72', + 'Some-Other-Header': 'value'} + + def iterator(dontcare): + self.sender.connection.send = lambda d: eventlet.sleep(1) + return iter(['ab', 'c']) + + df.content_length = 3 + df.reader().__iter__ = iterator + exc = None + try: + self.sender.send_put('/a/c/o', df) + except exceptions.MessageTimeout as err: + exc = err + self.assertEqual(str(exc), '0.01 seconds: send_put chunk') + + def test_send_put(self): + self.sender.connection = FakeConnection() + df = mock.MagicMock() + df.get_metadata.return_value = { + 'name': '/a/c/o', + 'X-Timestamp': '1381679759.90941', + 'Content-Length': '3', + 'Etag': '900150983cd24fb0d6963f7d28e17f72', + 'Some-Other-Header': 'value'} + df.content_length = 3 + df.reader().__iter__ = lambda x: iter(['ab', 'c']) + self.sender.send_put('/a/c/o', df) + self.assertEqual( + ''.join(self.sender.connection.sent), + '82\r\n' + 'PUT /a/c/o\r\n' + 'Content-Length: 3\r\n' + 'Etag: 900150983cd24fb0d6963f7d28e17f72\r\n' + 'Some-Other-Header: value\r\n' + 'X-Timestamp: 1381679759.90941\r\n' + '\r\n' + '\r\n' + '2\r\n' + 'ab\r\n' + '1\r\n' + 'c\r\n') + + def test_disconnect_timeout(self): + self.sender.connection = FakeConnection() + self.sender.connection.send = lambda d: eventlet.sleep(1) + self.sender.daemon.node_timeout = 0.01 + self.sender.disconnect() + self.assertEqual(''.join(self.sender.connection.sent), '') + self.assertTrue(self.sender.connection.closed) + + def test_disconnect(self): + self.sender.connection = FakeConnection() + self.sender.disconnect() + self.assertEqual(''.join(self.sender.connection.sent), '0\r\n\r\n') + self.assertTrue(self.sender.connection.closed) + + +if __name__ == '__main__': + unittest.main()