Merge "Add device name to *-replicator.removes for DBs"
This commit is contained in:
commit
1712135a7e
@ -509,9 +509,9 @@ Metric Name Description
|
|||||||
|
|
||||||
Metrics for `account-replicator`:
|
Metrics for `account-replicator`:
|
||||||
|
|
||||||
================================== ====================================================
|
===================================== ====================================================
|
||||||
Metric Name Description
|
Metric Name Description
|
||||||
---------------------------------- ----------------------------------------------------
|
------------------------------------- ----------------------------------------------------
|
||||||
`account-replicator.diffs` Count of syncs handled by sending differing rows.
|
`account-replicator.diffs` Count of syncs handled by sending differing rows.
|
||||||
`account-replicator.diff_caps` Count of "diffs" operations which failed because
|
`account-replicator.diff_caps` Count of "diffs" operations which failed because
|
||||||
"max_diffs" was hit.
|
"max_diffs" was hit.
|
||||||
@ -527,7 +527,7 @@ Metric Name Description
|
|||||||
due to corruption (quarantined) or inability to read
|
due to corruption (quarantined) or inability to read
|
||||||
as well as attempts to individual nodes which
|
as well as attempts to individual nodes which
|
||||||
failed.
|
failed.
|
||||||
`account-replicator.removes` Count of databases deleted because the
|
`account-replicator.removes.<device>` Count of databases on <device> deleted because the
|
||||||
delete_timestamp was greater than the put_timestamp
|
delete_timestamp was greater than the put_timestamp
|
||||||
and the database had no rows or because it was
|
and the database had no rows or because it was
|
||||||
successfully sync'ed to other locations and doesn't
|
successfully sync'ed to other locations and doesn't
|
||||||
@ -536,7 +536,7 @@ Metric Name Description
|
|||||||
which were successful.
|
which were successful.
|
||||||
`account-replicator.timing` Timing data for each database replication attempt
|
`account-replicator.timing` Timing data for each database replication attempt
|
||||||
not resulting in a failure.
|
not resulting in a failure.
|
||||||
================================== ====================================================
|
===================================== ====================================================
|
||||||
|
|
||||||
Metrics for `container-auditor`:
|
Metrics for `container-auditor`:
|
||||||
|
|
||||||
@ -552,9 +552,9 @@ Metric Name Description
|
|||||||
|
|
||||||
Metrics for `container-replicator`:
|
Metrics for `container-replicator`:
|
||||||
|
|
||||||
==================================== ====================================================
|
======================================= ====================================================
|
||||||
Metric Name Description
|
Metric Name Description
|
||||||
------------------------------------ ----------------------------------------------------
|
--------------------------------------- ----------------------------------------------------
|
||||||
`container-replicator.diffs` Count of syncs handled by sending differing rows.
|
`container-replicator.diffs` Count of syncs handled by sending differing rows.
|
||||||
`container-replicator.diff_caps` Count of "diffs" operations which failed because
|
`container-replicator.diff_caps` Count of "diffs" operations which failed because
|
||||||
"max_diffs" was hit.
|
"max_diffs" was hit.
|
||||||
@ -570,7 +570,7 @@ Metric Name Description
|
|||||||
due to corruption (quarantined) or inability to read
|
due to corruption (quarantined) or inability to read
|
||||||
as well as attempts to individual nodes which
|
as well as attempts to individual nodes which
|
||||||
failed.
|
failed.
|
||||||
`container-replicator.removes` Count of databases deleted because the
|
`container-replicator.removes.<device>` Count of databases deleted on <device> because the
|
||||||
delete_timestamp was greater than the put_timestamp
|
delete_timestamp was greater than the put_timestamp
|
||||||
and the database had no rows or because it was
|
and the database had no rows or because it was
|
||||||
successfully sync'ed to other locations and doesn't
|
successfully sync'ed to other locations and doesn't
|
||||||
@ -579,7 +579,7 @@ Metric Name Description
|
|||||||
which were successful.
|
which were successful.
|
||||||
`container-replicator.timing` Timing data for each database replication attempt
|
`container-replicator.timing` Timing data for each database replication attempt
|
||||||
not resulting in a failure.
|
not resulting in a failure.
|
||||||
==================================== ====================================================
|
======================================= ====================================================
|
||||||
|
|
||||||
Metrics for `container-server` ("Not Found" is not considered an error and requests
|
Metrics for `container-server` ("Not Found" is not considered an error and requests
|
||||||
which increment `errors` are not included in the timing data):
|
which increment `errors` are not included in the timing data):
|
||||||
|
@ -21,6 +21,7 @@ import time
|
|||||||
import shutil
|
import shutil
|
||||||
import uuid
|
import uuid
|
||||||
import errno
|
import errno
|
||||||
|
import re
|
||||||
|
|
||||||
from eventlet import GreenPool, sleep, Timeout
|
from eventlet import GreenPool, sleep, Timeout
|
||||||
from eventlet.green import subprocess
|
from eventlet.green import subprocess
|
||||||
@ -130,6 +131,8 @@ class Replicator(Daemon):
|
|||||||
self.recon_replicator = '%s.recon' % self.server_type
|
self.recon_replicator = '%s.recon' % self.server_type
|
||||||
self.rcache = os.path.join(self.recon_cache_path,
|
self.rcache = os.path.join(self.recon_cache_path,
|
||||||
self.recon_replicator)
|
self.recon_replicator)
|
||||||
|
self.extract_device_re = re.compile('%s%s([^%s]+)' % (
|
||||||
|
self.root, os.path.sep, os.path.sep))
|
||||||
|
|
||||||
def _zero_stats(self):
|
def _zero_stats(self):
|
||||||
"""Zero out the stats."""
|
"""Zero out the stats."""
|
||||||
@ -388,10 +391,7 @@ class Replicator(Daemon):
|
|||||||
delete_timestamp > put_timestamp and \
|
delete_timestamp > put_timestamp and \
|
||||||
info['count'] in (None, '', 0, '0'):
|
info['count'] in (None, '', 0, '0'):
|
||||||
if self.report_up_to_date(full_info):
|
if self.report_up_to_date(full_info):
|
||||||
with lock_parent_directory(object_file):
|
self.delete_db(object_file)
|
||||||
shutil.rmtree(os.path.dirname(object_file), True)
|
|
||||||
self.stats['remove'] += 1
|
|
||||||
self.logger.increment('removes')
|
|
||||||
self.logger.timing_since('timing', start_time)
|
self.logger.timing_since('timing', start_time)
|
||||||
return
|
return
|
||||||
responses = []
|
responses = []
|
||||||
@ -419,11 +419,27 @@ class Replicator(Daemon):
|
|||||||
if not shouldbehere and all(responses):
|
if not shouldbehere and all(responses):
|
||||||
# If the db shouldn't be on this node and has been successfully
|
# If the db shouldn't be on this node and has been successfully
|
||||||
# synced to all of its peers, it can be removed.
|
# synced to all of its peers, it can be removed.
|
||||||
|
self.delete_db(object_file)
|
||||||
|
self.logger.timing_since('timing', start_time)
|
||||||
|
|
||||||
|
def delete_db(self, object_file):
|
||||||
with lock_parent_directory(object_file):
|
with lock_parent_directory(object_file):
|
||||||
shutil.rmtree(os.path.dirname(object_file), True)
|
shutil.rmtree(os.path.dirname(object_file), True)
|
||||||
self.stats['remove'] += 1
|
self.stats['remove'] += 1
|
||||||
self.logger.increment('removes')
|
device_name = self.extract_device(object_file)
|
||||||
self.logger.timing_since('timing', start_time)
|
self.logger.increment('removes.' + device_name)
|
||||||
|
|
||||||
|
def extract_device(self, object_file):
|
||||||
|
"""
|
||||||
|
Extract the device name from an object path. Returns "UNKNOWN" if the
|
||||||
|
path could not be extracted successfully for some reason.
|
||||||
|
|
||||||
|
:param object_file: the path to a database file.
|
||||||
|
"""
|
||||||
|
match = self.extract_device_re.match(object_file)
|
||||||
|
if match:
|
||||||
|
return match.groups()[0]
|
||||||
|
return "UNKNOWN"
|
||||||
|
|
||||||
def report_up_to_date(self, full_info):
|
def report_up_to_date(self, full_info):
|
||||||
return True
|
return True
|
||||||
|
@ -6,6 +6,7 @@ import copy
|
|||||||
import logging
|
import logging
|
||||||
from sys import exc_info
|
from sys import exc_info
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
|
from collections import defaultdict
|
||||||
from tempfile import NamedTemporaryFile
|
from tempfile import NamedTemporaryFile
|
||||||
from eventlet.green import socket
|
from eventlet.green import socket
|
||||||
from tempfile import mkdtemp
|
from tempfile import mkdtemp
|
||||||
@ -117,30 +118,28 @@ class FakeLogger(object):
|
|||||||
self.facility = kwargs['facility']
|
self.facility = kwargs['facility']
|
||||||
|
|
||||||
def _clear(self):
|
def _clear(self):
|
||||||
self.log_dict = dict(
|
self.log_dict = defaultdict(list)
|
||||||
error=[], info=[], warning=[], debug=[], exception=[])
|
|
||||||
|
|
||||||
def error(self, *args, **kwargs):
|
def _store_in(store_name):
|
||||||
self.log_dict['error'].append((args, kwargs))
|
def stub_fn(self, *args, **kwargs):
|
||||||
|
self.log_dict[store_name].append((args, kwargs))
|
||||||
|
return stub_fn
|
||||||
|
|
||||||
def info(self, *args, **kwargs):
|
error = _store_in('error')
|
||||||
self.log_dict['info'].append((args, kwargs))
|
info = _store_in('info')
|
||||||
|
warning = _store_in('warning')
|
||||||
def warning(self, *args, **kwargs):
|
debug = _store_in('debug')
|
||||||
self.log_dict['warning'].append((args, kwargs))
|
|
||||||
|
|
||||||
def debug(self, *args, **kwargs):
|
|
||||||
self.log_dict['debug'].append((args, kwargs))
|
|
||||||
|
|
||||||
def exception(self, *args, **kwargs):
|
def exception(self, *args, **kwargs):
|
||||||
self.log_dict['exception'].append((args, kwargs, str(exc_info()[1])))
|
self.log_dict['exception'].append((args, kwargs, str(exc_info()[1])))
|
||||||
|
|
||||||
# mock out the StatsD logging methods:
|
# mock out the StatsD logging methods:
|
||||||
def set_statsd_prefix(self, *a, **kw):
|
increment = _store_in('increment')
|
||||||
pass
|
decrement = _store_in('decrement')
|
||||||
|
timing = _store_in('timing')
|
||||||
increment = decrement = timing = timing_since = update_stats = \
|
timing_since = _store_in('timing_since')
|
||||||
set_statsd_prefix
|
update_stats = _store_in('update_stats')
|
||||||
|
set_statsd_prefix = _store_in('set_statsd_prefix')
|
||||||
|
|
||||||
def setFormatter(self, obj):
|
def setFormatter(self, obj):
|
||||||
self.formatter = obj
|
self.formatter = obj
|
||||||
|
@ -18,21 +18,26 @@ from contextlib import contextmanager
|
|||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
import errno
|
import errno
|
||||||
|
from tempfile import mkdtemp, NamedTemporaryFile
|
||||||
|
|
||||||
from swift.common import db_replicator
|
from swift.common import db_replicator
|
||||||
from swift.common import utils
|
from swift.common import utils
|
||||||
from swift.common.utils import normalize_timestamp
|
from swift.common.utils import normalize_timestamp
|
||||||
from swift.container import server as container_server
|
from swift.container import server as container_server
|
||||||
|
|
||||||
|
from test.unit import FakeLogger
|
||||||
|
|
||||||
|
|
||||||
def teardown_module():
|
def teardown_module():
|
||||||
"clean up my monkey patching"
|
"clean up my monkey patching"
|
||||||
reload(db_replicator)
|
reload(db_replicator)
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def lock_parent_directory(filename):
|
def lock_parent_directory(filename):
|
||||||
yield True
|
yield True
|
||||||
|
|
||||||
|
|
||||||
class FakeRing:
|
class FakeRing:
|
||||||
class Ring:
|
class Ring:
|
||||||
devs = []
|
devs = []
|
||||||
@ -43,6 +48,30 @@ class FakeRing:
|
|||||||
def get_more_nodes(self, *args):
|
def get_more_nodes(self, *args):
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
class FakeRingWithNodes:
|
||||||
|
class Ring:
|
||||||
|
devs = [dict(
|
||||||
|
id=1, weight=10.0, zone=1, ip='1.1.1.1', port=6000, device='sdb',
|
||||||
|
meta=''
|
||||||
|
), dict(
|
||||||
|
id=2, weight=10.0, zone=2, ip='1.1.1.2', port=6000, device='sdb',
|
||||||
|
meta=''
|
||||||
|
), dict(
|
||||||
|
id=3, weight=10.0, zone=3, ip='1.1.1.3', port=6000, device='sdb',
|
||||||
|
meta=''
|
||||||
|
), dict(
|
||||||
|
id=4, weight=10.0, zone=4, ip='1.1.1.4', port=6000, device='sdb',
|
||||||
|
meta='')]
|
||||||
|
|
||||||
|
def __init__(self, path, reload_time=15, ring_name=None):
|
||||||
|
pass
|
||||||
|
def get_part_nodes(self, part):
|
||||||
|
return self.devs[:3]
|
||||||
|
def get_more_nodes(self, *args):
|
||||||
|
return (d for d in self.devs[3:])
|
||||||
|
|
||||||
|
|
||||||
class FakeProcess:
|
class FakeProcess:
|
||||||
def __init__(self, *codes):
|
def __init__(self, *codes):
|
||||||
self.codes = iter(codes)
|
self.codes = iter(codes)
|
||||||
@ -56,6 +85,7 @@ class FakeProcess:
|
|||||||
raise next
|
raise next
|
||||||
return Failure()
|
return Failure()
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def _mock_process(*args):
|
def _mock_process(*args):
|
||||||
orig_process = db_replicator.subprocess.Popen
|
orig_process = db_replicator.subprocess.Popen
|
||||||
@ -63,6 +93,7 @@ def _mock_process(*args):
|
|||||||
yield
|
yield
|
||||||
db_replicator.subprocess.Popen = orig_process
|
db_replicator.subprocess.Popen = orig_process
|
||||||
|
|
||||||
|
|
||||||
class ReplHttp:
|
class ReplHttp:
|
||||||
def __init__(self, response=None):
|
def __init__(self, response=None):
|
||||||
self.response = response
|
self.response = response
|
||||||
@ -77,6 +108,7 @@ class ReplHttp:
|
|||||||
return self.response
|
return self.response
|
||||||
return Response()
|
return Response()
|
||||||
|
|
||||||
|
|
||||||
class ChangingMtimesOs:
|
class ChangingMtimesOs:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.mtime = 0
|
self.mtime = 0
|
||||||
@ -86,9 +118,11 @@ class ChangingMtimesOs:
|
|||||||
self.mtime += 1
|
self.mtime += 1
|
||||||
return self.mtime
|
return self.mtime
|
||||||
|
|
||||||
|
|
||||||
class FakeBroker:
|
class FakeBroker:
|
||||||
db_file = __file__
|
db_file = __file__
|
||||||
get_repl_missing_table = False
|
get_repl_missing_table = False
|
||||||
|
stub_replication_info = None
|
||||||
db_type = 'container'
|
db_type = 'container'
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
return None
|
return None
|
||||||
@ -110,11 +144,13 @@ class FakeBroker:
|
|||||||
def get_replication_info(self):
|
def get_replication_info(self):
|
||||||
if self.get_repl_missing_table:
|
if self.get_repl_missing_table:
|
||||||
raise Exception('no such table')
|
raise Exception('no such table')
|
||||||
|
if self.stub_replication_info:
|
||||||
|
return self.stub_replication_info
|
||||||
return {'delete_timestamp': 0, 'put_timestamp': 1, 'count': 0}
|
return {'delete_timestamp': 0, 'put_timestamp': 1, 'count': 0}
|
||||||
def reclaim(self, item_timestamp, sync_timestamp):
|
def reclaim(self, item_timestamp, sync_timestamp):
|
||||||
pass
|
pass
|
||||||
|
def get_info(self):
|
||||||
db_replicator.ring = FakeRing()
|
pass
|
||||||
|
|
||||||
|
|
||||||
class TestReplicator(db_replicator.Replicator):
|
class TestReplicator(db_replicator.Replicator):
|
||||||
@ -124,7 +160,15 @@ class TestReplicator(db_replicator.Replicator):
|
|||||||
datadir = container_server.DATADIR
|
datadir = container_server.DATADIR
|
||||||
default_port = 1000
|
default_port = 1000
|
||||||
|
|
||||||
|
|
||||||
class TestDBReplicator(unittest.TestCase):
|
class TestDBReplicator(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
db_replicator.ring = FakeRing()
|
||||||
|
self.delete_db_calls = []
|
||||||
|
|
||||||
|
def stub_delete_db(self, object_file):
|
||||||
|
self.delete_db_calls.append(object_file)
|
||||||
|
|
||||||
|
|
||||||
def test_repl_connection(self):
|
def test_repl_connection(self):
|
||||||
node = {'ip': '127.0.0.1', 'port': 80, 'device': 'sdb1'}
|
node = {'ip': '127.0.0.1', 'port': 80, 'device': 'sdb1'}
|
||||||
@ -204,9 +248,11 @@ class TestDBReplicator(unittest.TestCase):
|
|||||||
replicator._report_stats()
|
replicator._report_stats()
|
||||||
|
|
||||||
def test_replicate_object(self):
|
def test_replicate_object(self):
|
||||||
db_replicator.lock_parent_directory = lock_parent_directory
|
db_replicator.ring = FakeRingWithNodes()
|
||||||
replicator = TestReplicator({})
|
replicator = TestReplicator({})
|
||||||
replicator._replicate_object('0', 'file', 'node_id')
|
replicator.delete_db = self.stub_delete_db
|
||||||
|
replicator._replicate_object('0', '/path/to/file', 'node_id')
|
||||||
|
self.assertEquals([], self.delete_db_calls)
|
||||||
|
|
||||||
def test_replicate_object_quarantine(self):
|
def test_replicate_object_quarantine(self):
|
||||||
replicator = TestReplicator({})
|
replicator = TestReplicator({})
|
||||||
@ -227,7 +273,6 @@ class TestDBReplicator(unittest.TestCase):
|
|||||||
return mock_renamer(was, new, cause_colision=True)
|
return mock_renamer(was, new, cause_colision=True)
|
||||||
was_renamer = db_replicator.renamer
|
was_renamer = db_replicator.renamer
|
||||||
db_replicator.renamer = mock_renamer
|
db_replicator.renamer = mock_renamer
|
||||||
db_replicator.lock_parent_directory = lock_parent_directory
|
|
||||||
replicator.brokerclass.get_repl_missing_table = True
|
replicator.brokerclass.get_repl_missing_table = True
|
||||||
replicator.brokerclass.db_file = '/a/b/c/d/e/hey'
|
replicator.brokerclass.db_file = '/a/b/c/d/e/hey'
|
||||||
replicator._replicate_object('0', 'file', 'node_id')
|
replicator._replicate_object('0', 'file', 'node_id')
|
||||||
@ -238,6 +283,51 @@ class TestDBReplicator(unittest.TestCase):
|
|||||||
replicator.brokerclass.db_file = was_db_file
|
replicator.brokerclass.db_file = was_db_file
|
||||||
db_replicator.renamer = was_renamer
|
db_replicator.renamer = was_renamer
|
||||||
|
|
||||||
|
def test_replicate_object_delete_because_deleted(self):
|
||||||
|
replicator = TestReplicator({})
|
||||||
|
try:
|
||||||
|
replicator.delete_db = self.stub_delete_db
|
||||||
|
replicator.brokerclass.stub_replication_info = {
|
||||||
|
'delete_timestamp': 2, 'put_timestamp': 1, 'count': 0}
|
||||||
|
replicator._replicate_object('0', '/path/to/file', 'node_id')
|
||||||
|
finally:
|
||||||
|
replicator.brokerclass.stub_replication_info = None
|
||||||
|
self.assertEquals(['/path/to/file'], self.delete_db_calls)
|
||||||
|
|
||||||
|
def test_replicate_object_delete_because_not_shouldbehere(self):
|
||||||
|
replicator = TestReplicator({})
|
||||||
|
replicator.delete_db = self.stub_delete_db
|
||||||
|
replicator._replicate_object('0', '/path/to/file', 'node_id')
|
||||||
|
self.assertEquals(['/path/to/file'], self.delete_db_calls)
|
||||||
|
|
||||||
|
def test_delete_db(self):
|
||||||
|
db_replicator.lock_parent_directory = lock_parent_directory
|
||||||
|
replicator = TestReplicator({})
|
||||||
|
replicator._zero_stats()
|
||||||
|
replicator.extract_device = lambda _: 'some_device'
|
||||||
|
replicator.logger = FakeLogger()
|
||||||
|
|
||||||
|
temp_dir = mkdtemp()
|
||||||
|
temp_file = NamedTemporaryFile(dir=temp_dir, delete=False)
|
||||||
|
|
||||||
|
# sanity-checks
|
||||||
|
self.assertTrue(os.path.exists(temp_dir))
|
||||||
|
self.assertTrue(os.path.exists(temp_file.name))
|
||||||
|
self.assertEqual(0, replicator.stats['remove'])
|
||||||
|
|
||||||
|
replicator.delete_db(temp_file.name)
|
||||||
|
|
||||||
|
self.assertFalse(os.path.exists(temp_dir))
|
||||||
|
self.assertFalse(os.path.exists(temp_file.name))
|
||||||
|
self.assertEqual([(('removes.some_device',), {})],
|
||||||
|
replicator.logger.log_dict['increment'])
|
||||||
|
self.assertEqual(1, replicator.stats['remove'])
|
||||||
|
|
||||||
|
def test_extract_device(self):
|
||||||
|
replicator = TestReplicator({'devices': '/some/root'})
|
||||||
|
self.assertEqual('some_device', replicator.extract_device(
|
||||||
|
'/some/root/some_device/deeper/and/deeper'))
|
||||||
|
|
||||||
# def test_dispatch(self):
|
# def test_dispatch(self):
|
||||||
# rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
|
# rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
|
||||||
# no_op = lambda *args, **kwargs: True
|
# no_op = lambda *args, **kwargs: True
|
||||||
@ -271,6 +361,7 @@ class TestDBReplicator(unittest.TestCase):
|
|||||||
rpc.merge_syncs(fake_broker, args)
|
rpc.merge_syncs(fake_broker, args)
|
||||||
self.assertEquals(fake_broker.args, (args[0],))
|
self.assertEquals(fake_broker.args, (args[0],))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user