diff --git a/swift/common/utils.py b/swift/common/utils.py
index 1c06598f37..cbe27fbd17 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -3060,18 +3060,27 @@ def human_readable(value):
 
 def put_recon_cache_entry(cache_entry, key, item):
     """
-    Function that will check if item is a dict, and if so put it under
-    cache_entry[key].  We use nested recon cache entries when the object
-    auditor runs in parallel or else in 'once' mode with a specified
-    subset of devices.
+    Update a recon cache entry item.
+
+    If ``item`` is an empty dict then any existing ``key`` in ``cache_entry``
+    will be deleted. Similarly if ``item`` is a dict and any of its values are
+    empty dicts then the corrsponsing key will be deleted from the nested dict
+    in ``cache_entry``.
+
+    We use nested recon cache entries when the object auditor
+    runs in parallel or else in 'once' mode with a specified subset of devices.
+
+    :param cache_entry: a dict of existing cache entries
+    :param key: key for item to update
+    :param item: value for item to update
     """
     if isinstance(item, dict):
+        if not item:
+            cache_entry.pop(key, None)
+            return
         if key not in cache_entry or key in cache_entry and not \
                 isinstance(cache_entry[key], dict):
             cache_entry[key] = {}
-        elif key in cache_entry and item == {}:
-            cache_entry.pop(key, None)
-            return
         for k, v in item.items():
             if v == {}:
                 cache_entry[key].pop(k, None)
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index ea788c61a7..076cc10ba0 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -1361,21 +1361,53 @@ class TestUtils(unittest.TestCase):
         testcache_file = os.path.join(testdir_base, 'cache.recon')
         logger = utils.get_logger(None, 'server', log_route='server')
         try:
-            submit_dict = {'key1': {'value1': 1, 'value2': 2}}
+            submit_dict = {'key0': 99,
+                           'key1': {'value1': 1, 'value2': 2}}
             utils.dump_recon_cache(submit_dict, testcache_file, logger)
-            fd = open(testcache_file)
-            file_dict = json.loads(fd.readline())
-            fd.close()
+            with open(testcache_file) as fd:
+                file_dict = json.loads(fd.readline())
             self.assertEqual(submit_dict, file_dict)
             # Use a nested entry
-            submit_dict = {'key1': {'key2': {'value1': 1, 'value2': 2}}}
-            result_dict = {'key1': {'key2': {'value1': 1, 'value2': 2},
-                           'value1': 1, 'value2': 2}}
+            submit_dict = {'key0': 101,
+                           'key1': {'key2': {'value1': 1, 'value2': 2}}}
+            expect_dict = {'key0': 101,
+                           'key1': {'key2': {'value1': 1, 'value2': 2},
+                                    'value1': 1, 'value2': 2}}
             utils.dump_recon_cache(submit_dict, testcache_file, logger)
-            fd = open(testcache_file)
-            file_dict = json.loads(fd.readline())
-            fd.close()
-            self.assertEqual(result_dict, file_dict)
+            with open(testcache_file) as fd:
+                file_dict = json.loads(fd.readline())
+            self.assertEqual(expect_dict, file_dict)
+            # cached entries are sticky
+            submit_dict = {}
+            utils.dump_recon_cache(submit_dict, testcache_file, logger)
+            with open(testcache_file) as fd:
+                file_dict = json.loads(fd.readline())
+            self.assertEqual(expect_dict, file_dict)
+            # nested dicts can be erased...
+            submit_dict = {'key1': {'key2': {}}}
+            expect_dict = {'key0': 101,
+                           'key1': {'value1': 1, 'value2': 2}}
+            utils.dump_recon_cache(submit_dict, testcache_file, logger)
+            with open(testcache_file) as fd:
+                file_dict = json.loads(fd.readline())
+            self.assertEqual(expect_dict, file_dict)
+            # ... and erasure is idempotent
+            utils.dump_recon_cache(submit_dict, testcache_file, logger)
+            with open(testcache_file) as fd:
+                file_dict = json.loads(fd.readline())
+            self.assertEqual(expect_dict, file_dict)
+            # top level dicts can be erased...
+            submit_dict = {'key1': {}}
+            expect_dict = {'key0': 101}
+            utils.dump_recon_cache(submit_dict, testcache_file, logger)
+            with open(testcache_file) as fd:
+                file_dict = json.loads(fd.readline())
+            self.assertEqual(expect_dict, file_dict)
+            # ... and erasure is idempotent
+            utils.dump_recon_cache(submit_dict, testcache_file, logger)
+            with open(testcache_file) as fd:
+                file_dict = json.loads(fd.readline())
+            self.assertEqual(expect_dict, file_dict)
         finally:
             rmtree(testdir_base)
 
diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py
index 1bc076ff2a..8482abb927 100644
--- a/test/unit/obj/test_auditor.py
+++ b/test/unit/obj/test_auditor.py
@@ -12,6 +12,7 @@
 # implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import json
 
 from test import unit
 import unittest
@@ -616,6 +617,77 @@ class TestAuditor(unittest.TestCase):
         self.assertGreater(len(log_lines), 0)
         self.assertTrue(log_lines[0].index('ZBF - sda'))
 
+    def test_object_run_recon_cache(self):
+        ts = Timestamp(time.time())
+        data = 'test_data'
+
+        with self.disk_file.create() as writer:
+            writer.write(data)
+            metadata = {
+                'ETag': md5(data).hexdigest(),
+                'X-Timestamp': ts.normal,
+                'Content-Length': str(os.fstat(writer._fd).st_size),
+            }
+            writer.put(metadata)
+            writer.commit(ts)
+
+        # all devices
+        auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+                                               self.rcache, self.devices)
+        auditor_worker.audit_all_objects()
+        with open(self.rcache) as fd:
+            actual_rcache = json.load(fd)
+        expected = {'object_auditor_stats_ALL':
+                    {'passes': 1, 'errors': 0, 'audit_time': mock.ANY,
+                     'start_time': mock.ANY, 'quarantined': 0,
+                     'bytes_processed': 9}}
+        with open(self.rcache) as fd:
+            actual_rcache = json.load(fd)
+        self.assertEqual(expected, actual_rcache)
+
+        auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+                                               self.rcache, self.devices,
+                                               zero_byte_only_at_fps=50)
+        auditor_worker.audit_all_objects()
+        self.assertEqual(expected, actual_rcache)
+        with open(self.rcache) as fd:
+            actual_rcache = json.load(fd)
+        expected.update({
+            'object_auditor_stats_ZBF':
+            {'passes': 1, 'errors': 0, 'audit_time': mock.ANY,
+             'start_time': mock.ANY, 'quarantined': 0,
+             'bytes_processed': 0}})
+        self.assertEqual(expected, actual_rcache)
+
+        # specific devices
+        os.unlink(self.rcache)
+        auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+                                               self.rcache, self.devices)
+        auditor_worker.audit_all_objects(device_dirs=['sda'])
+        with open(self.rcache) as fd:
+            actual_rcache = json.load(fd)
+        expected = {'object_auditor_stats_ALL':
+                    {'sda': {'passes': 1, 'errors': 0, 'audit_time': mock.ANY,
+                             'start_time': mock.ANY, 'quarantined': 0,
+                             'bytes_processed': 9}}}
+        with open(self.rcache) as fd:
+            actual_rcache = json.load(fd)
+        self.assertEqual(expected, actual_rcache)
+
+        auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+                                               self.rcache, self.devices,
+                                               zero_byte_only_at_fps=50)
+        auditor_worker.audit_all_objects(device_dirs=['sda'])
+        self.assertEqual(expected, actual_rcache)
+        with open(self.rcache) as fd:
+            actual_rcache = json.load(fd)
+        expected.update({
+            'object_auditor_stats_ZBF':
+            {'sda': {'passes': 1, 'errors': 0, 'audit_time': mock.ANY,
+                     'start_time': mock.ANY, 'quarantined': 0,
+                     'bytes_processed': 0}}})
+        self.assertEqual(expected, actual_rcache)
+
     def test_object_run_once_no_sda(self):
         auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
                                                self.rcache, self.devices)
@@ -1237,6 +1309,53 @@ class TestAuditor(unittest.TestCase):
 
         self.assertEqual(sorted(forked_pids), [2, 1001])
 
+    def test_run_audit_once_zbfps(self):
+        my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
+                                                mount_check='false',
+                                                zero_byte_files_per_second=89,
+                                                concurrency=1,
+                                                recon_cache_path=self.testdir))
+
+        with mock.patch.object(my_auditor, '_sleep', lambda *a: None):
+            my_auditor.run_once(zero_byte_fps=50)
+
+        with open(self.rcache) as fd:
+            # there's no objects to audit so expect no stats; this assertion
+            # may change if https://bugs.launchpad.net/swift/+bug/1704858 is
+            # fixed
+            self.assertEqual({}, json.load(fd))
+
+        # check recon cache stays clean after a second run
+        with mock.patch.object(my_auditor, '_sleep', lambda *a: None):
+            my_auditor.run_once(zero_byte_fps=50)
+
+        with open(self.rcache) as fd:
+            self.assertEqual({}, json.load(fd))
+
+        ts = Timestamp(time.time())
+        with self.disk_file.create() as writer:
+            metadata = {
+                'ETag': md5('').hexdigest(),
+                'X-Timestamp': ts.normal,
+                'Content-Length': str(os.fstat(writer._fd).st_size),
+            }
+            writer.put(metadata)
+            writer.commit(ts)
+
+        # check recon cache stays clean after a second run
+        with mock.patch.object(my_auditor, '_sleep', lambda *a: None):
+            my_auditor.run_once(zero_byte_fps=50)
+        with open(self.rcache) as fd:
+            self.assertEqual({
+                'object_auditor_stats_ZBF': {
+                    'audit_time': 0,
+                    'bytes_processed': 0,
+                    'errors': 0,
+                    'passes': 1,
+                    'quarantined': 0,
+                    'start_time': mock.ANY}},
+                json.load(fd))
+
     def test_run_parallel_audit_once(self):
         my_auditor = auditor.ObjectAuditor(
             dict(devices=self.devices, mount_check='false',