From 71b076eb59f0634cb6f9851864b6e0bfd2df2c03 Mon Sep 17 00:00:00 2001
From: Greg Lange <greglange@gmail.com>
Date: Wed, 20 Apr 2011 19:54:28 +0000
Subject: [PATCH] only latest async pending is now sent

---
 swift/common/utils.py         |  6 +++--
 swift/obj/updater.py          | 16 +++++++++--
 test/unit/obj/test_updater.py | 50 ++++++++++++++++++++++++++++++++---
 3 files changed, 65 insertions(+), 7 deletions(-)

diff --git a/swift/common/utils.py b/swift/common/utils.py
index a71a9128ae..4ee57db8f7 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -776,7 +776,7 @@ def readconf(conf, section_name=None, log_name=None, defaults=None):
     return conf
 
 
-def write_pickle(obj, dest, tmp, pickle_protocol=0):
+def write_pickle(obj, dest, tmp=None, pickle_protocol=0):
     """
     Ensure that a pickle file gets written to disk.  The file
     is first written to a tmp location, ensure it is synced to disk, then
@@ -784,9 +784,11 @@ def write_pickle(obj, dest, tmp, pickle_protocol=0):
 
     :param obj: python object to be pickled
     :param dest: path of final destination file
-    :param tmp: path to tmp to use
+    :param tmp: path to tmp dir to use, defaults to the directory of dest
     :param pickle_protocol: protocol to pickle the obj with, defaults to 0
     """
+    if tmp is None:
+        tmp = os.path.dirname(dest)
     fd, tmppath = mkstemp(dir=tmp, suffix='.tmp')
     with os.fdopen(fd, 'wb') as fo:
         pickle.dump(obj, fo, pickle_protocol)
diff --git a/swift/obj/updater.py b/swift/obj/updater.py
index a7715f5a72..c9d8c0ee66 100644
--- a/swift/obj/updater.py
+++ b/swift/obj/updater.py
@@ -132,11 +132,23 @@ class ObjectUpdater(Daemon):
             prefix_path = os.path.join(async_pending, prefix)
             if not os.path.isdir(prefix_path):
                 continue
-            for update in os.listdir(prefix_path):
+            seen = set()
+            for update in sorted(os.listdir(prefix_path), reverse=True):
                 update_path = os.path.join(prefix_path, update)
                 if not os.path.isfile(update_path):
                     continue
-                self.process_object_update(update_path, device)
+                try:
+                    obj_hash, timestamp = update.split('-')
+                except ValueError:
+                    self.logger.error(
+                        _('ERROR async pending file with unexpected name %s')
+                        % (update_path))
+                    continue
+                if obj_hash in seen:
+                    os.unlink(update_path)  # superseded by a newer update
+                else:
+                    self.process_object_update(update_path, device)
+                    seen.add(obj_hash)
                 time.sleep(self.slowdown)
             try:
                 os.rmdir(prefix_path)
diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py
index 52e327d1b8..7f43f314d2 100644
--- a/test/unit/obj/test_updater.py
+++ b/test/unit/obj/test_updater.py
@@ -20,14 +20,17 @@ import unittest
 from gzip import GzipFile
 from shutil import rmtree
 from time import time
+from distutils.dir_util import mkpath
 
 from eventlet import spawn, TimeoutError, listen
 from eventlet.timeout import Timeout
 
 from swift.obj import updater as object_updater, server as object_server
+from swift.obj.server import ASYNCDIR
 from swift.common.ring import RingData
 from swift.common import utils
-from swift.common.utils import hash_path, normalize_timestamp, mkdirs
+from swift.common.utils import hash_path, normalize_timestamp, mkdirs, \
+    write_pickle
 
 
 class TestObjectUpdater(unittest.TestCase):
@@ -48,7 +51,7 @@ class TestObjectUpdater(unittest.TestCase):
         os.mkdir(self.devices_dir)
         self.sda1 = os.path.join(self.devices_dir, 'sda1')
         os.mkdir(self.sda1)
-        os.mkdir(os.path.join(self.sda1,'tmp'))
+        os.mkdir(os.path.join(self.sda1, 'tmp'))
 
     def tearDown(self):
         rmtree(self.testdir, ignore_errors=1)
@@ -70,6 +73,45 @@ class TestObjectUpdater(unittest.TestCase):
         self.assertEquals(cu.node_timeout, 5)
         self.assert_(cu.get_container_ring() is not None)
 
+    def test_object_sweep(self):
+        """Only the newest async pending of each object is processed."""
+        prefix_dir = os.path.join(self.sda1, ASYNCDIR, 'abc')
+        mkdirs(prefix_dir)
+
+        objects = {
+            'a': [1089.3, 18.37, 12.83, 1.3],
+            'b': [49.4, 49.3, 49.2, 49.1],
+            'c': [109984.123],
+        }
+
+        expected = set()
+        for o, timestamps in objects.iteritems():
+            ohash = hash_path('account', 'container', o)
+            for t in timestamps:
+                o_path = os.path.join(prefix_dir, ohash + '-' +
+                    normalize_timestamp(t))
+                if t == max(timestamps):
+                    expected.add(o_path)
+                write_pickle({}, o_path)
+
+        seen = set()
+
+        class MockObjectUpdater(object_updater.ObjectUpdater):
+            def process_object_update(self, update_path, device):
+                seen.add(update_path)
+                os.unlink(update_path)
+
+        cu = MockObjectUpdater({
+            'devices': self.devices_dir,
+            'mount_check': 'false',
+            'swift_dir': self.testdir,
+            'interval': '1',
+            'concurrency': '1',
+            'node_timeout': '5'})
+        cu.object_sweep(self.sda1)
+        self.assert_(not os.path.exists(prefix_dir))
+        self.assertEqual(expected, seen)
+
     def test_run_once(self):
         cu = object_updater.ObjectUpdater({
             'devices': self.devices_dir,
@@ -103,6 +145,7 @@ class TestObjectUpdater(unittest.TestCase):
         self.assert_(os.path.exists(op_path))
 
         bindsock = listen(('127.0.0.1', 0))
+
         def accepter(sock, return_code):
             try:
                 with Timeout(3):
@@ -123,6 +166,7 @@ class TestObjectUpdater(unittest.TestCase):
             except BaseException, err:
                 return err
             return None
+
         def accept(return_codes):
             codes = iter(return_codes)
             try:
@@ -139,7 +183,7 @@ class TestObjectUpdater(unittest.TestCase):
             except BaseException, err:
                 return err
             return None
-        event = spawn(accept, [201,500])
+        event = spawn(accept, [201, 500])
         for dev in cu.get_container_ring().devs:
             if dev is not None:
                 dev['port'] = bindsock.getsockname()[1]