Merge "Improvements to database replication."

This commit is contained in:
Jenkins 2012-03-06 17:19:44 +00:00 committed by Gerrit Code Review
commit 3c7e983793
3 changed files with 48 additions and 14 deletions

@ -32,8 +32,9 @@ use = egg:swift#account
# log_facility = LOG_LOCAL0 # log_facility = LOG_LOCAL0
# log_level = INFO # log_level = INFO
# per_diff = 1000 # per_diff = 1000
# max_diffs = 100
# concurrency = 8 # concurrency = 8
# run_pause = 30 # interval = 30
# How long without an error before a node's error count is reset. This will # How long without an error before a node's error count is reset. This will
# also be how long before a node is reenabled after suppression is triggered. # also be how long before a node is reenabled after suppression is triggered.
# error_suppression_interval = 60 # error_suppression_interval = 60

@ -35,8 +35,9 @@ use = egg:swift#container
# log_level = INFO # log_level = INFO
# vm_test_mode = no # vm_test_mode = no
# per_diff = 1000 # per_diff = 1000
# max_diffs = 100
# concurrency = 8 # concurrency = 8
# run_pause = 30 # interval = 30
# node_timeout = 10 # node_timeout = 10
# conn_timeout = 0.5 # conn_timeout = 0.5
# The replicator also performs reclamation # The replicator also performs reclamation

@ -112,7 +112,9 @@ class Replicator(Daemon):
swift_dir = conf.get('swift_dir', '/etc/swift') swift_dir = conf.get('swift_dir', '/etc/swift')
self.ring = ring.Ring(os.path.join(swift_dir, self.ring_file)) self.ring = ring.Ring(os.path.join(swift_dir, self.ring_file))
self.per_diff = int(conf.get('per_diff', 1000)) self.per_diff = int(conf.get('per_diff', 1000))
self.run_pause = int(conf.get('run_pause', 30)) self.max_diffs = int(conf.get('max_diffs') or 100)
self.interval = int(conf.get('interval') or
conf.get('run_pause') or 30)
self.vm_test_mode = conf.get( self.vm_test_mode = conf.get(
'vm_test_mode', 'no').lower() in ('yes', 'true', 'on', '1') 'vm_test_mode', 'no').lower() in ('yes', 'true', 'on', '1')
self.node_timeout = int(conf.get('node_timeout', 10)) self.node_timeout = int(conf.get('node_timeout', 10))
@ -125,7 +127,7 @@ class Replicator(Daemon):
self.stats = {'attempted': 0, 'success': 0, 'failure': 0, 'ts_repl': 0, self.stats = {'attempted': 0, 'success': 0, 'failure': 0, 'ts_repl': 0,
'no_change': 0, 'hashmatch': 0, 'rsync': 0, 'diff': 0, 'no_change': 0, 'hashmatch': 0, 'rsync': 0, 'diff': 0,
'remove': 0, 'empty': 0, 'remote_merge': 0, 'remove': 0, 'empty': 0, 'remote_merge': 0,
'start': time.time()} 'start': time.time(), 'diff_capped': 0}
def _report_stats(self): def _report_stats(self):
"""Report the current stats to the logs.""" """Report the current stats to the logs."""
@ -141,7 +143,8 @@ class Replicator(Daemon):
% self.stats) % self.stats)
self.logger.info(' '.join(['%s:%s' % item for item in self.logger.info(' '.join(['%s:%s' % item for item in
self.stats.items() if item[0] in self.stats.items() if item[0] in
('no_change', 'hashmatch', 'rsync', 'diff', 'ts_repl', 'empty')])) ('no_change', 'hashmatch', 'rsync', 'diff', 'ts_repl', 'empty',
'diff_capped')]))
def _rsync_file(self, db_file, remote_file, whole_file=True): def _rsync_file(self, db_file, remote_file, whole_file=True):
""" """
@ -215,7 +218,9 @@ class Replicator(Daemon):
self.logger.debug(_('Syncing chunks with %s'), http.host) self.logger.debug(_('Syncing chunks with %s'), http.host)
sync_table = broker.get_syncs() sync_table = broker.get_syncs()
objects = broker.get_items_since(point, self.per_diff) objects = broker.get_items_since(point, self.per_diff)
while len(objects): diffs = 0
while len(objects) and diffs < self.max_diffs:
diffs += 1
with Timeout(self.node_timeout): with Timeout(self.node_timeout):
response = http.replicate('merge_items', objects, local_id) response = http.replicate('merge_items', objects, local_id)
if not response or response.status >= 300 or response.status < 200: if not response or response.status >= 300 or response.status < 200:
@ -226,6 +231,12 @@ class Replicator(Daemon):
return False return False
point = objects[-1]['ROWID'] point = objects[-1]['ROWID']
objects = broker.get_items_since(point, self.per_diff) objects = broker.get_items_since(point, self.per_diff)
if objects:
self.logger.debug(_('Synchronization for %s has fallen more than '
'%s rows behind; moving on and will try again next pass.') %
(broker.db_file, self.max_diffs * self.per_diff))
self.stats['diff_capped'] += 1
else:
with Timeout(self.node_timeout): with Timeout(self.node_timeout):
response = http.replicate('merge_syncs', sync_table) response = http.replicate('merge_syncs', sync_table)
if response and response.status >= 200 and response.status < 300: if response and response.status >= 200 and response.status < 300:
@ -360,7 +371,11 @@ class Replicator(Daemon):
responses = [] responses = []
nodes = self.ring.get_part_nodes(int(partition)) nodes = self.ring.get_part_nodes(int(partition))
shouldbehere = bool([n for n in nodes if n['id'] == node_id]) shouldbehere = bool([n for n in nodes if n['id'] == node_id])
repl_nodes = [n for n in nodes if n['id'] != node_id] # See Footnote [1] for an explanation of the repl_nodes assignment.
i = 0
while i < len(nodes) and nodes[i]['id'] != node_id:
i += 1
repl_nodes = nodes[i + 1:] + nodes[:i]
more_nodes = self.ring.get_more_nodes(int(partition)) more_nodes = self.ring.get_more_nodes(int(partition))
for node in repl_nodes: for node in repl_nodes:
success = False success = False
@ -439,12 +454,16 @@ class Replicator(Daemon):
""" """
Replicate dbs under the given root in an infinite loop. Replicate dbs under the given root in an infinite loop.
""" """
sleep(random.random() * self.interval)
while True: while True:
begin = time.time()
try: try:
self.run_once() self.run_once()
except (Exception, Timeout): except (Exception, Timeout):
self.logger.exception(_('ERROR trying to replicate')) self.logger.exception(_('ERROR trying to replicate'))
sleep(self.run_pause) elapsed = time.time() - begin
if elapsed < self.interval:
sleep(self.interval - elapsed)
class ReplicatorRpc(object): class ReplicatorRpc(object):
@ -565,3 +584,16 @@ class ReplicatorRpc(object):
new_broker.newid(args[0]) new_broker.newid(args[0])
renamer(old_filename, db_file) renamer(old_filename, db_file)
return HTTPNoContent() return HTTPNoContent()
# Footnote [1]:
# This orders the nodes so that, given nodes a b c, a will contact b then c,
# b will contact c then a, and c will contact a then b -- in other words, each
# node will always contact the next node in the list first.
# This helps in the case where databases are all way out of sync, so each
# node is likely to be sending to a different node than it's receiving from,
# rather than two nodes talking to each other, starving out the third.
# If the third didn't even have a copy and the first two nodes were way out
# of sync, such starvation would mean the third node wouldn't get any copy
# until the first two nodes finally got in sync, which could take a while.
# This new ordering ensures such starvation doesn't occur, making the data
# more durable.