Object replication ssync (an rsync alternative)

For this commit, ssync is just a direct replacement for how
we use rsync. Assuming we switch over to ssync completely
someday and drop rsync, we will then be able to improve the
algorithms even further (removing local objects as we
successfully transfer each one rather than waiting for whole
partitions, using an index.db with hash-trees, etc., etc.)

For easier review, this commit can be thought of in distinct
parts:

1)  New global_conf_callback functionality for allowing
    services to perform setup code before workers, etc. are
    launched. (This is then used by ssync in the object
    server to create a cross-worker semaphore to restrict
    concurrent incoming replication.)

2)  A bit of shifting of items up from object server and
    replicator to diskfile or DEFAULT conf sections for
    better sharing of the same settings. conn_timeout,
    node_timeout, client_timeout, network_chunk_size,
    disk_chunk_size.

3)  Modifications to the object server and replicator to
    optionally use ssync in place of rsync. This is done in
    a generic enough way that switching to FutureSync should
    be easy someday.

4)  The biggest part, and (at least for now) completely
    optional part, are the new ssync_sender and
    ssync_receiver files. Nice and isolated for easier
    testing and visibility into test coverage, etc.

All the usual logging, statsd, recon, etc. instrumentation
is still there when using ssync, just as it is when using
rsync.

Beyond the essential error and exceptional condition
logging, I have not added any additional instrumentation at
this time. Unless there is something someone finds super
pressing to have added to the logging, I think such
additions would be better as separate change reviews.

FOR NOW, IT IS NOT RECOMMENDED TO USE SSYNC ON PRODUCTION
CLUSTERS. Some of us will be in a limited fashion to look
for any subtle issues, tuning, etc. but generally ssync is
an experimental feature. In its current implementation it is
probably going to be a bit slower than rsync, but if all
goes according to plan it will end up much faster.

There are no comparisions yet between ssync and rsync other
than some raw virtual machine testing I've done to show it
should compete well enough once we can put it in use in the
real world.

If you Tweet, Google+, or whatever, be sure to indicate it's
experimental. It'd be best to keep it out of deployment
guides, howtos, etc. until we all figure out if we like it,
find it to be stable, etc.

Change-Id: If003dcc6f4109e2d2a42f4873a0779110fff16d6
This commit is contained in:
gholt 2013-08-28 16:10:43 +00:00
parent deaddf003b
commit a80c720af5
20 changed files with 3530 additions and 75 deletions

View File

@ -16,7 +16,10 @@
from swift.common.utils import parse_options
from swift.common.wsgi import run_wsgi
from swift.obj import server
if __name__ == '__main__':
conf_file, options = parse_options()
run_wsgi(conf_file, 'object-server', default_port=6000, **options)
run_wsgi(conf_file, 'object-server', default_port=6000,
global_conf_callback=server.global_conf_callback, **options)

View File

@ -357,43 +357,67 @@ fallocate_reserve 0 You can set fallocate_reserve to the number of
when they completely run out of space; you can
make the services pretend they're out of space
early.
conn_timeout 0.5 Time to wait while attempting to connect to
another backend node.
node_timeout 3 Time to wait while sending each chunk of data
to another backend node.
client_timeout 60 Time to wait while receiving each chunk of
data from a client or another backend node.
network_chunk_size 65536 Size of chunks to read/write over the network
disk_chunk_size 65536 Size of chunks to read/write to disk
=================== ========== =============================================
.. _object-server-options:
[object-server]
================== ============= ===========================================
Option Default Description
------------------ ------------- -------------------------------------------
use paste.deploy entry point for the object
server. For most cases, this should be
`egg:swift#object`.
set log_name object-server Label used when logging
set log_facility LOG_LOCAL0 Syslog log facility
set log_level INFO Logging level
set log_requests True Whether or not to log each request
user swift User to run as
node_timeout 3 Request timeout to external services
conn_timeout 0.5 Connection timeout to external services
network_chunk_size 65536 Size of chunks to read/write over the
network
disk_chunk_size 65536 Size of chunks to read/write to disk
max_upload_time 86400 Maximum time allowed to upload an object
slow 0 If > 0, Minimum time in seconds for a PUT
or DELETE request to complete
mb_per_sync 512 On PUT requests, sync file every n MB
keep_cache_size 5242880 Largest object size to keep in buffer cache
keep_cache_private false Allow non-public objects to stay in
kernel's buffer cache
threads_per_disk 0 Size of the per-disk thread pool used for
performing disk I/O. The default of 0 means
to not use a per-disk thread pool. It is
recommended to keep this value small, as
large values can result in high read
latencies due to large queue depths. A good
starting point is 4 threads per disk.
================== ============= ===========================================
============================= ============= =================================
Option Default Description
----------------------------- ------------- ---------------------------------
use paste.deploy entry point for the
object server. For most cases,
this should be
`egg:swift#object`.
set log_name object-server Label used when logging
set log_facility LOG_LOCAL0 Syslog log facility
set log_level INFO Logging level
set log_requests True Whether or not to log each
request
user swift User to run as
max_upload_time 86400 Maximum time allowed to upload an
object
slow 0 If > 0, Minimum time in seconds
for a PUT or DELETE request to
complete
mb_per_sync 512 On PUT requests, sync file every
n MB
keep_cache_size 5242880 Largest object size to keep in
buffer cache
keep_cache_private false Allow non-public objects to stay
in kernel's buffer cache
threads_per_disk 0 Size of the per-disk thread pool
used for performing disk I/O. The
default of 0 means to not use a
per-disk thread pool. It is
recommended to keep this value
small, as large values can result
in high read latencies due to
large queue depths. A good
starting point is 4 threads per
disk.
replication_concurrency 4 Set to restrict the number of
concurrent incoming REPLICATION
requests; set to 0 for unlimited
replication_failure_threshold 100 The number of subrequest failures
before the
replication_failure_ratio is
checked
replication_failure_ratio 1.0 If the value of failures /
successes of REPLICATION
subrequests exceeds this ratio,
the overall REPLICATION request
will be aborted
============================= ============= =================================
[object-replicator]
@ -427,6 +451,11 @@ handoff_delete auto By default handoff partitions will be
replicated to n nodes. The default
setting should not be changed, except
for extremem situations.
node_timeout DEFAULT or 10 Request timeout to external services.
This uses what's set here, or what's set
in the DEFAULT section, or 10 (though
other sections use 3 as the final
default).
================== ================= =======================================
[object-updater]
@ -439,8 +468,10 @@ log_facility LOG_LOCAL0 Syslog log facility
log_level INFO Logging level
interval 300 Minimum time for a pass to take
concurrency 1 Number of updater workers to spawn
node_timeout 10 Request timeout to external services
conn_timeout 0.5 Connection timeout to external services
node_timeout DEFAULT or 10 Request timeout to external services. This
uses what's set here, or what's set in the
DEFAULT section, or 10 (though other
sections use 3 as the final default).
slowdown 0.01 Time in seconds to wait between objects
================== ============== ==========================================

View File

@ -24,6 +24,16 @@ Object Replicator
:undoc-members:
:show-inheritance:
.. automodule:: swift.obj.ssync_sender
:members:
:undoc-members:
:show-inheritance:
.. automodule:: swift.obj.ssync_receiver
:members:
:undoc-members:
:show-inheritance:
.. _object-updater:
Object Updater

View File

@ -93,6 +93,28 @@ systems, it was designed so that around 2% of the hash space on a normal node
will be invalidated per day, which has experimentally given us acceptable
replication speeds.
Work continues with a new ssync method where rsync is not used at all and
instead all-Swift code is used to transfer the objects. At first, this ssync
will just strive to emulate the rsync behavior. Once deemed stable it will open
the way for future improvements in replication since we'll be able to easily
add code in the replication path instead of trying to alter the rsync code
base and distributing such modifications.
One of the first improvements planned is an "index.db" that will replace the
hashes.pkl. This will allow quicker updates to that data as well as more
streamlined queries. Quite likely we'll implement a better scheme than the
current one hashes.pkl uses (hash-trees, that sort of thing).
Another improvement planned all along the way is separating the local disk
structure from the protocol path structure. This separation will allow ring
resizing at some point, or at least ring-doubling.
FOR NOW, IT IS NOT RECOMMENDED TO USE SSYNC ON PRODUCTION CLUSTERS. Some of us
will be in a limited fashion to look for any subtle issues, tuning, etc. but
generally ssync is an experimental feature. In its current implementation it is
probably going to be a bit slower than RSync, but if all goes according to plan
it will end up much faster.
-----------------------------
Dedicated replication network

View File

@ -44,6 +44,17 @@
# You can set fallocate_reserve to the number of bytes you'd like fallocate to
# reserve, whether there is space for the given file size or not.
# fallocate_reserve = 0
#
# Time to wait while attempting to connect to another backend node.
# conn_timeout = 0.5
# Time to wait while sending each chunk of data to another backend node.
# node_timeout = 3
# Time to wait while receiving each chunk of data from a client or another
# backend node.
# client_timeout = 60
#
# network_chunk_size = 65536
# disk_chunk_size = 65536
[pipeline:main]
pipeline = healthcheck recon object-server
@ -57,10 +68,6 @@ use = egg:swift#object
# set log_requests = true
# set log_address = /dev/log
#
# node_timeout = 3
# conn_timeout = 0.5
# network_chunk_size = 65536
# disk_chunk_size = 65536
# max_upload_time = 86400
# slow = 0
#
@ -81,6 +88,10 @@ use = egg:swift#object
#
# auto_create_account_prefix = .
#
# A value of 0 means "don't use thread pools". A reasonable starting point is
# 4.
# threads_per_disk = 0
#
# Configure parameter for creating specific server
# To handle all verbs, including replication verbs, do not specify
# "replication_server" (this is the default). To only handle replication,
@ -88,8 +99,20 @@ use = egg:swift#object
# verbs, set to "False". Unless you have a separate replication network, you
# should not specify any value for "replication_server".
# replication_server = false
# A value of 0 means "don't use thread pools". A reasonable starting point is 4.
# threads_per_disk = 0
#
# Set to restrict the number of concurrent incoming REPLICATION requests
# Set to 0 for unlimited
# Note that REPLICATION is currently an ssync only item
# replication_concurrency = 4
#
# These next two settings control when the REPLICATION subrequest handler will
# abort an incoming REPLICATION attempt. An abort will occur if there are at
# least threshold number of failures and the value of failures / successes
# exceeds the ratio. The defaults of 100 and 1.0 means that at least 100
# failures have to occur and there have to be more failures than successes for
# an abort to occur.
# replication_failure_threshold = 100
# replication_failure_ratio = 1.0
[filter:healthcheck]
use = egg:swift#healthcheck
@ -115,6 +138,12 @@ use = egg:swift#recon
# concurrency = 1
# stats_interval = 300
#
# The sync method to use; default is rsync but you can use ssync to try the
# EXPERIMENTAL all-swift-code-no-rsync-callouts method. Once verified as stable
# and nearly as efficient (or moreso) than rsync, we plan to deprecate rsync so
# we can move on with more features for replication.
# sync_method = rsync
#
# max duration of a partition rsync
# rsync_timeout = 900
#
@ -124,7 +153,9 @@ use = egg:swift#recon
# passed to rsync for io op timeout
# rsync_io_timeout = 30
#
# max duration of an http request
# node_timeout = <whatever's in the DEFAULT section or 10>
# max duration of an http request; this is for REPLICATE finalization calls and
# so should be longer than node_timeout
# http_timeout = 60
#
# attempts to kill all workers if nothing replicates for lockup_timeout seconds
@ -149,9 +180,7 @@ use = egg:swift#recon
#
# interval = 300
# concurrency = 1
# node_timeout = 10
# conn_timeout = 0.5
#
# node_timeout = <whatever's in the DEFAULT section or 10>
# slowdown will sleep that amount between objects
# slowdown = 0.01
#

View File

@ -118,3 +118,7 @@ class ListingIterNotAuthorized(ListingIterError):
class SegmentError(SwiftException):
pass
class ReplicationException(Exception):
pass

View File

@ -196,7 +196,7 @@ class RestrictedGreenPool(GreenPool):
self.waitall()
def run_server(conf, logger, sock):
def run_server(conf, logger, sock, global_conf=None):
# Ensure TZ environment variable exists to avoid stat('/etc/localtime') on
# some platforms. This locks in reported times to the timezone in which
# the server first starts running in locations that periodically change
@ -216,11 +216,13 @@ def run_server(conf, logger, sock):
eventlet_debug = config_true_value(conf.get('eventlet_debug', 'no'))
eventlet.debug.hub_exceptions(eventlet_debug)
# utils.LogAdapter stashes name in server; fallback on unadapted loggers
if hasattr(logger, 'server'):
log_name = logger.server
else:
log_name = logger.name
app = loadapp(conf['__file__'], global_conf={'log_name': log_name})
if not global_conf:
if hasattr(logger, 'server'):
log_name = logger.server
else:
log_name = logger.name
global_conf = {'log_name': log_name}
app = loadapp(conf['__file__'], global_conf=global_conf)
max_clients = int(conf.get('max_clients', '1024'))
pool = RestrictedGreenPool(size=max_clients)
try:
@ -252,8 +254,11 @@ def run_wsgi(conf_path, app_section, *args, **kwargs):
# remaining tasks should not require elevated privileges
drop_privileges(conf.get('user', 'swift'))
# Ensure the application can be loaded before proceeding.
loadapp(conf_path, global_conf={'log_name': log_name})
# Ensure the configuration and application can be loaded before proceeding.
global_conf = {'log_name': log_name}
if 'global_conf_callback' in kwargs:
kwargs['global_conf_callback'](conf, global_conf)
loadapp(conf_path, global_conf=global_conf)
# set utils.FALLOCATE_RESERVE if desired
reserve = int(conf.get('fallocate_reserve', 0))
@ -266,7 +271,7 @@ def run_wsgi(conf_path, app_section, *args, **kwargs):
# Useful for profiling [no forks].
if worker_count == 0:
run_server(conf, logger, sock)
run_server(conf, logger, sock, global_conf=global_conf)
return
def kill_children(*args):

View File

@ -53,7 +53,7 @@ from swift.common.constraints import check_mount
from swift.common.utils import mkdirs, normalize_timestamp, \
storage_directory, hash_path, renamer, fallocate, fsync, \
fdatasync, drop_buffer_cache, ThreadPool, lock_path, write_pickle, \
config_true_value, listdir
config_true_value, listdir, split_path
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir
@ -381,6 +381,7 @@ class DiskFileManager(object):
self.keep_cache_size = int(conf.get('keep_cache_size', 5242880))
self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
self.reclaim_age = int(conf.get('reclaim_age', ONE_WEEK))
threads_per_disk = int(conf.get('threads_per_disk', '0'))
self.threadpools = defaultdict(
lambda: ThreadPool(nthreads=threads_per_disk))
@ -443,6 +444,47 @@ class DiskFileManager(object):
self, audit_location.path, dev_path,
audit_location.partition)
def get_diskfile_from_hash(self, device, partition, object_hash, **kwargs):
"""
Returns a DiskFile instance for an object at the given
object_hash. Just in case someone thinks of refactoring, be
sure DiskFileDeleted is *not* raised, but the DiskFile
instance representing the tombstoned object is returned
instead.
:raises DiskFileNotExist: if the object does not exist
"""
dev_path = self.get_dev_path(device)
if not dev_path:
raise DiskFileDeviceUnavailable()
object_path = os.path.join(
dev_path, DATADIR, partition, object_hash[-3:], object_hash)
try:
filenames = hash_cleanup_listdir(object_path, self.reclaim_age)
except OSError as err:
if err.errno == errno.ENOTDIR:
quar_path = quarantine_renamer(dev_path, object_path)
logging.exception(
_('Quarantined %s to %s because it is not a '
'directory') % (object_path, quar_path))
raise DiskFileNotExist()
if err.errno != errno.ENOENT:
raise
raise DiskFileNotExist()
if not filenames:
raise DiskFileNotExist()
try:
metadata = read_metadata(os.path.join(object_path, filenames[-1]))
except EOFError:
raise DiskFileNotExist()
try:
account, container, obj = split_path(
metadata.get('name', ''), 3, 3, True)
except ValueError:
raise DiskFileNotExist()
return DiskFile(self, dev_path, self.threadpools[device],
partition, account, container, obj, **kwargs)
def get_hashes(self, device, partition, suffix):
dev_path = self.get_dev_path(device)
if not dev_path:
@ -455,6 +497,62 @@ class DiskFileManager(object):
get_hashes, partition_path, recalculate=suffixes)
return hashes
def _listdir(self, path):
try:
return os.listdir(path)
except OSError as err:
if err.errno != errno.ENOENT:
self.logger.error(
'ERROR: Skipping %r due to error with listdir attempt: %s',
path, err)
return []
def yield_suffixes(self, device, partition):
"""
Yields tuples of (full_path, suffix_only) for suffixes stored
on the given device and partition.
"""
dev_path = self.get_dev_path(device)
if not dev_path:
raise DiskFileDeviceUnavailable()
partition_path = os.path.join(dev_path, DATADIR, partition)
for suffix in self._listdir(partition_path):
if len(suffix) != 3:
continue
try:
int(suffix, 16)
except ValueError:
continue
yield (os.path.join(partition_path, suffix), suffix)
def yield_hashes(self, device, partition, suffixes=None):
"""
Yields tuples of (full_path, hash_only, timestamp) for object
information stored for the given device, partition, and
(optionally) suffixes. If suffixes is None, all stored
suffixes will be searched for object hashes. Note that if
suffixes is not None but empty, such as [], then nothing will
be yielded.
"""
dev_path = self.get_dev_path(device)
if not dev_path:
raise DiskFileDeviceUnavailable()
if suffixes is None:
suffixes = self.yield_suffixes(device, partition)
else:
partition_path = os.path.join(dev_path, DATADIR, partition)
suffixes = (
(os.path.join(partition_path, suffix), suffix)
for suffix in suffixes)
for suffix_path, suffix in suffixes:
for object_hash in self._listdir(suffix_path):
object_path = os.path.join(suffix_path, object_hash)
for name in hash_cleanup_listdir(
object_path, self.reclaim_age):
ts, ext = name.rsplit('.', 1)
yield (object_path, object_hash, ts)
break
class DiskFileWriter(object):
"""
@ -775,15 +873,25 @@ class DiskFile(object):
self._bytes_per_sync = mgr.bytes_per_sync
if account and container and obj:
self._name = '/' + '/'.join((account, container, obj))
self._account = account
self._container = container
self._obj = obj
name_hash = hash_path(account, container, obj)
self._datadir = join(
device_path, storage_directory(DATADIR, partition, name_hash))
else:
# gets populated when we read the metadata
self._name = None
self._account = None
self._container = None
self._obj = None
self._datadir = None
self._tmpdir = join(device_path, 'tmp')
self._metadata = None
self._data_file = None
self._fp = None
self._quarantined_dir = None
self._content_length = None
if _datadir:
self._datadir = _datadir
else:
@ -791,6 +899,30 @@ class DiskFile(object):
self._datadir = join(
device_path, storage_directory(DATADIR, partition, name_hash))
@property
def account(self):
return self._account
@property
def container(self):
return self._container
@property
def obj(self):
return self._obj
@property
def content_length(self):
if self._metadata is None:
raise DiskFileNotOpen()
return self._content_length
@property
def timestamp(self):
if self._metadata is None:
raise DiskFileNotOpen()
return self._metadata.get('X-Timestamp')
@classmethod
def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition):
return cls(mgr, device_path, None, partition, _datadir=hash_dir_path)
@ -1036,6 +1168,7 @@ class DiskFile(object):
data_file, "metadata content-length %s does"
" not match actual object size %s" % (
metadata_size, statbuf.st_size))
self._content_length = obj_size
return obj_size
def _failsafe_read_metadata(self, source, quarantine_filename=None):

View File

@ -35,7 +35,8 @@ from swift.common.utils import whataremyips, unlink_older_than, \
from swift.common.bufferedhttp import http_connect
from swift.common.daemon import Daemon
from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
from swift.obj.diskfile import get_hashes
from swift.obj import ssync_sender
from swift.obj.diskfile import DiskFileManager, get_hashes
hubs.use_hub(get_hub())
@ -78,6 +79,11 @@ class ObjectReplicator(Daemon):
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, "object.recon")
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.node_timeout = float(conf.get('node_timeout', 10))
self.sync_method = getattr(self, conf.get('sync_method') or 'rsync')
self.network_chunk_size = int(conf.get('network_chunk_size', 65536))
self.disk_chunk_size = int(conf.get('disk_chunk_size', 65536))
self.headers = {
'Content-Length': '0',
'user-agent': 'obj-replicator %s' % os.getpid()}
@ -87,6 +93,20 @@ class ObjectReplicator(Daemon):
False))
self.handoff_delete = config_auto_int_value(
conf.get('handoff_delete', 'auto'), 0)
self._diskfile_mgr = DiskFileManager(conf, self.logger)
def sync(self, node, job, suffixes): # Just exists for doc anchor point
"""
Synchronize local suffix directories from a partition with a remote
node.
:param node: the "dev" entry for the remote node to sync with
:param job: information about the partition being synced
:param suffixes: a list of suffixes which need to be pushed
:returns: boolean indicating success or failure
"""
return self.sync_method(node, job, suffixes)
def _rsync(self, args):
"""
@ -135,14 +155,8 @@ class ObjectReplicator(Daemon):
def rsync(self, node, job, suffixes):
"""
Synchronize local suffix directories from a partition with a remote
node.
:param node: the "dev" entry for the remote node to sync with
:param job: information about the partition being synced
:param suffixes: a list of suffixes which need to be pushed
:returns: boolean indicating success or failure
Uses rsync to implement the sync method. This was the first
sync method in Swift.
"""
if not os.path.exists(job['path']):
return False
@ -175,6 +189,9 @@ class ObjectReplicator(Daemon):
'objects', job['partition']))
return self._rsync(args) == 0
def ssync(self, node, job, suffixes):
return ssync_sender.Sender(self, node, job, suffixes)()
def check_ring(self):
"""
Check to see if the ring has been updated
@ -206,7 +223,7 @@ class ObjectReplicator(Daemon):
suffixes = tpool.execute(tpool_get_suffixes, job['path'])
if suffixes:
for node in job['nodes']:
success = self.rsync(node, job, suffixes)
success = self.sync(node, job, suffixes)
if success:
with Timeout(self.http_timeout):
conn = http_connect(
@ -290,7 +307,7 @@ class ObjectReplicator(Daemon):
suffixes = [suffix for suffix in local_hash if
local_hash[suffix] !=
remote_hash.get(suffix, -1)]
self.rsync(node, job, suffixes)
self.sync(node, job, suffixes)
with Timeout(self.http_timeout):
conn = http_connect(
node['replication_ip'], node['replication_port'],
@ -380,7 +397,7 @@ class ObjectReplicator(Daemon):
def collect_jobs(self):
"""
Returns a sorted list of jobs (dictionaries) that specify the
partitions, nodes, etc to be rsynced.
partitions, nodes, etc to be synced.
"""
jobs = []
ips = whataremyips()

View File

@ -18,6 +18,7 @@
from __future__ import with_statement
import cPickle as pickle
import os
import multiprocessing
import time
import traceback
import socket
@ -35,6 +36,7 @@ from swift.common.constraints import check_object_creation, \
from swift.common.exceptions import ConnectionTimeout, DiskFileQuarantined, \
DiskFileNotExist, DiskFileCollision, DiskFileNoSpace, DiskFileDeleted, \
DiskFileDeviceUnavailable
from swift.obj import ssync_receiver
from swift.common.http import is_success
from swift.common.request_helpers import split_and_validate_path
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
@ -59,6 +61,8 @@ class ObjectController(object):
self.logger = get_logger(conf, log_route='object-server')
self.node_timeout = int(conf.get('node_timeout', 3))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.client_timeout = int(conf.get('client_timeout', 60))
self.disk_chunk_size = int(conf.get('disk_chunk_size', 65536))
self.network_chunk_size = int(conf.get('network_chunk_size', 65536))
self.log_requests = config_true_value(conf.get('log_requests', 'true'))
self.max_upload_time = int(conf.get('max_upload_time', 86400))
@ -122,6 +126,17 @@ class ObjectController(object):
# Common on-disk hierarchy shared across account, container and object
# servers.
self._diskfile_mgr = DiskFileManager(conf, self.logger)
# This is populated by global_conf_callback way below as the semaphore
# is shared by all workers.
if 'replication_semaphore' in conf:
# The value was put in a list so it could get past paste
self.replication_semaphore = conf['replication_semaphore'][0]
else:
self.replication_semaphore = None
self.replication_failure_threshold = int(
conf.get('replication_failure_threshold') or 100)
self.replication_failure_ratio = float(
conf.get('replication_failure_ratio') or 1.0)
def get_diskfile(self, device, partition, account, container, obj,
**kwargs):
@ -414,7 +429,9 @@ class ObjectController(object):
metadata.update(val for val in request.headers.iteritems()
if val[0].lower().startswith('x-object-meta-')
and len(val[0]) > 14)
for header_key in self.allowed_headers:
for header_key in (
request.headers.get('X-Backend-Replication-Headers') or
self.allowed_headers):
if header_key in request.headers:
header_caps = header_key.title()
metadata[header_caps] = request.headers[header_key]
@ -619,6 +636,12 @@ class ObjectController(object):
resp = Response(body=pickle.dumps(hashes))
return resp
@public
@replication
@timing_stats(sample_rate=0.1)
def REPLICATION(self, request):
return Response(app_iter=ssync_receiver.Receiver(self, request)())
def __call__(self, env, start_response):
"""WSGI Application entry point for the Swift Object Server."""
start_time = time.time()
@ -661,7 +684,8 @@ class ObjectController(object):
req.headers.get('x-trans-id', '-'),
req.user_agent or '-',
trans_time)
if req.method == 'REPLICATE':
if req.method in ('REPLICATE', 'REPLICATION') or \
'X-Backend-Replication' in req.headers:
self.logger.debug(log_line)
else:
self.logger.info(log_line)
@ -672,6 +696,30 @@ class ObjectController(object):
return res(env, start_response)
def global_conf_callback(preloaded_app_conf, global_conf):
"""
Callback for swift.common.wsgi.run_wsgi during the global_conf
creation so that we can add our replication_semaphore, used to
limit the number of concurrent REPLICATION_REQUESTS across all
workers.
:param preloaded_app_conf: The preloaded conf for the WSGI app.
This conf instance will go away, so
just read from it, don't write.
:param global_conf: The global conf that will eventually be
passed to the app_factory function later.
This conf is created before the worker
subprocesses are forked, so can be useful to
set up semaphores, shared memory, etc.
"""
replication_concurrency = int(
preloaded_app_conf.get('replication_concurrency') or 4)
if replication_concurrency:
# Have to put the value in a list so it can get past paste
global_conf['replication_semaphore'] = [
multiprocessing.BoundedSemaphore(replication_concurrency)]
def app_factory(global_conf, **local_conf):
"""paste.deploy app factory for creating WSGI object server apps"""
conf = global_conf.copy()

379
swift/obj/ssync_receiver.py Normal file
View File

@ -0,0 +1,379 @@
# Copyright (c) 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import urllib
import eventlet
import eventlet.wsgi
import eventlet.greenio
from swift.common import constraints
from swift.common import exceptions
from swift.common import http
from swift.common import swob
from swift.common import utils
class Receiver(object):
"""
Handles incoming REPLICATION requests to the object server.
These requests come from the object-replicator daemon that uses
:py:mod:`.ssync_sender`.
The number of concurrent REPLICATION requests is restricted by
use of a replication_semaphore and can be configured with the
object-server.conf [object-server] replication_concurrency
setting.
A REPLICATION request is really just an HTTP conduit for
sender/receiver replication communication. The overall
REPLICATION request should always succeed, but it will contain
multiple requests within its request and response bodies. This
"hack" is done so that replication concurrency can be managed.
The general process inside a REPLICATION request is:
1. Initialize the request: Basic request validation, mount check,
acquire semaphore lock, etc..
2. Missing check: Sender sends the hashes and timestamps of
the object information it can send, receiver sends back
the hashes it wants (doesn't have or has an older
timestamp).
3. Updates: Sender sends the object information requested.
4. Close down: Release semaphore lock, etc.
"""
def __init__(self, app, request):
self.app = app
self.request = request
self.device = None
self.partition = None
self.fp = None
self.disconnect = False
def __call__(self):
"""
Processes a REPLICATION request.
Acquires a semaphore lock and then proceeds through the steps
of the REPLICATION process.
"""
# The general theme for functions __call__ calls is that they should
# raise exceptions.MessageTimeout for client timeouts (logged locally),
# swob.HTTPException classes for exceptions to return to the caller but
# not log locally (unmounted, for example), and any other Exceptions
# will be logged with a full stack trace.
# This is because the client is never just some random user but
# is instead also our code and we definitely want to know if our code
# is broken or doing something unexpected.
try:
# Double try blocks in case our main error handlers fail.
try:
# intialize_request is for preamble items that can be done
# outside a replication semaphore lock.
for data in self.initialize_request():
yield data
# If semaphore is in use, try to acquire it, non-blocking, and
# return a 503 if it fails.
if self.app.replication_semaphore:
if not self.app.replication_semaphore.acquire(False):
raise swob.HTTPServiceUnavailable()
try:
for data in self.missing_check():
yield data
for data in self.updates():
yield data
finally:
if self.app.replication_semaphore:
self.app.replication_semaphore.release()
except exceptions.MessageTimeout as err:
self.app.logger.error(
'%s/%s/%s TIMEOUT in replication.Receiver: %s' % (
self.request.remote_addr, self.device, self.partition,
err))
yield ':ERROR: %d %r\n' % (408, str(err))
except swob.HTTPException as err:
body = ''.join(err({}, lambda *args: None))
yield ':ERROR: %d %r\n' % (err.status_int, body)
except Exception as err:
self.app.logger.exception(
'%s/%s/%s EXCEPTION in replication.Receiver' %
(self.request.remote_addr, self.device, self.partition))
yield ':ERROR: %d %r\n' % (0, str(err))
except Exception:
self.app.logger.exception('EXCEPTION in replication.Receiver')
if self.disconnect:
# This makes the socket close early so the remote side doesn't have
# to send its whole request while the lower Eventlet-level just
# reads it and throws it away. Instead, the connection is dropped
# and the remote side will get a broken-pipe exception.
try:
socket = self.request.environ['wsgi.input'].get_socket()
eventlet.greenio.shutdown_safe(socket)
socket.close()
except Exception:
pass # We're okay with the above failing.
def _ensure_flush(self):
"""
Sends a blank line sufficient to flush buffers.
This is to ensure Eventlet versions that don't support
eventlet.minimum_write_chunk_size will send any previous data
buffered.
If https://bitbucket.org/eventlet/eventlet/pull-request/37
ever gets released in an Eventlet version, we should make
this yield only for versions older than that.
"""
yield ' ' * eventlet.wsgi.MINIMUM_CHUNK_SIZE + '\r\n'
def initialize_request(self):
"""
Basic validation of request and mount check.
This function will be called before attempting to acquire a
replication semaphore lock, so contains only quick checks.
"""
# The following is the setting we talk about above in _ensure_flush.
self.request.environ['eventlet.minimum_write_chunk_size'] = 0
self.device, self.partition = utils.split_path(
urllib.unquote(self.request.path), 2, 2, False)
utils.validate_device_partition(self.device, self.partition)
if self.app._diskfile_mgr.mount_check and \
not constraints.check_mount(
self.app._diskfile_mgr.devices, self.device):
raise swob.HTTPInsufficientStorage(drive=self.device)
self.fp = self.request.environ['wsgi.input']
for data in self._ensure_flush():
yield data
def missing_check(self):
"""
Handles the receiver-side of the MISSING_CHECK step of a
REPLICATION request.
Receives a list of hashes and timestamps of object
information the sender can provide and responds with a list
of hashes desired, either because they're missing or have an
older timestamp locally.
The process is generally:
1. Sender sends `:MISSING_CHECK: START` and begins
sending `hash timestamp` lines.
2. Receiver gets `:MISSING_CHECK: START` and begins
reading the `hash timestamp` lines, collecting the
hashes of those it desires.
3. Sender sends `:MISSING_CHECK: END`.
4. Receiver gets `:MISSING_CHECK: END`, responds with
`:MISSING_CHECK: START`, followed by the list of
hashes it collected as being wanted (one per line),
`:MISSING_CHECK: END`, and flushes any buffers.
5. Sender gets `:MISSING_CHECK: START` and reads the list
of hashes desired by the receiver until reading
`:MISSING_CHECK: END`.
The collection and then response is so the sender doesn't
have to read while it writes to ensure network buffers don't
fill up and block everything.
"""
with exceptions.MessageTimeout(
self.app.client_timeout, 'missing_check start'):
line = self.fp.readline(self.app.network_chunk_size)
if line.strip() != ':MISSING_CHECK: START':
raise Exception(
'Looking for :MISSING_CHECK: START got %r' % line[:1024])
object_hashes = []
while True:
with exceptions.MessageTimeout(
self.app.client_timeout, 'missing_check line'):
line = self.fp.readline(self.app.network_chunk_size)
if not line or line.strip() == ':MISSING_CHECK: END':
break
object_hash, timestamp = [urllib.unquote(v) for v in line.split()]
want = False
try:
df = self.app._diskfile_mgr.get_diskfile_from_hash(
self.device, self.partition, object_hash)
except exceptions.DiskFileNotExist:
want = True
else:
try:
df.open()
except exceptions.DiskFileDeleted as err:
want = err.timestamp < timestamp
except exceptions.DiskFileError, err:
want = True
else:
want = df.timestamp < timestamp
if want:
object_hashes.append(object_hash)
yield ':MISSING_CHECK: START\r\n'
yield '\r\n'.join(object_hashes)
yield '\r\n'
yield ':MISSING_CHECK: END\r\n'
for data in self._ensure_flush():
yield data
def updates(self):
"""
Handles the UPDATES step of a REPLICATION request.
Receives a set of PUT and DELETE subrequests that will be
routed to the object server itself for processing. These
contain the information requested by the MISSING_CHECK step.
The PUT and DELETE subrequests are formatted pretty much
exactly like regular HTTP requests, excepting the HTTP
version on the first request line.
The process is generally:
1. Sender sends `:UPDATES: START` and begins sending the
PUT and DELETE subrequests.
2. Receiver gets `:UPDATES: START` and begins routing the
subrequests to the object server.
3. Sender sends `:UPDATES: END`.
4. Receiver gets `:UPDATES: END` and sends `:UPDATES:
START` and `:UPDATES: END` (assuming no errors).
5. Sender gets `:UPDATES: START` and `:UPDATES: END`.
If too many subrequests fail, as configured by
replication_failure_threshold and replication_failure_ratio,
the receiver will hang up the request early so as to not
waste any more time.
At step 4, the receiver will send back an error if there were
any failures (that didn't cause a hangup due to the above
thresholds) so the sender knows the whole was not entirely a
success. This is so the sender knows if it can remove an out
of place partition, for example.
"""
with exceptions.MessageTimeout(
self.app.client_timeout, 'updates start'):
line = self.fp.readline(self.app.network_chunk_size)
if line.strip() != ':UPDATES: START':
raise Exception('Looking for :UPDATES: START got %r' % line[:1024])
successes = 0
failures = 0
# We default to dropping the connection in case there is any exception
# raised during processing because otherwise the sender could send for
# quite some time before realizing it was all in vain.
self.disconnect = True
while True:
with exceptions.MessageTimeout(
self.app.client_timeout, 'updates line'):
line = self.fp.readline(self.app.network_chunk_size)
if not line or line.strip() == ':UPDATES: END':
break
# Read first line METHOD PATH of subrequest.
method, path = line.strip().split(' ', 1)
subreq = swob.Request.blank(
'/%s/%s%s' % (self.device, self.partition, path),
environ={'REQUEST_METHOD': method})
# Read header lines.
content_length = None
replication_headers = []
while True:
with exceptions.MessageTimeout(self.app.client_timeout):
line = self.fp.readline(self.app.network_chunk_size)
if not line:
raise Exception(
'Got no headers for %s %s' % (method, path))
line = line.strip()
if not line:
break
header, value = line.split(':', 1)
header = header.strip().lower()
value = value.strip()
subreq.headers[header] = value
replication_headers.append(header)
if header == 'content-length':
content_length = int(value)
# Establish subrequest body, if needed.
if method == 'DELETE':
if content_length not in (None, 0):
raise Exception(
'DELETE subrequest with content-length %s' % path)
elif method == 'PUT':
if content_length is None:
raise Exception(
'No content-length sent for %s %s' % (method, path))
def subreq_iter():
left = content_length
while left > 0:
with exceptions.MessageTimeout(
self.app.client_timeout,
'updates content'):
chunk = self.fp.read(
min(left, self.app.network_chunk_size))
if not chunk:
raise Exception(
'Early termination for %s %s' % (method, path))
left -= len(chunk)
yield chunk
subreq.environ['wsgi.input'] = utils.FileLikeIter(
subreq_iter())
else:
raise Exception('Invalid subrequest method %s' % method)
subreq.headers['X-Backend-Replication'] = 'True'
if replication_headers:
subreq.headers['X-Backend-Replication-Headers'] = \
' '.join(replication_headers)
# Route subrequest and translate response.
resp = subreq.get_response(self.app)
if http.is_success(resp.status_int) or \
resp.status_int == http.HTTP_NOT_FOUND:
successes += 1
else:
failures += 1
if failures >= self.app.replication_failure_threshold and (
not successes or
float(failures) / successes >
self.app.replication_failure_ratio):
raise Exception(
'Too many %d failures to %d successes' %
(failures, successes))
# The subreq may have failed, but we want to read the rest of the
# body from the remote side so we can continue on with the next
# subreq.
for junk in subreq.environ['wsgi.input']:
pass
if failures:
raise swob.HTTPInternalServerError(
'ERROR: With :UPDATES: %d failures to %d successes' %
(failures, successes))
# We didn't raise an exception, so end the request normally.
self.disconnect = False
yield ':UPDATES: START\r\n'
yield ':UPDATES: END\r\n'
for data in self._ensure_flush():
yield data

309
swift/obj/ssync_sender.py Normal file
View File

@ -0,0 +1,309 @@
# Copyright (c) 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import urllib
import eventlet
import eventlet.wsgi
import eventlet.greenio
from swift.common import bufferedhttp
from swift.common import exceptions
from swift.common import http
class Sender(object):
"""
Sends REPLICATION requests to the object server.
These requests are eventually handled by
:py:mod:`.ssync_receiver` and full documentation about the
process is there.
"""
def __init__(self, daemon, node, job, suffixes):
self.daemon = daemon
self.node = node
self.job = job
self.suffixes = suffixes
self.connection = None
self.response = None
self.response_buffer = ''
self.response_chunk_left = 0
self.send_list = None
self.failures = 0
def __call__(self):
if not self.suffixes:
return True
try:
# Double try blocks in case our main error handler fails.
try:
# The general theme for these functions is that they should
# raise exceptions.MessageTimeout for client timeouts and
# exceptions.ReplicationException for common issues that will
# abort the replication attempt and log a simple error. All
# other exceptions will be logged with a full stack trace.
self.connect()
self.missing_check()
self.updates()
self.disconnect()
return self.failures == 0
except (exceptions.MessageTimeout,
exceptions.ReplicationException) as err:
self.daemon.logger.error(
'%s:%s/%s/%s %s', self.node.get('ip'),
self.node.get('port'), self.node.get('device'),
self.job.get('partition'), err)
except Exception:
# We don't want any exceptions to escape our code and possibly
# mess up the original replicator code that called us since it
# was originally written to shell out to rsync which would do
# no such thing.
self.daemon.logger.exception(
'%s:%s/%s/%s EXCEPTION in replication.Sender',
self.node.get('ip'), self.node.get('port'),
self.node.get('device'), self.job.get('partition'))
except Exception:
# We don't want any exceptions to escape our code and possibly
# mess up the original replicator code that called us since it
# was originally written to shell out to rsync which would do
# no such thing.
# This particular exception handler does the minimal amount as it
# would only get called if the above except Exception handler
# failed (bad node or job data).
self.daemon.logger.exception('EXCEPTION in replication.Sender')
return False
def connect(self):
"""
Establishes a connection and starts a REPLICATION request
with the object server.
"""
with exceptions.MessageTimeout(
self.daemon.conn_timeout, 'connect send'):
self.connection = bufferedhttp.BufferedHTTPConnection(
'%s:%s' % (self.node['ip'], self.node['port']))
self.connection.putrequest('REPLICATION', '/%s/%s' % (
self.node['device'], self.job['partition']))
self.connection.putheader('Transfer-Encoding', 'chunked')
self.connection.endheaders()
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'connect receive'):
self.response = self.connection.getresponse()
if self.response.status != http.HTTP_OK:
raise exceptions.ReplicationException(
'Expected status %s; got %s' %
(http.HTTP_OK, self.response.status))
def readline(self):
"""
Reads a line from the REPLICATION response body.
httplib has no readline and will block on read(x) until x is
read, so we have to do the work ourselves. A bit of this is
taken from Python's httplib itself.
"""
data = self.response_buffer
self.response_buffer = ''
while '\n' not in data and len(data) < self.daemon.network_chunk_size:
if self.response_chunk_left == -1: # EOF-already indicator
break
if self.response_chunk_left == 0:
line = self.response.fp.readline()
i = line.find(';')
if i >= 0:
line = line[:i] # strip chunk-extensions
try:
self.response_chunk_left = int(line.strip(), 16)
except ValueError:
# close the connection as protocol synchronisation is
# probably lost
self.response.close()
raise exceptions.ReplicationException('Early disconnect')
if self.response_chunk_left == 0:
self.response_chunk_left = -1
break
chunk = self.response.fp.read(min(
self.response_chunk_left,
self.daemon.network_chunk_size - len(data)))
if not chunk:
# close the connection as protocol synchronisation is
# probably lost
self.response.close()
raise exceptions.ReplicationException('Early disconnect')
self.response_chunk_left -= len(chunk)
if self.response_chunk_left == 0:
self.response.fp.read(2) # discard the trailing \r\n
data += chunk
if '\n' in data:
data, self.response_buffer = data.split('\n', 1)
data += '\n'
return data
def missing_check(self):
"""
Handles the sender-side of the MISSING_CHECK step of a
REPLICATION request.
Full documentation of this can be found at
:py:meth:`.Receiver.missing_check`.
"""
# First, send our list.
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'missing_check start'):
msg = ':MISSING_CHECK: START\r\n'
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
for path, object_hash, timestamp in \
self.daemon._diskfile_mgr.yield_hashes(
self.job['device'], self.job['partition'], self.suffixes):
with exceptions.MessageTimeout(
self.daemon.node_timeout,
'missing_check send line'):
msg = '%s %s\r\n' % (
urllib.quote(object_hash),
urllib.quote(timestamp))
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'missing_check end'):
msg = ':MISSING_CHECK: END\r\n'
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
# Now, retrieve the list of what they want.
while True:
with exceptions.MessageTimeout(
self.daemon.http_timeout, 'missing_check start wait'):
line = self.readline()
if not line:
raise exceptions.ReplicationException('Early disconnect')
line = line.strip()
if line == ':MISSING_CHECK: START':
break
elif line:
raise exceptions.ReplicationException(
'Unexpected response: %r' % line[:1024])
self.send_list = []
while True:
with exceptions.MessageTimeout(
self.daemon.http_timeout, 'missing_check line wait'):
line = self.readline()
if not line:
raise exceptions.ReplicationException('Early disconnect')
line = line.strip()
if line == ':MISSING_CHECK: END':
break
if line:
self.send_list.append(line)
def updates(self):
"""
Handles the sender-side of the UPDATES step of a REPLICATION
request.
Full documentation of this can be found at
:py:meth:`.Receiver.updates`.
"""
# First, send all our subrequests based on the send_list.
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'updates start'):
msg = ':UPDATES: START\r\n'
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
for object_hash in self.send_list:
try:
df = self.daemon._diskfile_mgr.get_diskfile_from_hash(
self.job['device'], self.job['partition'], object_hash)
except exceptions.DiskFileNotExist:
continue
url_path = urllib.quote(
'/%s/%s/%s' % (df.account, df.container, df.obj))
try:
df.open()
except exceptions.DiskFileDeleted as err:
self.send_delete(url_path, err.timestamp)
except exceptions.DiskFileError:
pass
else:
self.send_put(url_path, df)
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'updates end'):
msg = ':UPDATES: END\r\n'
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
# Now, read their response for any issues.
while True:
with exceptions.MessageTimeout(
self.daemon.http_timeout, 'updates start wait'):
line = self.readline()
if not line:
raise exceptions.ReplicationException('Early disconnect')
line = line.strip()
if line == ':UPDATES: START':
break
elif line:
raise exceptions.ReplicationException(
'Unexpected response: %r' % line[:1024])
while True:
with exceptions.MessageTimeout(
self.daemon.http_timeout, 'updates line wait'):
line = self.readline()
if not line:
raise exceptions.ReplicationException('Early disconnect')
line = line.strip()
if line == ':UPDATES: END':
break
elif line:
raise exceptions.ReplicationException(
'Unexpected response: %r' % line[:1024])
def send_delete(self, url_path, timestamp):
"""
Sends a DELETE subrequest with the given information.
"""
msg = ['DELETE ' + url_path, 'X-Timestamp: ' + timestamp]
msg = '\r\n'.join(msg) + '\r\n\r\n'
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'send_delete'):
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
def send_put(self, url_path, df):
"""
Sends a PUT subrequest for the url_path using the source df
(DiskFile) and content_length.
"""
msg = ['PUT ' + url_path, 'Content-Length: ' + str(df.content_length)]
# Sorted to make it easier to test.
for key, value in sorted(df.get_metadata().iteritems()):
if key not in ('name', 'Content-Length'):
msg.append('%s: %s' % (key, value))
msg = '\r\n'.join(msg) + '\r\n\r\n'
with exceptions.MessageTimeout(self.daemon.node_timeout, 'send_put'):
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
for chunk in df.reader(iter_hook=eventlet.sleep):
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'send_put chunk'):
self.connection.send('%x\r\n%s\r\n' % (len(chunk), chunk))
def disconnect(self):
"""
Closes down the connection to the object server once done
with the REPLICATION request.
"""
try:
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'disconnect'):
self.connection.send('0\r\n\r\n')
except (Exception, exceptions.Timeout):
pass # We're okay with the above failing.
self.connection.close()

View File

@ -196,6 +196,11 @@ class Application(object):
try:
if self.memcache is None:
self.memcache = cache_from_env(env)
# Remove any x-backend-* headers since those are reserved for use
# by backends communicating with each other; no end user should be
# able to send those into the cluster.
for key in list(k for k in env if k.startswith('HTTP_X_BACKEND_')):
del env[key]
req = self.update_request(Request(env))
return self.handle_request(req)(env, start_response)
except UnicodeError:

View File

@ -16,12 +16,14 @@
# TODO(creiht): Tests
import unittest
from swift.common import exceptions
class TestExceptions(unittest.TestCase):
def test_placeholder(self):
pass
def test_replication_exception(self):
self.assertEqual(str(exceptions.ReplicationException()), '')
self.assertEqual(str(exceptions.ReplicationException('test')), 'test')
if __name__ == '__main__':

View File

@ -25,6 +25,7 @@ import os
import pickle
from textwrap import dedent
from gzip import GzipFile
from contextlib import nested
from StringIO import StringIO
from collections import defaultdict
from contextlib import closing
@ -33,6 +34,8 @@ from urllib import quote
from eventlet import listen
import swift
import mock
from swift.common.swob import Request
from swift.common import wsgi, utils, ring
@ -498,6 +501,40 @@ class TestWSGI(unittest.TestCase):
self.assertEquals(r.body, 'the body')
self.assertEquals(r.environ['swift.source'], 'UT')
def test_run_server_global_conf_callback(self):
calls = defaultdict(lambda: 0)
def _initrp(conf_file, app_section, *args, **kwargs):
return (
{'__file__': 'test', 'workers': 0},
'logger',
'log_name')
def _global_conf_callback(preloaded_app_conf, global_conf):
calls['_global_conf_callback'] += 1
self.assertEqual(
preloaded_app_conf, {'__file__': 'test', 'workers': 0})
self.assertEqual(global_conf, {'log_name': 'log_name'})
global_conf['test1'] = 'one'
def _loadapp(uri, name=None, **kwargs):
calls['_loadapp'] += 1
self.assertTrue('global_conf' in kwargs)
self.assertEqual(kwargs['global_conf'],
{'log_name': 'log_name', 'test1': 'one'})
with nested(
mock.patch.object(wsgi, '_initrp', _initrp),
mock.patch.object(wsgi, 'get_socket'),
mock.patch.object(wsgi, 'drop_privileges'),
mock.patch.object(wsgi, 'loadapp', _loadapp),
mock.patch.object(wsgi, 'capture_stdio'),
mock.patch.object(wsgi, 'run_server')):
wsgi.run_wsgi('conf_file', 'app_section',
global_conf_callback=_global_conf_callback)
self.assertEqual(calls['_global_conf_callback'], 1)
self.assertEqual(calls['_loadapp'], 1)
def test_pre_auth_req_with_empty_env_no_path(self):
r = wsgi.make_pre_authed_request(
{}, 'GET')

View File

@ -30,7 +30,7 @@ from shutil import rmtree
from time import time
from tempfile import mkdtemp
from hashlib import md5
from contextlib import closing
from contextlib import closing, nested
from gzip import GzipFile
from eventlet import tpool
@ -1160,3 +1160,371 @@ class TestDiskFile(unittest.TestCase):
reader.close()
log_lines = df._logger.get_lines_for_level('error')
self.assert_('a very special error' in log_lines[-1])
def test_get_diskfile_from_hash_dev_path_fail(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value=None)
with nested(
mock.patch('swift.obj.diskfile.DiskFile'),
mock.patch('swift.obj.diskfile.hash_cleanup_listdir'),
mock.patch('swift.obj.diskfile.read_metadata')) as \
(dfclass, hclistdir, readmeta):
hclistdir.return_value = ['1381679759.90941.data']
readmeta.return_value = {'name': '/a/c/o'}
self.assertRaises(
DiskFileDeviceUnavailable,
self.df_mgr.get_diskfile_from_hash,
'dev', '9', '9a7175077c01a23ade5956b8a2bba900')
def test_get_diskfile_from_hash_not_dir(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
with nested(
mock.patch('swift.obj.diskfile.DiskFile'),
mock.patch('swift.obj.diskfile.hash_cleanup_listdir'),
mock.patch('swift.obj.diskfile.read_metadata'),
mock.patch('swift.obj.diskfile.quarantine_renamer')) as \
(dfclass, hclistdir, readmeta, quarantine_renamer):
osexc = OSError()
osexc.errno = errno.ENOTDIR
hclistdir.side_effect = osexc
readmeta.return_value = {'name': '/a/c/o'}
self.assertRaises(
DiskFileNotExist,
self.df_mgr.get_diskfile_from_hash,
'dev', '9', '9a7175077c01a23ade5956b8a2bba900')
quarantine_renamer.assert_called_once_with(
'/srv/dev/',
'/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900')
def test_get_diskfile_from_hash_no_dir(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
with nested(
mock.patch('swift.obj.diskfile.DiskFile'),
mock.patch('swift.obj.diskfile.hash_cleanup_listdir'),
mock.patch('swift.obj.diskfile.read_metadata')) as \
(dfclass, hclistdir, readmeta):
osexc = OSError()
osexc.errno = errno.ENOENT
hclistdir.side_effect = osexc
readmeta.return_value = {'name': '/a/c/o'}
self.assertRaises(
DiskFileNotExist,
self.df_mgr.get_diskfile_from_hash,
'dev', '9', '9a7175077c01a23ade5956b8a2bba900')
def test_get_diskfile_from_hash_other_oserror(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
with nested(
mock.patch('swift.obj.diskfile.DiskFile'),
mock.patch('swift.obj.diskfile.hash_cleanup_listdir'),
mock.patch('swift.obj.diskfile.read_metadata')) as \
(dfclass, hclistdir, readmeta):
osexc = OSError()
hclistdir.side_effect = osexc
readmeta.return_value = {'name': '/a/c/o'}
self.assertRaises(
OSError,
self.df_mgr.get_diskfile_from_hash,
'dev', '9', '9a7175077c01a23ade5956b8a2bba900')
def test_get_diskfile_from_hash_no_actual_files(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
with nested(
mock.patch('swift.obj.diskfile.DiskFile'),
mock.patch('swift.obj.diskfile.hash_cleanup_listdir'),
mock.patch('swift.obj.diskfile.read_metadata')) as \
(dfclass, hclistdir, readmeta):
hclistdir.return_value = []
readmeta.return_value = {'name': '/a/c/o'}
self.assertRaises(
DiskFileNotExist,
self.df_mgr.get_diskfile_from_hash,
'dev', '9', '9a7175077c01a23ade5956b8a2bba900')
def test_get_diskfile_from_hash_read_metadata_problem(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
with nested(
mock.patch('swift.obj.diskfile.DiskFile'),
mock.patch('swift.obj.diskfile.hash_cleanup_listdir'),
mock.patch('swift.obj.diskfile.read_metadata')) as \
(dfclass, hclistdir, readmeta):
hclistdir.return_value = ['1381679759.90941.data']
readmeta.side_effect = EOFError()
self.assertRaises(
DiskFileNotExist,
self.df_mgr.get_diskfile_from_hash,
'dev', '9', '9a7175077c01a23ade5956b8a2bba900')
def test_get_diskfile_from_hash_no_meta_name(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
with nested(
mock.patch('swift.obj.diskfile.DiskFile'),
mock.patch('swift.obj.diskfile.hash_cleanup_listdir'),
mock.patch('swift.obj.diskfile.read_metadata')) as \
(dfclass, hclistdir, readmeta):
hclistdir.return_value = ['1381679759.90941.data']
readmeta.return_value = {}
try:
self.df_mgr.get_diskfile_from_hash(
'dev', '9', '9a7175077c01a23ade5956b8a2bba900')
except DiskFileNotExist as err:
exc = err
self.assertEqual(str(exc), '')
def test_get_diskfile_from_hash_bad_meta_name(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
with nested(
mock.patch('swift.obj.diskfile.DiskFile'),
mock.patch('swift.obj.diskfile.hash_cleanup_listdir'),
mock.patch('swift.obj.diskfile.read_metadata')) as \
(dfclass, hclistdir, readmeta):
hclistdir.return_value = ['1381679759.90941.data']
readmeta.return_value = {'name': 'bad'}
try:
self.df_mgr.get_diskfile_from_hash(
'dev', '9', '9a7175077c01a23ade5956b8a2bba900')
except DiskFileNotExist as err:
exc = err
self.assertEqual(str(exc), '')
def test_get_diskfile_from_hash(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
with nested(
mock.patch('swift.obj.diskfile.DiskFile'),
mock.patch('swift.obj.diskfile.hash_cleanup_listdir'),
mock.patch('swift.obj.diskfile.read_metadata')) as \
(dfclass, hclistdir, readmeta):
hclistdir.return_value = ['1381679759.90941.data']
readmeta.return_value = {'name': '/a/c/o'}
self.df_mgr.get_diskfile_from_hash(
'dev', '9', '9a7175077c01a23ade5956b8a2bba900')
dfclass.assert_called_once_with(
self.df_mgr, '/srv/dev/', self.df_mgr.threadpools['dev'], '9',
'a', 'c', 'o')
hclistdir.assert_called_once_with(
'/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900',
604800)
readmeta.assert_called_once_with(
'/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900/'
'1381679759.90941.data')
def test_listdir_enoent(self):
oserror = OSError()
oserror.errno = errno.ENOENT
self.df_mgr.logger.error = mock.MagicMock()
with mock.patch('os.listdir', side_effect=oserror):
self.assertEqual(self.df_mgr._listdir('path'), [])
self.assertEqual(self.df_mgr.logger.error.mock_calls, [])
def test_listdir_other_oserror(self):
oserror = OSError()
self.df_mgr.logger.error = mock.MagicMock()
with mock.patch('os.listdir', side_effect=oserror):
self.assertEqual(self.df_mgr._listdir('path'), [])
self.df_mgr.logger.error.assert_called_once_with(
'ERROR: Skipping %r due to error with listdir attempt: %s',
'path', oserror)
def test_listdir(self):
self.df_mgr.logger.error = mock.MagicMock()
with mock.patch('os.listdir', return_value=['abc', 'def']):
self.assertEqual(self.df_mgr._listdir('path'), ['abc', 'def'])
self.assertEqual(self.df_mgr.logger.error.mock_calls, [])
def test_yield_suffixes_dev_path_fail(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value=None)
exc = None
try:
list(self.df_mgr.yield_suffixes('dev', '9'))
except DiskFileDeviceUnavailable as err:
exc = err
self.assertEqual(str(exc), '')
def test_yield_suffixes(self):
self.df_mgr._listdir = mock.MagicMock(return_value=[
'abc', 'def', 'ghi', 'abcd', '012'])
self.assertEqual(
list(self.df_mgr.yield_suffixes('dev', '9')),
[(self.testdir + '/dev/objects/9/abc', 'abc'),
(self.testdir + '/dev/objects/9/def', 'def'),
(self.testdir + '/dev/objects/9/012', '012')])
def test_yield_hashes_dev_path_fail(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value=None)
exc = None
try:
list(self.df_mgr.yield_hashes('dev', '9'))
except DiskFileDeviceUnavailable as err:
exc = err
self.assertEqual(str(exc), '')
def test_yield_hashes_empty(self):
def _listdir(path):
return []
with mock.patch('os.listdir', _listdir):
self.assertEqual(list(self.df_mgr.yield_hashes('dev', '9')), [])
def test_yield_hashes_empty_suffixes(self):
def _listdir(path):
return []
with mock.patch('os.listdir', _listdir):
self.assertEqual(
list(self.df_mgr.yield_hashes('dev', '9', suffixes=['456'])),
[])
def test_yield_hashes(self):
fresh_ts = normalize_timestamp(time() - 10)
fresher_ts = normalize_timestamp(time() - 1)
def _listdir(path):
if path.endswith('/dev/objects/9'):
return ['abc', '456', 'def']
elif path.endswith('/dev/objects/9/abc'):
return ['9373a92d072897b136b3fc06595b4abc']
elif path.endswith(
'/dev/objects/9/abc/9373a92d072897b136b3fc06595b4abc'):
return [fresh_ts + '.ts']
elif path.endswith('/dev/objects/9/456'):
return ['9373a92d072897b136b3fc06595b0456',
'9373a92d072897b136b3fc06595b7456']
elif path.endswith(
'/dev/objects/9/456/9373a92d072897b136b3fc06595b0456'):
return ['1383180000.12345.data']
elif path.endswith(
'/dev/objects/9/456/9373a92d072897b136b3fc06595b7456'):
return [fresh_ts + '.ts',
fresher_ts + '.data']
elif path.endswith('/dev/objects/9/def'):
return []
else:
raise Exception('Unexpected listdir of %r' % path)
with nested(
mock.patch('os.listdir', _listdir),
mock.patch('os.unlink')):
self.assertEqual(
list(self.df_mgr.yield_hashes('dev', '9')),
[(self.testdir +
'/dev/objects/9/abc/9373a92d072897b136b3fc06595b4abc',
'9373a92d072897b136b3fc06595b4abc', fresh_ts),
(self.testdir +
'/dev/objects/9/456/9373a92d072897b136b3fc06595b0456',
'9373a92d072897b136b3fc06595b0456', '1383180000.12345'),
(self.testdir +
'/dev/objects/9/456/9373a92d072897b136b3fc06595b7456',
'9373a92d072897b136b3fc06595b7456', fresher_ts)])
def test_yield_hashes_suffixes(self):
fresh_ts = normalize_timestamp(time() - 10)
fresher_ts = normalize_timestamp(time() - 1)
def _listdir(path):
if path.endswith('/dev/objects/9'):
return ['abc', '456', 'def']
elif path.endswith('/dev/objects/9/abc'):
return ['9373a92d072897b136b3fc06595b4abc']
elif path.endswith(
'/dev/objects/9/abc/9373a92d072897b136b3fc06595b4abc'):
return [fresh_ts + '.ts']
elif path.endswith('/dev/objects/9/456'):
return ['9373a92d072897b136b3fc06595b0456',
'9373a92d072897b136b3fc06595b7456']
elif path.endswith(
'/dev/objects/9/456/9373a92d072897b136b3fc06595b0456'):
return ['1383180000.12345.data']
elif path.endswith(
'/dev/objects/9/456/9373a92d072897b136b3fc06595b7456'):
return [fresh_ts + '.ts',
fresher_ts + '.data']
elif path.endswith('/dev/objects/9/def'):
return []
else:
raise Exception('Unexpected listdir of %r' % path)
with nested(
mock.patch('os.listdir', _listdir),
mock.patch('os.unlink')):
self.assertEqual(
list(self.df_mgr.yield_hashes(
'dev', '9', suffixes=['456'])),
[(self.testdir +
'/dev/objects/9/456/9373a92d072897b136b3fc06595b0456',
'9373a92d072897b136b3fc06595b0456', '1383180000.12345'),
(self.testdir +
'/dev/objects/9/456/9373a92d072897b136b3fc06595b7456',
'9373a92d072897b136b3fc06595b7456', fresher_ts)])
def test_diskfile_names(self):
df = self.df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o')
self.assertEqual(df.account, 'a')
self.assertEqual(df.container, 'c')
self.assertEqual(df.obj, 'o')
def test_diskfile_content_length_not_open(self):
df = self.df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o')
exc = None
try:
df.content_length
except DiskFileNotOpen as err:
exc = err
self.assertEqual(str(exc), '')
def test_diskfile_content_length_deleted(self):
df = self._get_open_disk_file()
ts = time()
df.delete(ts)
exp_name = '%s.ts' % str(normalize_timestamp(ts))
dl = os.listdir(df._datadir)
self.assertEquals(len(dl), 1)
self.assertTrue(exp_name in set(dl))
df = self.df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o')
exc = None
try:
with df.open():
df.content_length
except DiskFileDeleted as err:
exc = err
self.assertEqual(str(exc), '')
def test_diskfile_content_length(self):
self._get_open_disk_file()
df = self.df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o')
with df.open():
self.assertEqual(df.content_length, 1024)
def test_diskfile_timestamp_not_open(self):
df = self.df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o')
exc = None
try:
df.timestamp
except DiskFileNotOpen as err:
exc = err
self.assertEqual(str(exc), '')
def test_diskfile_timestamp_deleted(self):
df = self._get_open_disk_file()
ts = time()
df.delete(ts)
exp_name = '%s.ts' % str(normalize_timestamp(ts))
dl = os.listdir(df._datadir)
self.assertEquals(len(dl), 1)
self.assertTrue(exp_name in set(dl))
df = self.df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o')
exc = None
try:
with df.open():
df.timestamp
except DiskFileDeleted as err:
exc = err
self.assertEqual(str(exc), '')
def test_diskfile_timestamp(self):
self._get_open_disk_file(ts='1383181759.12345')
df = self.df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o')
with df.open():
self.assertEqual(df.timestamp, '1383181759.12345')
if __name__ == '__main__':
unittest.main()

View File

@ -554,6 +554,12 @@ class TestObjectReplicator(unittest.TestCase):
mock_http_connect(200)):
self.replicator.replicate()
def test_sync_just_calls_sync_method(self):
self.replicator.sync_method = mock.MagicMock()
self.replicator.sync('node', 'job', 'suffixes')
self.replicator.sync_method.assert_called_once_with(
'node', 'job', 'suffixes')
@mock.patch('swift.obj.replicator.tpool_reraise', autospec=True)
@mock.patch('swift.obj.replicator.http_connect', autospec=True)
def test_update(self, mock_http, mock_tpool_reraise):
@ -638,13 +644,13 @@ class TestObjectReplicator(unittest.TestCase):
self.assertEquals(self.replicator.suffix_count, 0)
mock_logger.reset_mock()
# Check seccesfull http_connect and rsync for local node
# Check successful http_connect and sync for local node
mock_tpool_reraise.return_value = (1, {'a83': 'ba47fd314242ec8c'
'7efb91f5d57336e4'})
resp.read.return_value = pickle.dumps({'a83': 'c130a2c17ed45102a'
'ada0f4eee69494ff'})
set_default(self)
self.replicator.rsync = fake_func = mock.MagicMock()
self.replicator.sync = fake_func = mock.MagicMock()
self.replicator.update(local_job)
reqs = []
for node in local_job['nodes']:

View File

@ -2819,6 +2819,26 @@ class TestObjectController(unittest.TestCase):
finally:
diskfile.fallocate = orig_fallocate
def test_global_conf_callback_does_nothing(self):
preloaded_app_conf = {}
global_conf = {}
object_server.global_conf_callback(preloaded_app_conf, global_conf)
self.assertEqual(preloaded_app_conf, {})
self.assertEqual(global_conf.keys(), ['replication_semaphore'])
self.assertEqual(
global_conf['replication_semaphore'][0].get_value(), 4)
def test_global_conf_callback_replication_semaphore(self):
preloaded_app_conf = {'replication_concurrency': 123}
global_conf = {}
with mock.patch.object(
object_server.multiprocessing, 'BoundedSemaphore',
return_value='test1') as mocked_Semaphore:
object_server.global_conf_callback(preloaded_app_conf, global_conf)
self.assertEqual(preloaded_app_conf, {'replication_concurrency': 123})
self.assertEqual(global_conf, {'replication_semaphore': ['test1']})
mocked_Semaphore.assert_called_once_with(123)
def test_serv_reserv(self):
# Test replication_server flag was set from configuration file.
conf = {'devices': self.testdir, 'mount_check': 'false'}
@ -2836,7 +2856,7 @@ class TestObjectController(unittest.TestCase):
def test_list_allowed_methods(self):
# Test list of allowed_methods
obj_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST']
repl_methods = ['REPLICATE']
repl_methods = ['REPLICATE', 'REPLICATION']
for method_name in obj_methods:
method = getattr(self.object_controller, method_name)
self.assertFalse(hasattr(method, 'replication'))

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,800 @@
# Copyright (c) 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import StringIO
import unittest
import eventlet
import mock
from swift.common import exceptions
from swift.obj import ssync_sender
class FakeReplicator(object):
def __init__(self):
self.logger = mock.MagicMock()
self.conn_timeout = 1
self.node_timeout = 2
self.http_timeout = 3
self.network_chunk_size = 65536
self.disk_chunk_size = 4096
self._diskfile_mgr = mock.MagicMock()
class NullBufferedHTTPConnection(object):
def __init__(*args, **kwargs):
pass
def putrequest(*args, **kwargs):
pass
def putheader(*args, **kwargs):
pass
def endheaders(*args, **kwargs):
pass
def getresponse(*args, **kwargs):
pass
class FakeResponse(object):
def __init__(self, chunk_body=''):
self.status = 200
self.close_called = False
if chunk_body:
self.fp = StringIO.StringIO(
'%x\r\n%s\r\n0\r\n\r\n' % (len(chunk_body), chunk_body))
def close(self):
self.close_called = True
class FakeConnection(object):
def __init__(self):
self.sent = []
self.closed = False
def send(self, data):
self.sent.append(data)
def close(self):
self.closed = True
class TestSender(unittest.TestCase):
def setUp(self):
self.replicator = FakeReplicator()
self.sender = ssync_sender.Sender(self.replicator, None, None, None)
def test_call_catches_MessageTimeout(self):
def connect(self):
exc = exceptions.MessageTimeout(1, 'test connect')
# Cancels Eventlet's raising of this since we're about to do it.
exc.cancel()
raise exc
with mock.patch.object(ssync_sender.Sender, 'connect', connect):
node = dict(ip='1.2.3.4', port=5678, device='sda1')
job = dict(partition='9')
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
self.assertFalse(self.sender())
call = self.replicator.logger.error.mock_calls[0]
self.assertEqual(
call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
self.assertEqual(str(call[1][-1]), '1 second: test connect')
def test_call_catches_ReplicationException(self):
def connect(self):
raise exceptions.ReplicationException('test connect')
with mock.patch.object(ssync_sender.Sender, 'connect', connect):
node = dict(ip='1.2.3.4', port=5678, device='sda1')
job = dict(partition='9')
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
self.assertFalse(self.sender())
call = self.replicator.logger.error.mock_calls[0]
self.assertEqual(
call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
self.assertEqual(str(call[1][-1]), 'test connect')
def test_call_catches_other_exceptions(self):
node = dict(ip='1.2.3.4', port=5678, device='sda1')
job = dict(partition='9')
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
self.sender.connect = 'cause exception'
self.assertFalse(self.sender())
call = self.replicator.logger.exception.mock_calls[0]
self.assertEqual(
call[1],
('%s:%s/%s/%s EXCEPTION in replication.Sender', '1.2.3.4', 5678,
'sda1', '9'))
def test_call_catches_exception_handling_exception(self):
node = dict(ip='1.2.3.4', port=5678, device='sda1')
job = None # Will cause inside exception handler to fail
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
self.sender.connect = 'cause exception'
self.assertFalse(self.sender())
self.replicator.logger.exception.assert_called_once_with(
'EXCEPTION in replication.Sender')
def test_call_calls_others(self):
self.sender.suffixes = ['abc']
self.sender.connect = mock.MagicMock()
self.sender.missing_check = mock.MagicMock()
self.sender.updates = mock.MagicMock()
self.sender.disconnect = mock.MagicMock()
self.assertTrue(self.sender())
self.sender.connect.assert_called_once_with()
self.sender.missing_check.assert_called_once_with()
self.sender.updates.assert_called_once_with()
self.sender.disconnect.assert_called_once_with()
def test_call_calls_others_returns_failure(self):
self.sender.suffixes = ['abc']
self.sender.connect = mock.MagicMock()
self.sender.missing_check = mock.MagicMock()
self.sender.updates = mock.MagicMock()
self.sender.disconnect = mock.MagicMock()
self.sender.failures = 1
self.assertFalse(self.sender())
self.sender.connect.assert_called_once_with()
self.sender.missing_check.assert_called_once_with()
self.sender.updates.assert_called_once_with()
self.sender.disconnect.assert_called_once_with()
def test_connect_send_timeout(self):
self.replicator.conn_timeout = 0.01
node = dict(ip='1.2.3.4', port=5678, device='sda1')
job = dict(partition='9')
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
def putrequest(*args, **kwargs):
eventlet.sleep(0.1)
with mock.patch.object(
ssync_sender.bufferedhttp.BufferedHTTPConnection,
'putrequest', putrequest):
self.assertFalse(self.sender())
call = self.replicator.logger.error.mock_calls[0]
self.assertEqual(
call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
self.assertEqual(str(call[1][-1]), '0.01 seconds: connect send')
def test_connect_receive_timeout(self):
self.replicator.node_timeout = 0.02
node = dict(ip='1.2.3.4', port=5678, device='sda1')
job = dict(partition='9')
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
class FakeBufferedHTTPConnection(NullBufferedHTTPConnection):
def getresponse(*args, **kwargs):
eventlet.sleep(0.1)
with mock.patch.object(
ssync_sender.bufferedhttp, 'BufferedHTTPConnection',
FakeBufferedHTTPConnection):
self.assertFalse(self.sender())
call = self.replicator.logger.error.mock_calls[0]
self.assertEqual(
call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
self.assertEqual(str(call[1][-1]), '0.02 seconds: connect receive')
def test_connect_bad_status(self):
self.replicator.node_timeout = 0.02
node = dict(ip='1.2.3.4', port=5678, device='sda1')
job = dict(partition='9')
self.sender = ssync_sender.Sender(self.replicator, node, job, None)
self.sender.suffixes = ['abc']
class FakeBufferedHTTPConnection(NullBufferedHTTPConnection):
def getresponse(*args, **kwargs):
response = FakeResponse()
response.status = 503
return response
with mock.patch.object(
ssync_sender.bufferedhttp, 'BufferedHTTPConnection',
FakeBufferedHTTPConnection):
self.assertFalse(self.sender())
call = self.replicator.logger.error.mock_calls[0]
self.assertEqual(
call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
self.assertEqual(str(call[1][-1]), 'Expected status 200; got 503')
def test_readline_newline_in_buffer(self):
self.sender.response_buffer = 'Has a newline already.\r\nOkay.'
self.assertEqual(self.sender.readline(), 'Has a newline already.\r\n')
self.assertEqual(self.sender.response_buffer, 'Okay.')
def test_readline_buffer_exceeds_network_chunk_size_somehow(self):
self.replicator.network_chunk_size = 2
self.sender.response_buffer = '1234567890'
self.assertEqual(self.sender.readline(), '1234567890')
self.assertEqual(self.sender.response_buffer, '')
def test_readline_at_start_of_chunk(self):
self.sender.response = FakeResponse()
self.sender.response.fp = StringIO.StringIO('2\r\nx\n\r\n')
self.assertEqual(self.sender.readline(), 'x\n')
def test_readline_chunk_with_extension(self):
self.sender.response = FakeResponse()
self.sender.response.fp = StringIO.StringIO(
'2 ; chunk=extension\r\nx\n\r\n')
self.assertEqual(self.sender.readline(), 'x\n')
def test_readline_broken_chunk(self):
self.sender.response = FakeResponse()
self.sender.response.fp = StringIO.StringIO('q\r\nx\n\r\n')
self.assertRaises(
exceptions.ReplicationException, self.sender.readline)
self.assertTrue(self.sender.response.close_called)
def test_readline_terminated_chunk(self):
self.sender.response = FakeResponse()
self.sender.response.fp = StringIO.StringIO('b\r\nnot enough')
self.assertRaises(
exceptions.ReplicationException, self.sender.readline)
self.assertTrue(self.sender.response.close_called)
def test_readline_all(self):
self.sender.response = FakeResponse()
self.sender.response.fp = StringIO.StringIO('2\r\nx\n\r\n0\r\n\r\n')
self.assertEqual(self.sender.readline(), 'x\n')
self.assertEqual(self.sender.readline(), '')
self.assertEqual(self.sender.readline(), '')
def test_readline_all_trailing_not_newline_termed(self):
self.sender.response = FakeResponse()
self.sender.response.fp = StringIO.StringIO(
'2\r\nx\n\r\n3\r\n123\r\n0\r\n\r\n')
self.assertEqual(self.sender.readline(), 'x\n')
self.assertEqual(self.sender.readline(), '123')
self.assertEqual(self.sender.readline(), '')
self.assertEqual(self.sender.readline(), '')
def test_missing_check_timeout(self):
self.sender.connection = FakeConnection()
self.sender.connection.send = lambda d: eventlet.sleep(1)
self.sender.daemon.node_timeout = 0.01
self.assertRaises(exceptions.MessageTimeout, self.sender.missing_check)
def test_missing_check_has_empty_suffixes(self):
def yield_hashes(device, partition, suffixes=None):
if device != 'dev' or partition != '9' or suffixes != [
'abc', 'def']:
yield # Just here to make this a generator
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
self.sender.suffixes = ['abc', 'def']
self.sender.response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.missing_check()
self.assertEqual(
''.join(self.sender.connection.sent),
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.send_list, [])
def test_missing_check_has_suffixes(self):
def yield_hashes(device, partition, suffixes=None):
if device == 'dev' and partition == '9' and suffixes == [
'abc', 'def']:
yield (
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000')
yield (
'/srv/node/dev/objects/9/def/'
'9d41d8cd98f00b204e9800998ecf0def',
'9d41d8cd98f00b204e9800998ecf0def',
'1380144472.22222')
yield (
'/srv/node/dev/objects/9/def/'
'9d41d8cd98f00b204e9800998ecf1def',
'9d41d8cd98f00b204e9800998ecf1def',
'1380144474.44444')
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
self.sender.suffixes = ['abc', 'def']
self.sender.response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.missing_check()
self.assertEqual(
''.join(self.sender.connection.sent),
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0def 1380144472.22222\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf1def 1380144474.44444\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.send_list, [])
def test_missing_check_far_end_disconnect(self):
def yield_hashes(device, partition, suffixes=None):
if device == 'dev' and partition == '9' and suffixes == ['abc']:
yield (
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000')
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
self.sender.suffixes = ['abc']
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.response = FakeResponse(chunk_body='\r\n')
exc = None
try:
self.sender.missing_check()
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), 'Early disconnect')
self.assertEqual(
''.join(self.sender.connection.sent),
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
def test_missing_check_far_end_disconnect2(self):
def yield_hashes(device, partition, suffixes=None):
if device == 'dev' and partition == '9' and suffixes == ['abc']:
yield (
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000')
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
self.sender.suffixes = ['abc']
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.response = FakeResponse(
chunk_body=':MISSING_CHECK: START\r\n')
exc = None
try:
self.sender.missing_check()
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), 'Early disconnect')
self.assertEqual(
''.join(self.sender.connection.sent),
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
def test_missing_check_far_end_unexpected(self):
def yield_hashes(device, partition, suffixes=None):
if device == 'dev' and partition == '9' and suffixes == ['abc']:
yield (
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000')
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
self.sender.suffixes = ['abc']
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.response = FakeResponse(chunk_body='OH HAI\r\n')
exc = None
try:
self.sender.missing_check()
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'OH HAI'")
self.assertEqual(
''.join(self.sender.connection.sent),
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
def test_missing_check_send_list(self):
def yield_hashes(device, partition, suffixes=None):
if device == 'dev' and partition == '9' and suffixes == ['abc']:
yield (
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc',
'1380144470.00000')
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
self.sender.suffixes = ['abc']
self.sender.response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
'0123abc\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.missing_check()
self.assertEqual(
''.join(self.sender.connection.sent),
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.send_list, ['0123abc'])
def test_updates_timeout(self):
self.sender.connection = FakeConnection()
self.sender.connection.send = lambda d: eventlet.sleep(1)
self.sender.daemon.node_timeout = 0.01
self.assertRaises(exceptions.MessageTimeout, self.sender.updates)
def test_updates_empty_send_list(self):
self.sender.connection = FakeConnection()
self.sender.send_list = []
self.sender.response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates()
self.assertEqual(
''.join(self.sender.connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
def test_updates_unexpected_response_lines1(self):
self.sender.connection = FakeConnection()
self.sender.send_list = []
self.sender.response = FakeResponse(
chunk_body=(
'abc\r\n'
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
exc = None
try:
self.sender.updates()
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'abc'")
self.assertEqual(
''.join(self.sender.connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
def test_updates_unexpected_response_lines2(self):
self.sender.connection = FakeConnection()
self.sender.send_list = []
self.sender.response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
'abc\r\n'
':UPDATES: END\r\n'))
exc = None
try:
self.sender.updates()
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'abc'")
self.assertEqual(
''.join(self.sender.connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
def test_updates_is_deleted(self):
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
self.sender.node = {}
self.sender.send_list = ['0123abc']
self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock()
self.sender.response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
df = self.sender.daemon._diskfile_mgr.get_diskfile_from_hash()
df.account = 'a'
df.container = 'c'
df.obj = 'o'
dfdel = exceptions.DiskFileDeleted()
dfdel.timestamp = '1381679759.90941'
df.open.side_effect = dfdel
self.sender.updates()
self.sender.send_delete.assert_called_once_with(
'/a/c/o', '1381679759.90941')
self.assertEqual(self.sender.send_put.mock_calls, [])
# note that the delete line isn't actually sent since we mock
# send_delete; send_delete is tested separately.
self.assertEqual(
''.join(self.sender.connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
def test_updates_put(self):
self.sender.connection = FakeConnection()
self.sender.job = {'device': 'dev', 'partition': '9'}
self.sender.node = {}
self.sender.send_list = ['0123abc']
df = mock.MagicMock()
df.get_metadata.return_value = {'Content-Length': 123}
self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock()
self.sender.response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
df = self.sender.daemon._diskfile_mgr.get_diskfile_from_hash()
df.account = 'a'
df.container = 'c'
df.obj = 'o'
self.sender.updates()
self.assertEqual(self.sender.send_delete.mock_calls, [])
self.sender.send_put.assert_called_once_with('/a/c/o', df)
# note that the put line isn't actually sent since we mock send_put;
# send_put is tested separately.
self.assertEqual(
''.join(self.sender.connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
def test_updates_read_response_timeout_start(self):
self.sender.connection = FakeConnection()
self.sender.send_list = []
self.sender.response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
orig_readline = self.sender.readline
def delayed_readline():
eventlet.sleep(1)
return orig_readline()
self.sender.readline = delayed_readline
self.sender.daemon.http_timeout = 0.01
self.assertRaises(exceptions.MessageTimeout, self.sender.updates)
def test_updates_read_response_disconnect_start(self):
self.sender.connection = FakeConnection()
self.sender.send_list = []
self.sender.response = FakeResponse(chunk_body='\r\n')
exc = None
try:
self.sender.updates()
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), 'Early disconnect')
self.assertEqual(
''.join(self.sender.connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
def test_updates_read_response_unexp_start(self):
self.sender.connection = FakeConnection()
self.sender.send_list = []
self.sender.response = FakeResponse(
chunk_body=(
'anything else\r\n'
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
exc = None
try:
self.sender.updates()
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'anything else'")
self.assertEqual(
''.join(self.sender.connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
def test_updates_read_response_timeout_end(self):
self.sender.connection = FakeConnection()
self.sender.send_list = []
self.sender.response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
orig_readline = self.sender.readline
def delayed_readline():
rv = orig_readline()
if rv == ':UPDATES: END\r\n':
eventlet.sleep(1)
return rv
self.sender.readline = delayed_readline
self.sender.daemon.http_timeout = 0.01
self.assertRaises(exceptions.MessageTimeout, self.sender.updates)
def test_updates_read_response_disconnect_end(self):
self.sender.connection = FakeConnection()
self.sender.send_list = []
self.sender.response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
'\r\n'))
exc = None
try:
self.sender.updates()
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), 'Early disconnect')
self.assertEqual(
''.join(self.sender.connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
def test_updates_read_response_unexp_end(self):
self.sender.connection = FakeConnection()
self.sender.send_list = []
self.sender.response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
'anything else\r\n'
':UPDATES: END\r\n'))
exc = None
try:
self.sender.updates()
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'anything else'")
self.assertEqual(
''.join(self.sender.connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
def test_send_delete_timeout(self):
self.sender.connection = FakeConnection()
self.sender.connection.send = lambda d: eventlet.sleep(1)
self.sender.daemon.node_timeout = 0.01
exc = None
try:
self.sender.send_delete('/a/c/o', '1381679759.90941')
except exceptions.MessageTimeout as err:
exc = err
self.assertEqual(str(exc), '0.01 seconds: send_delete')
def test_send_delete(self):
self.sender.connection = FakeConnection()
self.sender.send_delete('/a/c/o', '1381679759.90941')
self.assertEqual(
''.join(self.sender.connection.sent),
'30\r\n'
'DELETE /a/c/o\r\n'
'X-Timestamp: 1381679759.90941\r\n'
'\r\n\r\n')
def test_send_put_initial_timeout(self):
self.sender.connection = FakeConnection()
df = mock.MagicMock()
self.sender.connection.send = lambda d: eventlet.sleep(1)
self.sender.daemon.node_timeout = 0.01
df.get_metadata.return_value = {
'name': '/a/c/o',
'X-Timestamp': '1381679759.90941',
'Content-Length': '3',
'Etag': '900150983cd24fb0d6963f7d28e17f72',
'Some-Other-Header': 'value'}
df.content_length = 3
df.__iter__ = lambda x: iter(['ab', 'c'])
exc = None
try:
self.sender.send_put('/a/c/o', df)
except exceptions.MessageTimeout as err:
exc = err
self.assertEqual(str(exc), '0.01 seconds: send_put')
def test_send_put_chunk_timeout(self):
self.sender.connection = FakeConnection()
df = mock.MagicMock()
self.sender.daemon.node_timeout = 0.01
df.get_metadata.return_value = {
'name': '/a/c/o',
'X-Timestamp': '1381679759.90941',
'Content-Length': '3',
'Etag': '900150983cd24fb0d6963f7d28e17f72',
'Some-Other-Header': 'value'}
def iterator(dontcare):
self.sender.connection.send = lambda d: eventlet.sleep(1)
return iter(['ab', 'c'])
df.content_length = 3
df.reader().__iter__ = iterator
exc = None
try:
self.sender.send_put('/a/c/o', df)
except exceptions.MessageTimeout as err:
exc = err
self.assertEqual(str(exc), '0.01 seconds: send_put chunk')
def test_send_put(self):
self.sender.connection = FakeConnection()
df = mock.MagicMock()
df.get_metadata.return_value = {
'name': '/a/c/o',
'X-Timestamp': '1381679759.90941',
'Content-Length': '3',
'Etag': '900150983cd24fb0d6963f7d28e17f72',
'Some-Other-Header': 'value'}
df.content_length = 3
df.reader().__iter__ = lambda x: iter(['ab', 'c'])
self.sender.send_put('/a/c/o', df)
self.assertEqual(
''.join(self.sender.connection.sent),
'82\r\n'
'PUT /a/c/o\r\n'
'Content-Length: 3\r\n'
'Etag: 900150983cd24fb0d6963f7d28e17f72\r\n'
'Some-Other-Header: value\r\n'
'X-Timestamp: 1381679759.90941\r\n'
'\r\n'
'\r\n'
'2\r\n'
'ab\r\n'
'1\r\n'
'c\r\n')
def test_disconnect_timeout(self):
self.sender.connection = FakeConnection()
self.sender.connection.send = lambda d: eventlet.sleep(1)
self.sender.daemon.node_timeout = 0.01
self.sender.disconnect()
self.assertEqual(''.join(self.sender.connection.sent), '')
self.assertTrue(self.sender.connection.closed)
def test_disconnect(self):
self.sender.connection = FakeConnection()
self.sender.disconnect()
self.assertEqual(''.join(self.sender.connection.sent), '0\r\n\r\n')
self.assertTrue(self.sender.connection.closed)
if __name__ == '__main__':
unittest.main()