only latest async pending is now sent
This commit is contained in:
parent
b9d499a230
commit
71b076eb59
@ -776,7 +776,7 @@ def readconf(conf, section_name=None, log_name=None, defaults=None):
|
|||||||
return conf
|
return conf
|
||||||
|
|
||||||
|
|
||||||
def write_pickle(obj, dest, tmp, pickle_protocol=0):
|
def write_pickle(obj, dest, tmp=None, pickle_protocol=0):
|
||||||
"""
|
"""
|
||||||
Ensure that a pickle file gets written to disk. The file
|
Ensure that a pickle file gets written to disk. The file
|
||||||
is first written to a tmp location, ensure it is synced to disk, then
|
is first written to a tmp location, ensure it is synced to disk, then
|
||||||
@ -784,9 +784,11 @@ def write_pickle(obj, dest, tmp, pickle_protocol=0):
|
|||||||
|
|
||||||
:param obj: python object to be pickled
|
:param obj: python object to be pickled
|
||||||
:param dest: path of final destination file
|
:param dest: path of final destination file
|
||||||
:param tmp: path to tmp to use
|
:param tmp: path to tmp to use, defaults to None
|
||||||
:param pickle_protocol: protocol to pickle the obj with, defaults to 0
|
:param pickle_protocol: protocol to pickle the obj with, defaults to 0
|
||||||
"""
|
"""
|
||||||
|
if tmp == None:
|
||||||
|
tmp = os.path.dirname(dest)
|
||||||
fd, tmppath = mkstemp(dir=tmp, suffix='.tmp')
|
fd, tmppath = mkstemp(dir=tmp, suffix='.tmp')
|
||||||
with os.fdopen(fd, 'wb') as fo:
|
with os.fdopen(fd, 'wb') as fo:
|
||||||
pickle.dump(obj, fo, pickle_protocol)
|
pickle.dump(obj, fo, pickle_protocol)
|
||||||
|
@ -132,11 +132,23 @@ class ObjectUpdater(Daemon):
|
|||||||
prefix_path = os.path.join(async_pending, prefix)
|
prefix_path = os.path.join(async_pending, prefix)
|
||||||
if not os.path.isdir(prefix_path):
|
if not os.path.isdir(prefix_path):
|
||||||
continue
|
continue
|
||||||
for update in os.listdir(prefix_path):
|
seen = set()
|
||||||
|
for update in sorted(os.listdir(prefix_path), reverse=True):
|
||||||
update_path = os.path.join(prefix_path, update)
|
update_path = os.path.join(prefix_path, update)
|
||||||
if not os.path.isfile(update_path):
|
if not os.path.isfile(update_path):
|
||||||
continue
|
continue
|
||||||
self.process_object_update(update_path, device)
|
try:
|
||||||
|
hash, timestamp = update.split('-')
|
||||||
|
except ValueError:
|
||||||
|
self.logger.error(
|
||||||
|
_('ERROR async pending file with unexpected name %s')
|
||||||
|
% (update_path))
|
||||||
|
continue
|
||||||
|
if hash in seen:
|
||||||
|
os.unlink(update_path)
|
||||||
|
else:
|
||||||
|
self.process_object_update(update_path, device)
|
||||||
|
seen.add(hash)
|
||||||
time.sleep(self.slowdown)
|
time.sleep(self.slowdown)
|
||||||
try:
|
try:
|
||||||
os.rmdir(prefix_path)
|
os.rmdir(prefix_path)
|
||||||
|
@ -20,14 +20,17 @@ import unittest
|
|||||||
from gzip import GzipFile
|
from gzip import GzipFile
|
||||||
from shutil import rmtree
|
from shutil import rmtree
|
||||||
from time import time
|
from time import time
|
||||||
|
from distutils.dir_util import mkpath
|
||||||
|
|
||||||
from eventlet import spawn, TimeoutError, listen
|
from eventlet import spawn, TimeoutError, listen
|
||||||
from eventlet.timeout import Timeout
|
from eventlet.timeout import Timeout
|
||||||
|
|
||||||
from swift.obj import updater as object_updater, server as object_server
|
from swift.obj import updater as object_updater, server as object_server
|
||||||
|
from swift.obj.server import ASYNCDIR
|
||||||
from swift.common.ring import RingData
|
from swift.common.ring import RingData
|
||||||
from swift.common import utils
|
from swift.common import utils
|
||||||
from swift.common.utils import hash_path, normalize_timestamp, mkdirs
|
from swift.common.utils import hash_path, normalize_timestamp, mkdirs, \
|
||||||
|
write_pickle
|
||||||
|
|
||||||
|
|
||||||
class TestObjectUpdater(unittest.TestCase):
|
class TestObjectUpdater(unittest.TestCase):
|
||||||
@ -48,7 +51,7 @@ class TestObjectUpdater(unittest.TestCase):
|
|||||||
os.mkdir(self.devices_dir)
|
os.mkdir(self.devices_dir)
|
||||||
self.sda1 = os.path.join(self.devices_dir, 'sda1')
|
self.sda1 = os.path.join(self.devices_dir, 'sda1')
|
||||||
os.mkdir(self.sda1)
|
os.mkdir(self.sda1)
|
||||||
os.mkdir(os.path.join(self.sda1,'tmp'))
|
os.mkdir(os.path.join(self.sda1, 'tmp'))
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
rmtree(self.testdir, ignore_errors=1)
|
rmtree(self.testdir, ignore_errors=1)
|
||||||
@ -70,6 +73,45 @@ class TestObjectUpdater(unittest.TestCase):
|
|||||||
self.assertEquals(cu.node_timeout, 5)
|
self.assertEquals(cu.node_timeout, 5)
|
||||||
self.assert_(cu.get_container_ring() is not None)
|
self.assert_(cu.get_container_ring() is not None)
|
||||||
|
|
||||||
|
def test_object_sweep(self):
|
||||||
|
prefix_dir = os.path.join(self.sda1, ASYNCDIR, 'abc')
|
||||||
|
mkpath(prefix_dir)
|
||||||
|
|
||||||
|
objects = {
|
||||||
|
'a': [1089.3, 18.37, 12.83, 1.3],
|
||||||
|
'b': [49.4, 49.3, 49.2, 49.1],
|
||||||
|
'c': [109984.123],
|
||||||
|
}
|
||||||
|
|
||||||
|
expected = set()
|
||||||
|
for o, timestamps in objects.iteritems():
|
||||||
|
ohash = hash_path('account', 'container', o)
|
||||||
|
for t in timestamps:
|
||||||
|
o_path = os.path.join(prefix_dir, ohash + '-' +
|
||||||
|
normalize_timestamp(t))
|
||||||
|
if t == timestamps[0]:
|
||||||
|
expected.add(o_path)
|
||||||
|
write_pickle({}, o_path)
|
||||||
|
|
||||||
|
seen = set()
|
||||||
|
|
||||||
|
class MockObjectUpdater(object_updater.ObjectUpdater):
|
||||||
|
def process_object_update(self, update_path, device):
|
||||||
|
seen.add(update_path)
|
||||||
|
os.unlink(update_path)
|
||||||
|
|
||||||
|
cu = MockObjectUpdater({
|
||||||
|
'devices': self.devices_dir,
|
||||||
|
'mount_check': 'false',
|
||||||
|
'swift_dir': self.testdir,
|
||||||
|
'interval': '1',
|
||||||
|
'concurrency': '1',
|
||||||
|
'node_timeout': '5',
|
||||||
|
})
|
||||||
|
cu.object_sweep(self.sda1)
|
||||||
|
self.assert_(not os.path.exists(prefix_dir))
|
||||||
|
self.assertEqual(expected, seen)
|
||||||
|
|
||||||
def test_run_once(self):
|
def test_run_once(self):
|
||||||
cu = object_updater.ObjectUpdater({
|
cu = object_updater.ObjectUpdater({
|
||||||
'devices': self.devices_dir,
|
'devices': self.devices_dir,
|
||||||
@ -103,6 +145,7 @@ class TestObjectUpdater(unittest.TestCase):
|
|||||||
self.assert_(os.path.exists(op_path))
|
self.assert_(os.path.exists(op_path))
|
||||||
|
|
||||||
bindsock = listen(('127.0.0.1', 0))
|
bindsock = listen(('127.0.0.1', 0))
|
||||||
|
|
||||||
def accepter(sock, return_code):
|
def accepter(sock, return_code):
|
||||||
try:
|
try:
|
||||||
with Timeout(3):
|
with Timeout(3):
|
||||||
@ -123,6 +166,7 @@ class TestObjectUpdater(unittest.TestCase):
|
|||||||
except BaseException, err:
|
except BaseException, err:
|
||||||
return err
|
return err
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def accept(return_codes):
|
def accept(return_codes):
|
||||||
codes = iter(return_codes)
|
codes = iter(return_codes)
|
||||||
try:
|
try:
|
||||||
@ -139,7 +183,7 @@ class TestObjectUpdater(unittest.TestCase):
|
|||||||
except BaseException, err:
|
except BaseException, err:
|
||||||
return err
|
return err
|
||||||
return None
|
return None
|
||||||
event = spawn(accept, [201,500])
|
event = spawn(accept, [201, 500])
|
||||||
for dev in cu.get_container_ring().devs:
|
for dev in cu.get_container_ring().devs:
|
||||||
if dev is not None:
|
if dev is not None:
|
||||||
dev['port'] = bindsock.getsockname()[1]
|
dev['port'] = bindsock.getsockname()[1]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user