From 158e6c3ae9dfdf233b91f67e04baef896cf7f039 Mon Sep 17 00:00:00 2001 From: Chuck Thier Date: Tue, 31 Aug 2010 23:12:59 +0000 Subject: [PATCH] refactored bins to by more DRY --- bin/swift-account-auditor | 31 +------------ bin/swift-account-reaper | 31 +------------ bin/swift-account-replicator | 31 +++---------- bin/swift-container-auditor | 31 +------------ bin/swift-container-replicator | 30 +++---------- bin/swift-container-updater | 24 +---------- bin/swift-object-auditor | 28 +----------- bin/swift-object-replicator | 49 +++------------------ bin/swift-object-updater | 25 +---------- swift/account/auditor.py | 9 ++-- swift/account/reaper.py | 8 ++-- swift/account/replicator.py | 26 +++++++++++ swift/common/daemon.py | 60 ++++++++++++++++++++++++++ swift/common/db_replicator.py | 11 ++--- swift/common/utils.py | 2 + swift/container/auditor.py | 8 ++-- swift/container/replicator.py | 25 +++++++++++ swift/container/updater.py | 8 ++-- swift/obj/auditor.py | 10 +++-- swift/obj/replicator.py | 42 ++++++++++++++---- swift/obj/updater.py | 10 +++-- test/unit/common/test_db_replicator.py | 4 +- test/unit/container/test_updater.py | 14 +++--- test/unit/obj/test_replicator.py | 6 +-- test/unit/obj/test_updater.py | 12 +++--- 25 files changed, 228 insertions(+), 307 deletions(-) create mode 100644 swift/account/replicator.py create mode 100644 swift/common/daemon.py create mode 100644 swift/container/replicator.py diff --git a/bin/swift-account-auditor b/bin/swift-account-auditor index 681adc6a26..a715979599 100755 --- a/bin/swift-account-auditor +++ b/bin/swift-account-auditor @@ -14,10 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -import signal import sys -from ConfigParser import ConfigParser from swift.account.auditor import AccountAuditor from swift.common import utils @@ -26,32 +23,6 @@ if __name__ == '__main__': if len(sys.argv) < 2: print "Usage: swift-account-auditor CONFIG_FILE [once]" sys.exit() - once = len(sys.argv) > 2 and sys.argv[2] == 'once' - conf = utils.readconf(sys.argv[1], 'account-auditor') - logger = utils.get_logger(conf) - # log uncaught exceptions - sys.excepthook = lambda *exc_info: \ - logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info) - sys.stdout = sys.stderr = utils.LoggerFileObject(logger) - - utils.drop_privileges(conf.get('user', 'swift')) - - try: - os.setsid() - except OSError: - pass - - def kill_children(*args): - signal.signal(signal.SIGTERM, signal.SIG_IGN) - os.killpg(0, signal.SIGTERM) - sys.exit() - - signal.signal(signal.SIGTERM, kill_children) - - auditor = AccountAuditor(conf) - if once: - auditor.audit_once() - else: - auditor.audit_forever() + auditor = AccountAuditor(conf).run(once) diff --git a/bin/swift-account-reaper b/bin/swift-account-reaper index 444d19a09d..90496c64e8 100755 --- a/bin/swift-account-reaper +++ b/bin/swift-account-reaper @@ -14,10 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -import signal import sys -from ConfigParser import ConfigParser from swift.account.reaper import AccountReaper from swift.common import utils @@ -26,32 +23,6 @@ if __name__ == '__main__': if len(sys.argv) < 2: print "Usage: account-reaper CONFIG_FILE [once]" sys.exit() - once = len(sys.argv) > 2 and sys.argv[2] == 'once' - conf = utils.readconf(sys.argv[1], 'account-reaper') - logger = utils.get_logger(conf) - # log uncaught exceptions - sys.excepthook = lambda *exc_info: \ - logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info) - sys.stdout = sys.stderr = utils.LoggerFileObject(logger) - - utils.drop_privileges(conf.get('user', 'swift')) - - try: - os.setsid() - except OSError: - pass - - def kill_children(*args): - signal.signal(signal.SIGTERM, signal.SIG_IGN) - os.killpg(0, signal.SIGTERM) - sys.exit() - - signal.signal(signal.SIGTERM, kill_children) - - reaper = AccountReaper(conf) - if once: - reaper.reap_once() - else: - reaper.reap_forever() + reaper = AccountReaper(conf).run(once) diff --git a/bin/swift-account-replicator b/bin/swift-account-replicator index 18bea931a6..c71c326b8b 100755 --- a/bin/swift-account-replicator +++ b/bin/swift-account-replicator @@ -15,31 +15,14 @@ # limitations under the License. import sys -from ConfigParser import ConfigParser -import getopt -from swift.account import server as account_server -from swift.common import db, db_replicator, utils - -class AccountReplicator(db_replicator.Replicator): - server_type = 'account' - ring_file = 'account.ring.gz' - brokerclass = db.AccountBroker - datadir = account_server.DATADIR - default_port = 6002 +from swift.common import utils +from swift.account.replicator import AccountReplicator if __name__ == '__main__': - optlist, args = getopt.getopt(sys.argv[1:], '', ['once']) - - if not args: - print "Usage: swift-account-replicator <--once> CONFIG_FILE [once]" - sys.exit() - - once = len(args) > 1 and args[1] == 'once' + if len(sys.argv) < 2: + print "Usage: swift-account-replicator CONFIG_FILE [once]" + sys.exit(1) + once = len(sys.argv) > 2 and sys.argv[2] == 'once' conf = utils.readconf(sys.argv[1], 'account-replicator') - utils.drop_privileges(conf.get('user', 'swift')) - if once or '--once' in [opt[0] for opt in optlist]: - AccountReplicator(conf).replicate_once() - else: - AccountReplicator(conf).replicate_forever() - + AccountReplicator(conf).run(once) diff --git a/bin/swift-container-auditor b/bin/swift-container-auditor index 3f22fbf698..b3472c54af 100755 --- a/bin/swift-container-auditor +++ b/bin/swift-container-auditor @@ -14,10 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -import signal import sys -from ConfigParser import ConfigParser from swift.container.auditor import ContainerAuditor from swift.common import utils @@ -26,32 +23,6 @@ if __name__ == '__main__': if len(sys.argv) < 2: print "Usage: swift-container-auditor CONFIG_FILE [once]" sys.exit() - once = len(sys.argv) > 2 and sys.argv[2] == 'once' - conf = utils.readconf(sys.argv[1], 'container-auditor') - logger = utils.get_logger(conf) - # log uncaught exceptions - sys.excepthook = lambda *exc_info: \ - logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info) - sys.stdout = sys.stderr = utils.LoggerFileObject(logger) - - utils.drop_privileges(conf.get('user', 'swift')) - - try: - os.setsid() - except OSError: - pass - - def kill_children(*args): - signal.signal(signal.SIGTERM, signal.SIG_IGN) - os.killpg(0, signal.SIGTERM) - sys.exit() - - signal.signal(signal.SIGTERM, kill_children) - - auditor = ContainerAuditor(conf) - if once: - auditor.audit_once() - else: - auditor.audit_forever() + ContainerAuditor(conf).run(once) diff --git a/bin/swift-container-replicator b/bin/swift-container-replicator index 6e8ae0f41d..a0594142df 100755 --- a/bin/swift-container-replicator +++ b/bin/swift-container-replicator @@ -15,31 +15,15 @@ # limitations under the License. import sys -from ConfigParser import ConfigParser -import getopt -from swift.container import server as container_server -from swift.common import db, db_replicator, utils - -class ContainerReplicator(db_replicator.Replicator): - server_type = 'container' - ring_file = 'container.ring.gz' - brokerclass = db.ContainerBroker - datadir = container_server.DATADIR - default_port = 6001 +from swift.common import db, utils +from swift.container.replicator import ContainerReplicator if __name__ == '__main__': - optlist, args = getopt.getopt(sys.argv[1:], '', ['once']) - - if not args: - print "Usage: swift-container-replicator <--once> CONFIG_FILE [once]" + if len(sys.argv) < 2: + print "Usage: swift-container-replicator CONFIG_FILE [once]" sys.exit(1) - - once = len(args) > 1 and args[1] == 'once' - conf = utils.readconf(args[0], 'container-replicator') - utils.drop_privileges(conf.get('user', 'swift')) - if once or '--once' in [opt[0] for opt in optlist]: - ContainerReplicator(conf).replicate_once() - else: - ContainerReplicator(conf).replicate_forever() + once = len(sys.argv) > 2 and sys.argv[2] == 'once' + conf = utils.readconf(sys.argv[1], 'container-replicator') + ContainerReplicator(conf).run(once) diff --git a/bin/swift-container-updater b/bin/swift-container-updater index 92b7017faa..ed22d29901 100755 --- a/bin/swift-container-updater +++ b/bin/swift-container-updater @@ -14,10 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -import signal import sys -from ConfigParser import ConfigParser from swift.container.updater import ContainerUpdater from swift.common import utils @@ -26,25 +23,6 @@ if __name__ == '__main__': if len(sys.argv) < 2: print "Usage: swift-container-updater CONFIG_FILE [once]" sys.exit() - once = len(sys.argv) > 2 and sys.argv[2] == 'once' conf = utils.readconf(sys.argv[1], 'container-updater') - utils.drop_privileges(conf.get('user', 'swift')) - - try: - os.setsid() - except OSError: - pass - - def kill_children(*args): - signal.signal(signal.SIGTERM, signal.SIG_IGN) - os.killpg(0, signal.SIGTERM) - sys.exit() - - signal.signal(signal.SIGTERM, kill_children) - - updater = ContainerUpdater(conf) - if once: - updater.update_once_single_threaded() - else: - updater.update_forever() + ContainerUpdater(conf).run(once) diff --git a/bin/swift-object-auditor b/bin/swift-object-auditor index a80065414a..d60bcb6148 100755 --- a/bin/swift-object-auditor +++ b/bin/swift-object-auditor @@ -14,8 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -import signal import sys from swift.obj.auditor import ObjectAuditor @@ -28,28 +26,4 @@ if __name__ == '__main__': once = len(sys.argv) > 2 and sys.argv[2] == 'once' conf = utils.readconf(sys.argv[1], 'object-auditor') - logger = utils.get_logger(conf) - # log uncaught exceptions - sys.excepthook = lambda *exc_info: \ - logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info) - sys.stdout = sys.stderr = utils.LoggerFileObject(logger) - - utils.drop_privileges(conf.get('user', 'swift')) - - try: - os.setsid() - except OSError: - pass - - def kill_children(*args): - signal.signal(signal.SIGTERM, signal.SIG_IGN) - os.killpg(0, signal.SIGTERM) - sys.exit() - - signal.signal(signal.SIGTERM, kill_children) - - auditor = ObjectAuditor(conf) - if once: - auditor.audit_once() - else: - auditor.audit_forever() + ObjectAuditor(conf).run(once) diff --git a/bin/swift-object-replicator b/bin/swift-object-replicator index c01c96aff4..cc8c32fdde 100755 --- a/bin/swift-object-replicator +++ b/bin/swift-object-replicator @@ -15,54 +15,15 @@ # limitations under the License. import sys -import logging -import time - -from eventlet import sleep, hubs -hubs.use_hub('poll') from swift.obj.replicator import ObjectReplicator -from swift.common.utils import get_logger, drop_privileges, LoggerFileObject, \ - readconf - -TRUE_VALUES = set(('true', '1', 'yes', 'True', 'Yes')) +from swift.common import utils if __name__ == '__main__': if len(sys.argv) < 2: print "Usage: swift-object-replicator CONFIG_FILE [once]" sys.exit() - conf = readconf(sys.argv[1], "object-replicator") - once = len(sys.argv) > 2 and sys.argv[2] == 'once' - logger = get_logger(conf) - # log uncaught exceptions - sys.excepthook = lambda *exc_info: \ - logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info) - sys.stdout = sys.stderr = LoggerFileObject(logger) - drop_privileges(conf.get('user', 'swift')) - if not once and conf.get('daemonize', 'true') in TRUE_VALUES: - logger.info("Starting object replicator in daemon mode.") - # Run the replicator continually - while True: - start = time.time() - logger.info("Starting object replication pass.") - # Run the replicator - replicator = ObjectReplicator(conf, logger) - replicator.run() - total = (time.time() - start)/60 - # Reload the config - logger.info("Object replication complete. (%.02f minutes)" % total) - conf = read_configs(sys.argv[1]) - if conf.get('daemonize', 'true') not in TRUE_VALUES: - # Stop running - logger.info("Daemon mode turned off in config, stopping.") - break - logger.debug('Replication sleeping for %s seconds.' % - conf['run_pause']) - sleep(int(conf['run_pause'])) - else: - start = time.time() - logger.info("Running object replicator in script mode.") - replicator = ObjectReplicator(conf, logger) - replicator.run() - total = (time.time() - start)/60 - logger.info("Object replication complete. (%.02f minutes)" % total) + conf = utils.readconf(sys.argv[1], "object-replicator") + once = (len(sys.argv) > 2 and sys.argv[2] == 'once') or \ + conf.get('daemonize', 'true') not in utils.TRUE_VALUES + ObjectReplicator(conf).run(once) diff --git a/bin/swift-object-updater b/bin/swift-object-updater index 375e63ec83..d24cdbcfe9 100755 --- a/bin/swift-object-updater +++ b/bin/swift-object-updater @@ -14,8 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -import signal import sys from swift.obj.updater import ObjectUpdater @@ -25,27 +23,6 @@ if __name__ == '__main__': if len(sys.argv) < 2: print "Usage: swift-object-updater CONFIG_FILE [once]" sys.exit(1) - once = len(sys.argv) > 2 and sys.argv[2] == 'once' - conf = utils.readconf(sys.argv[1], 'object-updater') - utils.drop_privileges(conf.get('user', 'swift')) - - try: - os.setsid() - except OSError: - pass - - def kill_children(*args): - signal.signal(signal.SIGTERM, signal.SIG_IGN) - os.killpg(0, signal.SIGTERM) - sys.exit() - - signal.signal(signal.SIGTERM, kill_children) - - updater = ObjectUpdater(conf) - if once: - updater.update_once_single_threaded() - else: - updater.update_forever() - + ObjectUpdater(conf).run(once) diff --git a/swift/account/auditor.py b/swift/account/auditor.py index 249d9f436d..b145f663a4 100644 --- a/swift/account/auditor.py +++ b/swift/account/auditor.py @@ -26,16 +26,18 @@ from swift.common.bufferedhttp import http_connect from swift.common.exceptions import ConnectionTimeout from swift.common.ring import Ring from swift.common.utils import get_logger +from swift.common.daemon import Daemon class AuditException(Exception): pass -class AccountAuditor(object): +class AccountAuditor(Daemon): """Audit accounts.""" def __init__(self, conf): + self.conf = conf self.logger = get_logger(conf, 'account-auditor') self.devices = conf.get('devices', '/srv/node') self.mount_check = conf.get('mount_check', 'true').lower() in \ @@ -60,12 +62,11 @@ class AccountAuditor(object): """ if not self.container_ring: self.logger.debug( - 'Loading container ring from %s' % self.container_ring_path) self.container_ring = Ring(self.container_ring_path) return self.container_ring - def audit_forever(self): # pragma: no cover + def run_forever(self): # pragma: no cover """Run the account audit until stopped.""" reported = time.time() time.sleep(random() * self.interval) @@ -92,7 +93,7 @@ class AccountAuditor(object): if elapsed < self.interval: time.sleep(self.interval - elapsed) - def audit_once(self): + def run_once(self): """Run the account audit once.""" self.logger.info('Begin account audit "once" mode') begin = time.time() diff --git a/swift/account/reaper.py b/swift/account/reaper.py index f747e3ce09..d93b363719 100644 --- a/swift/account/reaper.py +++ b/swift/account/reaper.py @@ -27,9 +27,10 @@ from swift.common.direct_client import ClientException, \ direct_delete_container, direct_delete_object, direct_get_container from swift.common.ring import Ring from swift.common.utils import get_logger, whataremyips +from swift.common.daemon import Daemon -class AccountReaper(object): +class AccountReaper(Daemon): """ Removes data from status=DELETED accounts. These are accounts that have been asked to be removed by the reseller via services @@ -51,6 +52,7 @@ class AccountReaper(object): """ def __init__(self, conf): + self.conf = conf self.logger = get_logger(conf) self.devices = conf.get('devices', '/srv/node') self.mount_check = conf.get('mount_check', 'true').lower() in \ @@ -95,7 +97,7 @@ class AccountReaper(object): self.object_ring = Ring(self.object_ring_path) return self.object_ring - def reap_forever(self): + def run_forever(self): """ Main entry point when running the reaper in its normal daemon mode. This repeatedly calls :func:`reap_once` no quicker than the @@ -110,7 +112,7 @@ class AccountReaper(object): if elapsed < self.interval: sleep(self.interval - elapsed) - def reap_once(self): + def run_once(self): """ Main entry point when running the reaper in 'once' mode, where it will do a single pass over all accounts on the server. This is called diff --git a/swift/account/replicator.py b/swift/account/replicator.py new file mode 100644 index 0000000000..6d0ffb9cea --- /dev/null +++ b/swift/account/replicator.py @@ -0,0 +1,26 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from swift.account import server as account_server +from swift.common import db, db_replicator + + +class AccountReplicator(db_replicator.Replicator): + server_type = 'account' + ring_file = 'account.ring.gz' + brokerclass = db.AccountBroker + datadir = account_server.DATADIR + default_port = 6002 + diff --git a/swift/common/daemon.py b/swift/common/daemon.py new file mode 100644 index 0000000000..80d3eff4c7 --- /dev/null +++ b/swift/common/daemon.py @@ -0,0 +1,60 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import signal +from swift.common import utils + +class Daemon(object): + """Daemon base class""" + + def __init__(self, conf): + self.conf = conf + self.logger = utils.get_logger(conf, 'swift-daemon') + + def run_once(self): + """Override this to run the script once""" + raise NotImplementedError('run_once not implemented') + + def run_forever(self): + """Override this to run forever""" + raise NotImplementedError('run_forever not implemented') + + def run(self, once=False): + """Run the daemon""" + # log uncaught exceptions + sys.excepthook = lambda *exc_info: \ + self.logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info) + sys.stdout = sys.stderr = utils.LoggerFileObject(self.logger) + + utils.drop_privileges(self.conf.get('user', 'swift')) + + try: + os.setsid() + except OSError: + pass + + def kill_children(*args): + signal.signal(signal.SIGTERM, signal.SIG_IGN) + os.killpg(0, signal.SIGTERM) + sys.exit() + + signal.signal(signal.SIGTERM, kill_children) + + if once: + self.run_once() + else: + self.run_forever() diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py index 3e7433fd31..c777930fce 100644 --- a/swift/common/db_replicator.py +++ b/swift/common/db_replicator.py @@ -33,6 +33,7 @@ from swift.common.utils import get_logger, whataremyips, storage_directory, \ from swift.common import ring from swift.common.bufferedhttp import BufferedHTTPConnection from swift.common.exceptions import DriveNotMounted, ConnectionTimeout +from swift.common.daemon import Daemon def quarantine_db(object_file, server_type): @@ -84,14 +85,14 @@ class ReplConnection(BufferedHTTPConnection): return None -class Replicator(object): +class Replicator(Daemon): """ Implements the logic for directing db replication. """ def __init__(self, conf): - self.logger = \ - get_logger(conf) + self.conf = conf + self.logger = get_logger(conf) # log uncaught exceptions sys.excepthook = lambda * exc_info: \ self.logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info) @@ -396,7 +397,7 @@ class Replicator(object): except StopIteration: its.remove(it) - def replicate_once(self): + def run_once(self): """Run a replication pass once.""" self._zero_stats() dirs = [] @@ -425,7 +426,7 @@ class Replicator(object): self.logger.info('Replication run OVER') self._report_stats() - def replicate_forever(self): + def run_forever(self): """ Replicate dbs under the given root in an infinite loop. """ diff --git a/swift/common/utils.py b/swift/common/utils.py index 920d54d644..0028581cb1 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -55,6 +55,8 @@ _posix_fadvise = None # will end up with would also require knowing this suffix. HASH_PATH_SUFFIX = os.environ.get('SWIFT_HASH_PATH_SUFFIX', 'endcap') +# Used when reading config values +TRUE_VALUES = set(('true', '1', 'yes', 'True', 'Yes')) def load_libc_function(func_name): """ diff --git a/swift/container/auditor.py b/swift/container/auditor.py index a41bf5358c..592f4b7cd6 100644 --- a/swift/container/auditor.py +++ b/swift/container/auditor.py @@ -27,16 +27,18 @@ from swift.common.bufferedhttp import http_connect from swift.common.exceptions import ConnectionTimeout from swift.common.ring import Ring from swift.common.utils import get_logger +from swift.common.daemon import Daemon class AuditException(Exception): pass -class ContainerAuditor(object): +class ContainerAuditor(Daemon): """Audit containers.""" def __init__(self, conf): + self.conf = conf self.logger = get_logger(conf) self.devices = conf.get('devices', '/srv/node') self.mount_check = conf.get('mount_check', 'true').lower() in \ @@ -81,7 +83,7 @@ class ContainerAuditor(object): self.object_ring = Ring(self.object_ring_path) return self.object_ring - def audit_forever(self): # pragma: no cover + def run_forever(self): # pragma: no cover """Run the container audit until stopped.""" reported = time.time() time.sleep(random() * self.interval) @@ -114,7 +116,7 @@ class ContainerAuditor(object): if elapsed < self.interval: time.sleep(self.interval - elapsed) - def audit_once(self): + def run_once(self): """Run the container audit once.""" self.logger.info('Begin container audit "once" mode') begin = time.time() diff --git a/swift/container/replicator.py b/swift/container/replicator.py new file mode 100644 index 0000000000..c264ce680e --- /dev/null +++ b/swift/container/replicator.py @@ -0,0 +1,25 @@ +# Copyright (c) 2010 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from swift.container import server as container_server +from swift.common import db, db_replicator + +class ContainerReplicator(db_replicator.Replicator): + server_type = 'container' + ring_file = 'container.ring.gz' + brokerclass = db.ContainerBroker + datadir = container_server.DATADIR + default_port = 6001 + diff --git a/swift/container/updater.py b/swift/container/updater.py index 003d06b130..646815257b 100644 --- a/swift/container/updater.py +++ b/swift/container/updater.py @@ -28,12 +28,14 @@ from swift.common.db import ContainerBroker from swift.common.exceptions import ConnectionTimeout from swift.common.ring import Ring from swift.common.utils import get_logger, whataremyips +from swift.common.daemon import Daemon -class ContainerUpdater(object): +class ContainerUpdater(Daemon): """Update container information in account listings.""" def __init__(self, conf): + self.conf = conf self.logger = get_logger(conf, 'container-updater') self.devices = conf.get('devices', '/srv/node') self.mount_check = conf.get('mount_check', 'true').lower() in \ @@ -78,7 +80,7 @@ class ContainerUpdater(object): shuffle(paths) return paths - def update_forever(self): # pragma: no cover + def run_forever(self): # pragma: no cover """ Run the updator continuously. """ @@ -118,7 +120,7 @@ class ContainerUpdater(object): if elapsed < self.interval: time.sleep(self.interval - elapsed) - def update_once_single_threaded(self): + def run_once(self): """ Run the updater once. """ diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index d9e2145d92..2e41a54f31 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -28,13 +28,15 @@ from swift.common.exceptions import ConnectionTimeout from swift.common.ring import Ring from swift.common.utils import get_logger, renamer from swift.common.exceptions import AuditException +from swift.common.daemon import Daemon -class ObjectAuditor(object): +class ObjectAuditor(Daemon): """Audit objects.""" def __init__(self, conf): - self.logger = get_logger(conf) + self.conf = conf + self.logger = get_logger(conf, 'object-auditor') self.devices = conf.get('devices', '/srv/node') self.mount_check = conf.get('mount_check', 'true').lower() in \ ('true', 't', '1', 'on', 'yes', 'y') @@ -63,7 +65,7 @@ class ObjectAuditor(object): self.container_ring = Ring(self.container_ring_path) return self.container_ring - def audit_forever(self): # pragma: no cover + def run_forever(self): # pragma: no cover """Run the object audit until stopped.""" reported = time.time() time.sleep(random() * self.interval) @@ -97,7 +99,7 @@ class ObjectAuditor(object): if elapsed < self.interval: time.sleep(self.interval - elapsed) - def audit_once(self): + def run_once(self): """Run the object audit once.""" self.logger.info('Begin object audit "once" mode') begin = time.time() diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index 9567c3b7a9..45a77980de 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -24,15 +24,17 @@ import itertools import cPickle as pickle import eventlet -from eventlet import GreenPool, tpool, Timeout, sleep +from eventlet import GreenPool, tpool, Timeout, sleep, hubs from eventlet.green import subprocess from eventlet.support.greenlets import GreenletExit from swift.common.ring import Ring from swift.common.utils import whataremyips, unlink_older_than, lock_path, \ - renamer, compute_eta + renamer, compute_eta, get_logger from swift.common.bufferedhttp import http_connect +from swift.common.daemon import Daemon +hubs.use_hub('poll') PICKLE_PROTOCOL = 2 ONE_WEEK = 604800 @@ -190,22 +192,22 @@ def get_hashes(partition_dir, do_listdir=True, reclaim_age=ONE_WEEK): return hashed, hashes -class ObjectReplicator(object): +class ObjectReplicator(Daemon): """ Replicate objects. Encapsulates most logic and data needed by the object replication process. - Each call to .run() performs one replication pass. It's up to the caller - to do this in a loop. + Each call to .replicate() performs one replication pass. It's up to the + caller to do this in a loop. """ - def __init__(self, conf, logger): + def __init__(self, conf): """ :param conf: configuration object obtained from ConfigParser :param logger: logging object """ self.conf = conf - self.logger = logger + self.logger = get_logger(conf, 'object-replicator') self.devices_dir = conf.get('devices', '/srv/node') self.mount_check = conf.get('mount_check', 'true').lower() in \ ('true', 't', '1', 'on', 'yes', 'y') @@ -221,6 +223,7 @@ class ObjectReplicator(object): self.next_check = time.time() + self.ring_check_interval self.reclaim_age = int(conf.get('reclaim_age', 86400 * 7)) self.partition_times = [] + self.run_pause = int(conf.get('run_pause', 30)) def _rsync(self, args): """ @@ -450,7 +453,7 @@ class ObjectReplicator(object): eventlet.sleep(300) self.stats_line() - def run(self): + def replicate(self): """Run a replication pass""" self.start = time.time() self.suffix_count = 0 @@ -506,3 +509,26 @@ class ObjectReplicator(object): self.kill_coros() self.stats_line() stats.kill() + + def run_once(self): + start = time.time() + self.logger.info("Running object replicator in script mode.") + self.replicate() + total = (time.time() - start)/60 + self.logger.info( + "Object replication complete. (%.02f minutes)" % total) + + def run_forever(self): + self.logger.info("Starting object replicator in daemon mode.") + # Run the replicator continually + while True: + start = time.time() + self.logger.info("Starting object replication pass.") + # Run the replicator + self.replicate() + total = (time.time() - start)/60 + self.logger.info( + "Object replication complete. (%.02f minutes)" % total) + self.logger.debug('Replication sleeping for %s seconds.' % + self.run_pause) + sleep(self.run_pause) diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 7fa9182823..40121e4b9d 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -26,14 +26,16 @@ from swift.common.bufferedhttp import http_connect from swift.common.exceptions import ConnectionTimeout from swift.common.ring import Ring from swift.common.utils import get_logger, renamer +from swift.common.daemon import Daemon from swift.obj.server import ASYNCDIR -class ObjectUpdater(object): +class ObjectUpdater(Daemon): """Update object information in container listings.""" def __init__(self, conf): - self.logger = get_logger(conf) + self.conf = conf + self.logger = get_logger(conf, 'object-updater') self.devices = conf.get('devices', '/srv/node') self.mount_check = conf.get('mount_check', 'true').lower() in \ ('true', 't', '1', 'on', 'yes', 'y') @@ -56,7 +58,7 @@ class ObjectUpdater(object): self.container_ring = Ring(self.container_ring_path) return self.container_ring - def update_forever(self): # pragma: no cover + def run_forever(self): # pragma: no cover """Run the updater continuously.""" time.sleep(random() * self.interval) while True: @@ -95,7 +97,7 @@ class ObjectUpdater(object): if elapsed < self.interval: time.sleep(self.interval - elapsed) - def update_once_single_threaded(self): + def run_once(self): """Run the updater once""" self.logger.info('Begin object update single threaded sweep') begin = time.time() diff --git a/test/unit/common/test_db_replicator.py b/test/unit/common/test_db_replicator.py index 65fe149ff1..1ffe1e923b 100644 --- a/test/unit/common/test_db_replicator.py +++ b/test/unit/common/test_db_replicator.py @@ -170,9 +170,9 @@ class TestDBReplicator(unittest.TestCase): {'id': 'a', 'point': -1, 'max_row': 10, 'hash': 'd'}, FakeBroker(), -1)), False) - def test_replicate_once(self): + def test_run_once(self): replicator = TestReplicator({}) - replicator.replicate_once() + replicator.run_once() def test_usync(self): fake_http = ReplHttp() diff --git a/test/unit/container/test_updater.py b/test/unit/container/test_updater.py index 108334363c..35e7629162 100644 --- a/test/unit/container/test_updater.py +++ b/test/unit/container/test_updater.py @@ -77,7 +77,7 @@ class TestContainerUpdater(unittest.TestCase): self.assertEquals(cu.node_timeout, 5) self.assert_(cu.get_account_ring() is not None) - def test_update_once_single_threaded(self): + def test_run_once(self): cu = container_updater.ContainerUpdater({ 'devices': self.devices_dir, 'mount_check': 'false', @@ -86,17 +86,17 @@ class TestContainerUpdater(unittest.TestCase): 'concurrency': '1', 'node_timeout': '15', }) - cu.update_once_single_threaded() + cu.run_once() containers_dir = os.path.join(self.sda1, container_server.DATADIR) os.mkdir(containers_dir) - cu.update_once_single_threaded() + cu.run_once() self.assert_(os.path.exists(containers_dir)) subdir = os.path.join(containers_dir, 'subdir') os.mkdir(subdir) cb = ContainerBroker(os.path.join(subdir, 'hash.db'), account='a', container='c') cb.initialize(normalize_timestamp(1)) - cu.update_once_single_threaded() + cu.run_once() info = cb.get_info() self.assertEquals(info['object_count'], 0) self.assertEquals(info['bytes_used'], 0) @@ -105,7 +105,7 @@ class TestContainerUpdater(unittest.TestCase): cb.put_object('o', normalize_timestamp(2), 3, 'text/plain', '68b329da9893e34099c7d8ad5cb9c940') - cu.update_once_single_threaded() + cu.run_once() info = cb.get_info() self.assertEquals(info['object_count'], 1) self.assertEquals(info['bytes_used'], 3) @@ -148,7 +148,7 @@ class TestContainerUpdater(unittest.TestCase): for dev in cu.get_account_ring().devs: if dev is not None: dev['port'] = bindsock.getsockname()[1] - cu.update_once_single_threaded() + cu.run_once() for event in spawned.wait(): err = event.wait() if err: @@ -202,7 +202,7 @@ class TestContainerUpdater(unittest.TestCase): for dev in cu.get_account_ring().devs: if dev is not None: dev['port'] = bindsock.getsockname()[1] - cu.update_once_single_threaded() + cu.run_once() for event in spawned.wait(): err = event.wait() if err: diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py index be04f9ee01..06f7d74582 100644 --- a/test/unit/obj/test_replicator.py +++ b/test/unit/obj/test_replicator.py @@ -105,7 +105,7 @@ class TestObjectReplicator(unittest.TestCase): swift_dir=self.testdir, devices=self.devices, mount_check='false', timeout='300', stats_interval='1') self.replicator = object_replicator.ObjectReplicator( - self.conf, null_logger) + self.conf) # def test_check_ring(self): # self.replicator.collect_jobs('sda', 0, self.ring) @@ -184,11 +184,11 @@ class TestObjectReplicator(unittest.TestCase): def test_run(self): with _mock_process([(0,'')]*100): - self.replicator.run() + self.replicator.replicate() def test_run_withlog(self): with _mock_process([(0,"stuff in log")]*100): - self.replicator.run() + self.replicator.replicate() if __name__ == '__main__': unittest.main() diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py index 0d71dc966c..0064e2cbd2 100644 --- a/test/unit/obj/test_updater.py +++ b/test/unit/obj/test_updater.py @@ -69,7 +69,7 @@ class TestObjectUpdater(unittest.TestCase): self.assertEquals(cu.node_timeout, 5) self.assert_(cu.get_container_ring() is not None) - def test_update_once_single_threaded(self): + def test_run_once(self): cu = object_updater.ObjectUpdater({ 'devices': self.devices_dir, 'mount_check': 'false', @@ -78,15 +78,15 @@ class TestObjectUpdater(unittest.TestCase): 'concurrency': '1', 'node_timeout': '15', }) - cu.update_once_single_threaded() + cu.run_once() async_dir = os.path.join(self.sda1, object_server.ASYNCDIR) os.mkdir(async_dir) - cu.update_once_single_threaded() + cu.run_once() self.assert_(os.path.exists(async_dir)) odd_dir = os.path.join(async_dir, 'not really supposed to be here') os.mkdir(odd_dir) - cu.update_once_single_threaded() + cu.run_once() self.assert_(os.path.exists(async_dir)) self.assert_(not os.path.exists(odd_dir)) @@ -98,7 +98,7 @@ class TestObjectUpdater(unittest.TestCase): pickle.dump({'op': 'PUT', 'account': 'a', 'container': 'c', 'obj': 'o', 'headers': {'X-Container-Timestamp': normalize_timestamp(0)}}, open(op_path, 'wb')) - cu.update_once_single_threaded() + cu.run_once() self.assert_(os.path.exists(op_path)) bindsock = listen(('127.0.0.1', 0)) @@ -140,7 +140,7 @@ class TestObjectUpdater(unittest.TestCase): for dev in cu.get_container_ring().devs: if dev is not None: dev['port'] = bindsock.getsockname()[1] - cu.update_once_single_threaded() + cu.run_once() err = event.wait() if err: raise err