From 52ba08d67d063878f05a60f371422a3549200f96 Mon Sep 17 00:00:00 2001 From: gholt Date: Tue, 6 Mar 2012 03:24:16 +0000 Subject: [PATCH] Improvements to database replication. Note: I'd like to get this released as soon as possible as it is a data durability issue. 1) Orders nodes so that none get starved (see code and footnote). 2) New max_diffs setting that caps how long the replicator will spend trying to sync a given database per pass so the other databases don't get starved. 3) Replaces run_pause with the more standard "interval", which means the replicator won't pause unless it takes less than the interval set. Change-Id: I986742229e65031df88f5251ca61746b7c8d2bde --- etc/account-server.conf-sample | 3 +- etc/container-server.conf-sample | 3 +- swift/common/db_replicator.py | 56 +++++++++++++++++++++++++------- 3 files changed, 48 insertions(+), 14 deletions(-) diff --git a/etc/account-server.conf-sample b/etc/account-server.conf-sample index 34ba714b72..91df89363f 100644 --- a/etc/account-server.conf-sample +++ b/etc/account-server.conf-sample @@ -32,8 +32,9 @@ use = egg:swift#account # log_facility = LOG_LOCAL0 # log_level = INFO # per_diff = 1000 +# max_diffs = 100 # concurrency = 8 -# run_pause = 30 +# interval = 30 # How long without an error before a node's error count is reset. This will # also be how long before a node is reenabled after suppression is triggered. # error_suppression_interval = 60 diff --git a/etc/container-server.conf-sample b/etc/container-server.conf-sample index 4c488c25c1..6f295ade16 100644 --- a/etc/container-server.conf-sample +++ b/etc/container-server.conf-sample @@ -35,8 +35,9 @@ use = egg:swift#container # log_level = INFO # vm_test_mode = no # per_diff = 1000 +# max_diffs = 100 # concurrency = 8 -# run_pause = 30 +# interval = 30 # node_timeout = 10 # conn_timeout = 0.5 # The replicator also performs reclamation diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py index 181c113fe9..b27ac43380 100644 --- a/swift/common/db_replicator.py +++ b/swift/common/db_replicator.py @@ -112,7 +112,9 @@ class Replicator(Daemon): swift_dir = conf.get('swift_dir', '/etc/swift') self.ring = ring.Ring(os.path.join(swift_dir, self.ring_file)) self.per_diff = int(conf.get('per_diff', 1000)) - self.run_pause = int(conf.get('run_pause', 30)) + self.max_diffs = int(conf.get('max_diffs') or 100) + self.interval = int(conf.get('interval') or + conf.get('run_pause') or 30) self.vm_test_mode = conf.get( 'vm_test_mode', 'no').lower() in ('yes', 'true', 'on', '1') self.node_timeout = int(conf.get('node_timeout', 10)) @@ -125,7 +127,7 @@ class Replicator(Daemon): self.stats = {'attempted': 0, 'success': 0, 'failure': 0, 'ts_repl': 0, 'no_change': 0, 'hashmatch': 0, 'rsync': 0, 'diff': 0, 'remove': 0, 'empty': 0, 'remote_merge': 0, - 'start': time.time()} + 'start': time.time(), 'diff_capped': 0} def _report_stats(self): """Report the current stats to the logs.""" @@ -141,7 +143,8 @@ class Replicator(Daemon): % self.stats) self.logger.info(' '.join(['%s:%s' % item for item in self.stats.items() if item[0] in - ('no_change', 'hashmatch', 'rsync', 'diff', 'ts_repl', 'empty')])) + ('no_change', 'hashmatch', 'rsync', 'diff', 'ts_repl', 'empty', + 'diff_capped')])) def _rsync_file(self, db_file, remote_file, whole_file=True): """ @@ -215,7 +218,9 @@ class Replicator(Daemon): self.logger.debug(_('Syncing chunks with %s'), http.host) sync_table = broker.get_syncs() objects = broker.get_items_since(point, self.per_diff) - while len(objects): + diffs = 0 + while len(objects) and diffs < self.max_diffs: + diffs += 1 with Timeout(self.node_timeout): response = http.replicate('merge_items', objects, local_id) if not response or response.status >= 300 or response.status < 200: @@ -226,12 +231,18 @@ class Replicator(Daemon): return False point = objects[-1]['ROWID'] objects = broker.get_items_since(point, self.per_diff) - with Timeout(self.node_timeout): - response = http.replicate('merge_syncs', sync_table) - if response and response.status >= 200 and response.status < 300: - broker.merge_syncs([{'remote_id': remote_id, - 'sync_point': point}], incoming=False) - return True + if objects: + self.logger.debug(_('Synchronization for %s has fallen more than ' + '%s rows behind; moving on and will try again next pass.') % + (broker.db_file, self.max_diffs * self.per_diff)) + self.stats['diff_capped'] += 1 + else: + with Timeout(self.node_timeout): + response = http.replicate('merge_syncs', sync_table) + if response and response.status >= 200 and response.status < 300: + broker.merge_syncs([{'remote_id': remote_id, + 'sync_point': point}], incoming=False) + return True return False def _in_sync(self, rinfo, info, broker, local_sync): @@ -360,7 +371,11 @@ class Replicator(Daemon): responses = [] nodes = self.ring.get_part_nodes(int(partition)) shouldbehere = bool([n for n in nodes if n['id'] == node_id]) - repl_nodes = [n for n in nodes if n['id'] != node_id] + # See Footnote [1] for an explanation of the repl_nodes assignment. + i = 0 + while i < len(nodes) and nodes[i]['id'] != node_id: + i += 1 + repl_nodes = nodes[i + 1:] + nodes[:i] more_nodes = self.ring.get_more_nodes(int(partition)) for node in repl_nodes: success = False @@ -439,12 +454,16 @@ class Replicator(Daemon): """ Replicate dbs under the given root in an infinite loop. """ + sleep(random.random() * self.interval) while True: + begin = time.time() try: self.run_once() except (Exception, Timeout): self.logger.exception(_('ERROR trying to replicate')) - sleep(self.run_pause) + elapsed = time.time() - begin + if elapsed < self.interval: + sleep(self.interval - elapsed) class ReplicatorRpc(object): @@ -565,3 +584,16 @@ class ReplicatorRpc(object): new_broker.newid(args[0]) renamer(old_filename, db_file) return HTTPNoContent() + +# Footnote [1]: +# This orders the nodes so that, given nodes a b c, a will contact b then c, +# b will contact c then a, and c will contact a then b -- in other words, each +# node will always contact the next node in the list first. +# This helps in the case where databases are all way out of sync, so each +# node is likely to be sending to a different node than it's receiving from, +# rather than two nodes talking to each other, starving out the third. +# If the third didn't even have a copy and the first two nodes were way out +# of sync, such starvation would mean the third node wouldn't get any copy +# until the first two nodes finally got in sync, which could take a while. +# This new ordering ensures such starvation doesn't occur, making the data +# more durable.