Add device name to *-replicator.removes for DBs

To tell when replication for a device has finished, it's important to
know when the replicator is removing objects.  This was previously
handled for the object-replicator
(object-replicator.partition.delete.count.<device> and
object-replicator.partition.update.count.<device> metrics) but not for
the account and container replicators.

This patch extends the existing DB removal count metrics to make them
per-device.  The new metrics are:
 account-replicator.removes.<device>
 container-replicator.removes.<device>
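
As a rough illustration of how a per-device metric name can be derived
from a database path, here is a minimal sketch modelled on the
extract_device() method in this patch.  DEVICES_ROOT, removes_metric()
and the example paths are hypothetical names for this sketch only, and
the re.escape() calls are an addition that the patch itself does not
make.

```python
import os
import re

# Hypothetical devices root, used only for this sketch.
DEVICES_ROOT = '/srv/node'

# Same pattern shape as in the patch: root, separator, then one path
# component.  re.escape() is an extra precaution added in this sketch.
_SEP = re.escape(os.path.sep)
EXTRACT_DEVICE_RE = re.compile(
    '%s%s([^%s]+)' % (re.escape(DEVICES_ROOT), _SEP, _SEP))


def extract_device(db_path):
    """Return the device component of db_path, or "UNKNOWN"."""
    match = EXTRACT_DEVICE_RE.match(db_path)
    if match:
        return match.group(1)
    return 'UNKNOWN'


def removes_metric(server_type, db_path):
    """Build the full per-device metric name (hypothetical helper)."""
    return '%s-replicator.removes.%s' % (
        server_type, extract_device(db_path))


example_path = os.path.join(DEVICES_ROOT, 'sdb1', 'containers', 'x.db')
print(removes_metric('container', example_path))
```

Paths that do not start with the devices root fall back to "UNKNOWN",
so the counter is still incremented rather than lost.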

This patch also refactors the duplicated DB removal code into a single
delete_db() method and increases test coverage of the DB replicator.
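
The added test coverage relies on a fake logger that records every call
instead of discarding it.  The following is a minimal sketch of that
recording-stub pattern, assuming a stand-in class named RecordingLogger
(a hypothetical name; the patch applies the pattern to the existing
FakeLogger test helper).

```python
from collections import defaultdict


class RecordingLogger(object):
    """Hypothetical stand-in that records calls made to it."""

    def __init__(self):
        # Any method name gets an empty call list on first use.
        self.log_dict = defaultdict(list)

    def _store_in(store_name):
        # Called while the class body executes, so it is a plain
        # function here; it returns the method that does the recording.
        def stub_fn(self, *args, **kwargs):
            self.log_dict[store_name].append((args, kwargs))
        return stub_fn

    increment = _store_in('increment')
    timing_since = _store_in('timing_since')


logger = RecordingLogger()
logger.increment('removes.sdb1')
print(logger.log_dict['increment'])
```

A test can then assert on log_dict['increment'] to confirm that the
per-device counter was bumped with the expected name.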

Change-Id: I2067317d4a5f8ad2a496834147954bdcdfc541c1
Darrell Bishop 2012-08-17 17:00:50 -07:00
parent 82f1d550b6
commit 66400b7337
4 changed files with 192 additions and 86 deletions


@ -509,34 +509,34 @@ Metric Name Description
Metrics for `account-replicator`:
================================== ====================================================
Metric Name Description
---------------------------------- ----------------------------------------------------
`account-replicator.diffs` Count of syncs handled by sending differing rows.
`account-replicator.diff_caps` Count of "diffs" operations which failed because
"max_diffs" was hit.
`account-replicator.no_changes` Count of accounts found to be in sync.
`account-replicator.hashmatches` Count of accounts found to be in sync via hash
comparison (`broker.merge_syncs` was called).
`account-replicator.rsyncs` Count of completely missing accounts where were sent
via rsync.
`account-replicator.remote_merges` Count of syncs handled by sending entire database
via rsync.
`account-replicator.attempts` Count of database replication attempts.
`account-replicator.failures` Count of database replication attempts which failed
due to corruption (quarantined) or inability to read
as well as attempts to individual nodes which
failed.
`account-replicator.removes` Count of databases deleted because the
delete_timestamp was greater than the put_timestamp
and the database had no rows or because it was
successfully sync'ed to other locations and doesn't
belong here anymore.
`account-replicator.successes` Count of replication attempts to an individual node
which were successful.
`account-replicator.timing` Timing data for each database replication attempt
not resulting in a failure.
================================== ====================================================
===================================== ====================================================
Metric Name Description
------------------------------------- ----------------------------------------------------
`account-replicator.diffs` Count of syncs handled by sending differing rows.
`account-replicator.diff_caps` Count of "diffs" operations which failed because
"max_diffs" was hit.
`account-replicator.no_changes` Count of accounts found to be in sync.
`account-replicator.hashmatches` Count of accounts found to be in sync via hash
comparison (`broker.merge_syncs` was called).
`account-replicator.rsyncs` Count of completely missing accounts where were sent
via rsync.
`account-replicator.remote_merges` Count of syncs handled by sending entire database
via rsync.
`account-replicator.attempts` Count of database replication attempts.
`account-replicator.failures` Count of database replication attempts which failed
due to corruption (quarantined) or inability to read
as well as attempts to individual nodes which
failed.
`account-replicator.removes.<device>` Count of databases on <device> deleted because the
delete_timestamp was greater than the put_timestamp
and the database had no rows or because it was
successfully sync'ed to other locations and doesn't
belong here anymore.
`account-replicator.successes` Count of replication attempts to an individual node
which were successful.
`account-replicator.timing` Timing data for each database replication attempt
not resulting in a failure.
===================================== ====================================================
Metrics for `container-auditor`:
@ -552,34 +552,34 @@ Metric Name Description
Metrics for `container-replicator`:
==================================== ====================================================
Metric Name Description
------------------------------------ ----------------------------------------------------
`container-replicator.diffs` Count of syncs handled by sending differing rows.
`container-replicator.diff_caps` Count of "diffs" operations which failed because
"max_diffs" was hit.
`container-replicator.no_changes` Count of containers found to be in sync.
`container-replicator.hashmatches` Count of containers found to be in sync via hash
comparison (`broker.merge_syncs` was called).
`container-replicator.rsyncs` Count of completely missing containers where were sent
via rsync.
`container-replicator.remote_merges` Count of syncs handled by sending entire database
via rsync.
`container-replicator.attempts` Count of database replication attempts.
`container-replicator.failures` Count of database replication attempts which failed
due to corruption (quarantined) or inability to read
as well as attempts to individual nodes which
failed.
`container-replicator.removes` Count of databases deleted because the
delete_timestamp was greater than the put_timestamp
and the database had no rows or because it was
successfully sync'ed to other locations and doesn't
belong here anymore.
`container-replicator.successes` Count of replication attempts to an individual node
which were successful.
`container-replicator.timing` Timing data for each database replication attempt
not resulting in a failure.
==================================== ====================================================
======================================= ====================================================
Metric Name Description
--------------------------------------- ----------------------------------------------------
`container-replicator.diffs` Count of syncs handled by sending differing rows.
`container-replicator.diff_caps` Count of "diffs" operations which failed because
"max_diffs" was hit.
`container-replicator.no_changes` Count of containers found to be in sync.
`container-replicator.hashmatches` Count of containers found to be in sync via hash
comparison (`broker.merge_syncs` was called).
`container-replicator.rsyncs` Count of completely missing containers where were sent
via rsync.
`container-replicator.remote_merges` Count of syncs handled by sending entire database
via rsync.
`container-replicator.attempts` Count of database replication attempts.
`container-replicator.failures` Count of database replication attempts which failed
due to corruption (quarantined) or inability to read
as well as attempts to individual nodes which
failed.
`container-replicator.removes.<device>` Count of databases deleted on <device> because the
delete_timestamp was greater than the put_timestamp
and the database had no rows or because it was
successfully sync'ed to other locations and doesn't
belong here anymore.
`container-replicator.successes` Count of replication attempts to an individual node
which were successful.
`container-replicator.timing` Timing data for each database replication attempt
not resulting in a failure.
======================================= ====================================================
Metrics for `container-server` ("Not Found" is not considered an error and requests
which increment `errors` are not included in the timing data):


@ -21,6 +21,7 @@ import time
import shutil
import uuid
import errno
import re
from eventlet import GreenPool, sleep, Timeout
from eventlet.green import subprocess
@ -130,6 +131,8 @@ class Replicator(Daemon):
self.recon_replicator = '%s.recon' % self.server_type
self.rcache = os.path.join(self.recon_cache_path,
self.recon_replicator)
self.extract_device_re = re.compile('%s%s([^%s]+)' % (
self.root, os.path.sep, os.path.sep))
def _zero_stats(self):
"""Zero out the stats."""
@ -388,10 +391,7 @@ class Replicator(Daemon):
delete_timestamp > put_timestamp and \
info['count'] in (None, '', 0, '0'):
if self.report_up_to_date(full_info):
with lock_parent_directory(object_file):
shutil.rmtree(os.path.dirname(object_file), True)
self.stats['remove'] += 1
self.logger.increment('removes')
self.delete_db(object_file)
self.logger.timing_since('timing', start_time)
return
responses = []
@ -419,12 +419,28 @@ class Replicator(Daemon):
if not shouldbehere and all(responses):
# If the db shouldn't be on this node and has been successfully
# synced to all of its peers, it can be removed.
with lock_parent_directory(object_file):
shutil.rmtree(os.path.dirname(object_file), True)
self.stats['remove'] += 1
self.logger.increment('removes')
self.delete_db(object_file)
self.logger.timing_since('timing', start_time)
def delete_db(self, object_file):
with lock_parent_directory(object_file):
shutil.rmtree(os.path.dirname(object_file), True)
self.stats['remove'] += 1
device_name = self.extract_device(object_file)
self.logger.increment('removes.' + device_name)
def extract_device(self, object_file):
"""
Extract the device name from an object path. Returns "UNKNOWN" if the
path could not be extracted successfully for some reason.
:param object_file: the path to a database file.
"""
match = self.extract_device_re.match(object_file)
if match:
return match.groups()[0]
return "UNKNOWN"
def report_up_to_date(self, full_info):
return True


@ -6,6 +6,7 @@ import copy
import logging
from sys import exc_info
from contextlib import contextmanager
from collections import defaultdict
from tempfile import NamedTemporaryFile
from eventlet.green import socket
from tempfile import mkdtemp
@ -117,30 +118,28 @@ class FakeLogger(object):
self.facility = kwargs['facility']
def _clear(self):
self.log_dict = dict(
error=[], info=[], warning=[], debug=[], exception=[])
self.log_dict = defaultdict(list)
def error(self, *args, **kwargs):
self.log_dict['error'].append((args, kwargs))
def _store_in(store_name):
def stub_fn(self, *args, **kwargs):
self.log_dict[store_name].append((args, kwargs))
return stub_fn
def info(self, *args, **kwargs):
self.log_dict['info'].append((args, kwargs))
def warning(self, *args, **kwargs):
self.log_dict['warning'].append((args, kwargs))
def debug(self, *args, **kwargs):
self.log_dict['debug'].append((args, kwargs))
error = _store_in('error')
info = _store_in('info')
warning = _store_in('warning')
debug = _store_in('debug')
def exception(self, *args, **kwargs):
self.log_dict['exception'].append((args, kwargs, str(exc_info()[1])))
# mock out the StatsD logging methods:
def set_statsd_prefix(self, *a, **kw):
pass
increment = decrement = timing = timing_since = update_stats = \
set_statsd_prefix
increment = _store_in('increment')
decrement = _store_in('decrement')
timing = _store_in('timing')
timing_since = _store_in('timing_since')
update_stats = _store_in('update_stats')
set_statsd_prefix = _store_in('set_statsd_prefix')
def setFormatter(self, obj):
self.formatter = obj


@ -18,21 +18,26 @@ from contextlib import contextmanager
import os
import logging
import errno
from tempfile import mkdtemp, NamedTemporaryFile
from swift.common import db_replicator
from swift.common import utils
from swift.common.utils import normalize_timestamp
from swift.container import server as container_server
from test.unit import FakeLogger
def teardown_module():
"clean up my monkey patching"
reload(db_replicator)
@contextmanager
def lock_parent_directory(filename):
yield True
class FakeRing:
class Ring:
devs = []
@ -43,6 +48,30 @@ class FakeRing:
def get_more_nodes(self, *args):
return []
class FakeRingWithNodes:
class Ring:
devs = [dict(
id=1, weight=10.0, zone=1, ip='1.1.1.1', port=6000, device='sdb',
meta=''
), dict(
id=2, weight=10.0, zone=2, ip='1.1.1.2', port=6000, device='sdb',
meta=''
), dict(
id=3, weight=10.0, zone=3, ip='1.1.1.3', port=6000, device='sdb',
meta=''
), dict(
id=4, weight=10.0, zone=4, ip='1.1.1.4', port=6000, device='sdb',
meta='')]
def __init__(self, path, reload_time=15, ring_name=None):
pass
def get_part_nodes(self, part):
return self.devs[:3]
def get_more_nodes(self, *args):
return (d for d in self.devs[3:])
class FakeProcess:
def __init__(self, *codes):
self.codes = iter(codes)
@ -56,6 +85,7 @@ class FakeProcess:
raise next
return Failure()
@contextmanager
def _mock_process(*args):
orig_process = db_replicator.subprocess.Popen
@ -63,6 +93,7 @@ def _mock_process(*args):
yield
db_replicator.subprocess.Popen = orig_process
class ReplHttp:
def __init__(self, response=None):
self.response = response
@ -77,6 +108,7 @@ class ReplHttp:
return self.response
return Response()
class ChangingMtimesOs:
def __init__(self):
self.mtime = 0
@ -86,9 +118,11 @@ class ChangingMtimesOs:
self.mtime += 1
return self.mtime
class FakeBroker:
db_file = __file__
get_repl_missing_table = False
stub_replication_info = None
db_type = 'container'
def __init__(self, *args, **kwargs):
return None
@ -110,11 +144,13 @@ class FakeBroker:
def get_replication_info(self):
if self.get_repl_missing_table:
raise Exception('no such table')
if self.stub_replication_info:
return self.stub_replication_info
return {'delete_timestamp': 0, 'put_timestamp': 1, 'count': 0}
def reclaim(self, item_timestamp, sync_timestamp):
pass
db_replicator.ring = FakeRing()
def get_info(self):
pass
class TestReplicator(db_replicator.Replicator):
@ -124,7 +160,15 @@ class TestReplicator(db_replicator.Replicator):
datadir = container_server.DATADIR
default_port = 1000
class TestDBReplicator(unittest.TestCase):
def setUp(self):
db_replicator.ring = FakeRing()
self.delete_db_calls = []
def stub_delete_db(self, object_file):
self.delete_db_calls.append(object_file)
def test_repl_connection(self):
node = {'ip': '127.0.0.1', 'port': 80, 'device': 'sdb1'}
@ -204,9 +248,11 @@ class TestDBReplicator(unittest.TestCase):
replicator._report_stats()
def test_replicate_object(self):
db_replicator.lock_parent_directory = lock_parent_directory
db_replicator.ring = FakeRingWithNodes()
replicator = TestReplicator({})
replicator._replicate_object('0', 'file', 'node_id')
replicator.delete_db = self.stub_delete_db
replicator._replicate_object('0', '/path/to/file', 'node_id')
self.assertEquals([], self.delete_db_calls)
def test_replicate_object_quarantine(self):
replicator = TestReplicator({})
@ -227,7 +273,6 @@ class TestDBReplicator(unittest.TestCase):
return mock_renamer(was, new, cause_colision=True)
was_renamer = db_replicator.renamer
db_replicator.renamer = mock_renamer
db_replicator.lock_parent_directory = lock_parent_directory
replicator.brokerclass.get_repl_missing_table = True
replicator.brokerclass.db_file = '/a/b/c/d/e/hey'
replicator._replicate_object('0', 'file', 'node_id')
@ -238,6 +283,51 @@ class TestDBReplicator(unittest.TestCase):
replicator.brokerclass.db_file = was_db_file
db_replicator.renamer = was_renamer
def test_replicate_object_delete_because_deleted(self):
replicator = TestReplicator({})
try:
replicator.delete_db = self.stub_delete_db
replicator.brokerclass.stub_replication_info = {
'delete_timestamp': 2, 'put_timestamp': 1, 'count': 0}
replicator._replicate_object('0', '/path/to/file', 'node_id')
finally:
replicator.brokerclass.stub_replication_info = None
self.assertEquals(['/path/to/file'], self.delete_db_calls)
def test_replicate_object_delete_because_not_shouldbehere(self):
replicator = TestReplicator({})
replicator.delete_db = self.stub_delete_db
replicator._replicate_object('0', '/path/to/file', 'node_id')
self.assertEquals(['/path/to/file'], self.delete_db_calls)
def test_delete_db(self):
db_replicator.lock_parent_directory = lock_parent_directory
replicator = TestReplicator({})
replicator._zero_stats()
replicator.extract_device = lambda _: 'some_device'
replicator.logger = FakeLogger()
temp_dir = mkdtemp()
temp_file = NamedTemporaryFile(dir=temp_dir, delete=False)
# sanity-checks
self.assertTrue(os.path.exists(temp_dir))
self.assertTrue(os.path.exists(temp_file.name))
self.assertEqual(0, replicator.stats['remove'])
replicator.delete_db(temp_file.name)
self.assertFalse(os.path.exists(temp_dir))
self.assertFalse(os.path.exists(temp_file.name))
self.assertEqual([(('removes.some_device',), {})],
replicator.logger.log_dict['increment'])
self.assertEqual(1, replicator.stats['remove'])
def test_extract_device(self):
replicator = TestReplicator({'devices': '/some/root'})
self.assertEqual('some_device', replicator.extract_device(
'/some/root/some_device/deeper/and/deeper'))
# def test_dispatch(self):
# rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
# no_op = lambda *args, **kwargs: True
@ -271,6 +361,7 @@ class TestDBReplicator(unittest.TestCase):
rpc.merge_syncs(fake_broker, args)
self.assertEquals(fake_broker.args, (args[0],))
if __name__ == '__main__':
unittest.main()