Files
swift/test/unit/obj/test_auditor.py

1292 lines
54 KiB
Python
Raw Normal View History

# Copyright (c) 2010-2012 OpenStack Foundation
2010-07-12 17:03:45 -05:00
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test import unit
2010-07-12 17:03:45 -05:00
import unittest
import mock
2010-12-16 16:20:57 -08:00
import os
import time
import string
2010-12-16 16:20:57 -08:00
from shutil import rmtree
from hashlib import md5
2011-01-19 14:18:37 -06:00
from tempfile import mkdtemp
import textwrap
from os.path import dirname, basename
from test.unit import (FakeLogger, patch_policies, make_timestamp_iter,
DEFAULT_TEST_EC_TYPE)
from swift.obj import auditor, replicator
from swift.obj.diskfile import (
DiskFile, write_metadata, invalidate_hash, get_data_dir,
DiskFileManager, ECDiskFileManager, AuditLocation, clear_auditor_status,
get_auditor_status, HASH_FILE, HASH_INVALIDATIONS_FILE)
from swift.common.utils import (
mkdirs, normalize_timestamp, Timestamp, readconf)
from swift.common.storage_policy import (
ECStoragePolicy, StoragePolicy, POLICIES, EC_POLICY)
from test.unit.obj.common import write_diskfile
2010-12-28 14:54:00 -08:00
_mocked_policies = [
StoragePolicy(0, 'zero', False),
StoragePolicy(1, 'one', True),
ECStoragePolicy(2, 'two', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=2, ec_nparity=1, ec_segment_size=4096),
]
def works_only_once(callable_thing, exception):
called = [False]
def only_once(*a, **kw):
if called[0]:
raise exception
else:
called[0] = True
return callable_thing(*a, **kw)
return only_once
@patch_policies(_mocked_policies)
2010-07-12 17:03:45 -05:00
class TestAuditor(unittest.TestCase):
2010-12-16 16:20:57 -08:00
def setUp(self):
2011-03-15 22:12:03 -07:00
self.testdir = os.path.join(mkdtemp(), 'tmp_test_object_auditor')
2010-12-16 16:20:57 -08:00
self.devices = os.path.join(self.testdir, 'node')
self.rcache = os.path.join(self.testdir, 'object.recon')
2011-03-15 22:12:03 -07:00
self.logger = FakeLogger()
2010-12-16 16:20:57 -08:00
rmtree(self.testdir, ignore_errors=1)
2011-03-15 22:12:03 -07:00
mkdirs(os.path.join(self.devices, 'sda'))
os.mkdir(os.path.join(self.devices, 'sdb'))
# policy 0
self.objects = os.path.join(self.devices, 'sda',
get_data_dir(POLICIES[0]))
self.objects_2 = os.path.join(self.devices, 'sdb',
get_data_dir(POLICIES[0]))
2010-12-16 16:20:57 -08:00
os.mkdir(self.objects)
# policy 1
self.objects_p1 = os.path.join(self.devices, 'sda',
get_data_dir(POLICIES[1]))
self.objects_2_p1 = os.path.join(self.devices, 'sdb',
get_data_dir(POLICIES[1]))
os.mkdir(self.objects_p1)
# policy 2
self.objects_p2 = os.path.join(self.devices, 'sda',
get_data_dir(POLICIES[2]))
self.objects_2_p2 = os.path.join(self.devices, 'sdb',
get_data_dir(POLICIES[2]))
os.mkdir(self.objects_p2)
self.parts = {}
self.parts_p1 = {}
self.parts_p2 = {}
2010-12-16 16:20:57 -08:00
for part in ['0', '1', '2', '3']:
self.parts[part] = os.path.join(self.objects, part)
self.parts_p1[part] = os.path.join(self.objects_p1, part)
self.parts_p2[part] = os.path.join(self.objects_p2, part)
2010-12-16 16:20:57 -08:00
os.mkdir(os.path.join(self.objects, part))
os.mkdir(os.path.join(self.objects_p1, part))
os.mkdir(os.path.join(self.objects_p2, part))
2010-12-16 16:20:57 -08:00
self.conf = dict(
devices=self.devices,
mount_check='false',
object_size_stats='10,100,1024,10240')
DiskFile API, with reference implementation Refactor on-disk knowledge out of the object server by pushing the async update pickle creation to the new DiskFileManager class (name is not the best, so suggestions welcome), along with the REPLICATOR method logic. We also move the mount checking and thread pool storage to the new ondisk.Devices object, which then also becomes the new home of the audit_location_generator method. For the object server, a new setup() method is now called at the end of the controller's construction, and the _diskfile() method has been renamed to get_diskfile(), to allow implementation specific behavior. We then hide the need for the REST API layer to know how and where quarantining needs to be performed. There are now two places it is checked internally, on open() where we verify the content-length, name, and x-timestamp metadata, and in the reader on close where the etag metadata is checked if the entire file was read. We add a reader class to allow implementations to isolate the WSGI handling code for that specific environment (it is used no-where else in the REST APIs). This simplifies the caller's code to just use a "with" statement once open to avoid multiple points where close needs to be called. For a full historical comparison, including the usage patterns see: https://gist.github.com/portante/5488238 (as of master, 2b639f5, Merge "Fix 500 from account-quota This Commit middleware") --------------------------------+------------------------------------ DiskFileManager(conf) Methods: .pickle_async_update() .get_diskfile() .get_hashes() Attributes: .devices .logger .disk_chunk_size .keep_cache_size .bytes_per_sync DiskFile(a,c,o,keep_data_fp=) DiskFile(a,c,o) Methods: Methods: *.__iter__() .close(verify_file=) .is_deleted() .is_expired() .quarantine() .get_data_file_size() .open() .read_metadata() .create() .create() .write_metadata() .delete() .delete() Attributes: Attributes: .quarantined_dir .keep_cache .metadata *DiskFileReader() Methods: .__iter__() .close() Attributes: +.was_quarantined DiskWriter() DiskFileWriter() Methods: Methods: .write() .write() .put() .put() * Note that the DiskFile class * Note that the DiskReader() object implements all the methods returned by the necessary for a WSGI app DiskFileOpened.reader() method iterator implements all the methods necessary for a WSGI app iterator + Note that if the auditor is refactored to not use the DiskFile class, see https://review.openstack.org/44787 then we don't need the was_quarantined attribute A reference "in-memory" object server implementation of a backend DiskFile class in swift/obj/mem_server.py and swift/obj/mem_diskfile.py. One can also reference https://github.com/portante/gluster-swift/commits/diskfile for the proposed integration with the gluster-swift code based on these changes. Change-Id: I44e153fdb405a5743e9c05349008f94136764916 Signed-off-by: Peter Portante <peter.portante@redhat.com>
2013-09-12 19:51:18 -04:00
self.df_mgr = DiskFileManager(self.conf, self.logger)
self.ec_df_mgr = ECDiskFileManager(self.conf, self.logger)
# diskfiles for policy 0, 1, 2
self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o',
policy=POLICIES[0])
self.disk_file_p1 = self.df_mgr.get_diskfile('sda', '0', 'a', 'c',
'o', policy=POLICIES[1])
self.disk_file_ec = self.ec_df_mgr.get_diskfile(
'sda', '0', 'a', 'c', 'o', policy=POLICIES[2], frag_index=1)
2010-12-16 16:20:57 -08:00
def tearDown(self):
2011-01-24 17:12:38 -08:00
rmtree(os.path.dirname(self.testdir), ignore_errors=1)
unit.xattr_data = {}
2010-12-16 16:20:57 -08:00
def test_worker_conf_parms(self):
def check_common_defaults():
self.assertEqual(auditor_worker.max_bytes_per_second, 10000000)
self.assertEqual(auditor_worker.log_time, 3600)
# test default values
conf = dict(
devices=self.devices,
mount_check='false',
object_size_stats='10,100,1024,10240')
auditor_worker = auditor.AuditorWorker(conf, self.logger,
self.rcache, self.devices)
check_common_defaults()
for policy in POLICIES:
mgr = auditor_worker.diskfile_router[policy]
self.assertEqual(mgr.disk_chunk_size, 65536)
self.assertEqual(auditor_worker.max_files_per_second, 20)
self.assertEqual(auditor_worker.zero_byte_only_at_fps, 0)
# test specified audit value overrides
conf.update({'disk_chunk_size': 4096})
auditor_worker = auditor.AuditorWorker(conf, self.logger,
self.rcache, self.devices,
zero_byte_only_at_fps=50)
check_common_defaults()
for policy in POLICIES:
mgr = auditor_worker.diskfile_router[policy]
self.assertEqual(mgr.disk_chunk_size, 4096)
self.assertEqual(auditor_worker.max_files_per_second, 50)
self.assertEqual(auditor_worker.zero_byte_only_at_fps, 50)
def test_object_audit_extra_data(self):
def run_tests(disk_file):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
data = b'0' * 1024
if disk_file.policy.policy_type == EC_POLICY:
data = disk_file.policy.pyeclib_driver.encode(data)[0]
etag = md5()
with disk_file.create() as writer:
writer.write(data)
etag.update(data)
etag = etag.hexdigest()
timestamp = str(normalize_timestamp(time.time()))
metadata = {
'ETag': etag,
'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(writer._fd).st_size),
}
writer.put(metadata)
writer.commit(Timestamp(timestamp))
pre_quarantines = auditor_worker.quarantines
2010-12-16 16:20:57 -08:00
auditor_worker.object_audit(
AuditLocation(disk_file._datadir, 'sda', '0',
policy=disk_file.policy))
self.assertEqual(auditor_worker.quarantines, pre_quarantines)
2010-12-16 16:20:57 -08:00
os.write(writer._fd, b'extra_data')
auditor_worker.object_audit(
AuditLocation(disk_file._datadir, 'sda', '0',
policy=disk_file.policy))
self.assertEqual(auditor_worker.quarantines,
pre_quarantines + 1)
run_tests(self.disk_file)
run_tests(self.disk_file_p1)
run_tests(self.disk_file_ec)
2010-12-16 16:20:57 -08:00
def test_object_audit_diff_data(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
data = b'0' * 1024
etag = md5()
timestamp = str(normalize_timestamp(time.time()))
with self.disk_file.create() as writer:
writer.write(data)
etag.update(data)
etag = etag.hexdigest()
metadata = {
'ETag': etag,
'X-Timestamp': timestamp,
DiskFile API, with reference implementation Refactor on-disk knowledge out of the object server by pushing the async update pickle creation to the new DiskFileManager class (name is not the best, so suggestions welcome), along with the REPLICATOR method logic. We also move the mount checking and thread pool storage to the new ondisk.Devices object, which then also becomes the new home of the audit_location_generator method. For the object server, a new setup() method is now called at the end of the controller's construction, and the _diskfile() method has been renamed to get_diskfile(), to allow implementation specific behavior. We then hide the need for the REST API layer to know how and where quarantining needs to be performed. There are now two places it is checked internally, on open() where we verify the content-length, name, and x-timestamp metadata, and in the reader on close where the etag metadata is checked if the entire file was read. We add a reader class to allow implementations to isolate the WSGI handling code for that specific environment (it is used no-where else in the REST APIs). This simplifies the caller's code to just use a "with" statement once open to avoid multiple points where close needs to be called. For a full historical comparison, including the usage patterns see: https://gist.github.com/portante/5488238 (as of master, 2b639f5, Merge "Fix 500 from account-quota This Commit middleware") --------------------------------+------------------------------------ DiskFileManager(conf) Methods: .pickle_async_update() .get_diskfile() .get_hashes() Attributes: .devices .logger .disk_chunk_size .keep_cache_size .bytes_per_sync DiskFile(a,c,o,keep_data_fp=) DiskFile(a,c,o) Methods: Methods: *.__iter__() .close(verify_file=) .is_deleted() .is_expired() .quarantine() .get_data_file_size() .open() .read_metadata() .create() .create() .write_metadata() .delete() .delete() Attributes: Attributes: .quarantined_dir .keep_cache .metadata *DiskFileReader() Methods: .__iter__() .close() Attributes: +.was_quarantined DiskWriter() DiskFileWriter() Methods: Methods: .write() .write() .put() .put() * Note that the DiskFile class * Note that the DiskReader() object implements all the methods returned by the necessary for a WSGI app DiskFileOpened.reader() method iterator implements all the methods necessary for a WSGI app iterator + Note that if the auditor is refactored to not use the DiskFile class, see https://review.openstack.org/44787 then we don't need the was_quarantined attribute A reference "in-memory" object server implementation of a backend DiskFile class in swift/obj/mem_server.py and swift/obj/mem_diskfile.py. One can also reference https://github.com/portante/gluster-swift/commits/diskfile for the proposed integration with the gluster-swift code based on these changes. Change-Id: I44e153fdb405a5743e9c05349008f94136764916 Signed-off-by: Peter Portante <peter.portante@redhat.com>
2013-09-12 19:51:18 -04:00
'Content-Length': str(os.fstat(writer._fd).st_size),
}
writer.put(metadata)
writer.commit(Timestamp(timestamp))
pre_quarantines = auditor_worker.quarantines
# remake so it will have metadata
self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o',
policy=POLICIES.legacy)
2010-12-16 16:20:57 -08:00
auditor_worker.object_audit(
AuditLocation(self.disk_file._datadir, 'sda', '0',
policy=POLICIES.legacy))
self.assertEqual(auditor_worker.quarantines, pre_quarantines)
etag = md5(b'1' + b'0' * 1023).hexdigest()
metadata['ETag'] = etag
with self.disk_file.create() as writer:
writer.write(data)
writer.put(metadata)
writer.commit(Timestamp(timestamp))
auditor_worker.object_audit(
AuditLocation(self.disk_file._datadir, 'sda', '0',
policy=POLICIES.legacy))
self.assertEqual(auditor_worker.quarantines, pre_quarantines + 1)
2010-07-12 17:03:45 -05:00
def test_object_audit_checks_EC_fragments(self):
disk_file = self.disk_file_ec
def do_test(data):
# create diskfile and set ETag and content-length to match the data
etag = md5(data).hexdigest()
timestamp = str(normalize_timestamp(time.time()))
with disk_file.create() as writer:
writer.write(data)
metadata = {
'ETag': etag,
'X-Timestamp': timestamp,
'Content-Length': len(data),
}
writer.put(metadata)
writer.commit(Timestamp(timestamp))
auditor_worker = auditor.AuditorWorker(self.conf, FakeLogger(),
self.rcache, self.devices)
self.assertEqual(0, auditor_worker.quarantines) # sanity check
auditor_worker.object_audit(
AuditLocation(disk_file._datadir, 'sda', '0',
policy=disk_file.policy))
return auditor_worker
# two good frags in an EC archive
frag_0 = disk_file.policy.pyeclib_driver.encode(
'x' * disk_file.policy.ec_segment_size)[0]
frag_1 = disk_file.policy.pyeclib_driver.encode(
'y' * disk_file.policy.ec_segment_size)[0]
data = frag_0 + frag_1
auditor_worker = do_test(data)
self.assertEqual(0, auditor_worker.quarantines)
self.assertFalse(auditor_worker.logger.get_lines_for_level('error'))
# corrupt second frag headers
corrupt_frag_1 = 'blah' * 16 + frag_1[64:]
data = frag_0 + corrupt_frag_1
auditor_worker = do_test(data)
self.assertEqual(1, auditor_worker.quarantines)
log_lines = auditor_worker.logger.get_lines_for_level('error')
self.assertIn('failed audit and was quarantined: '
'Invalid EC metadata at offset 0x%x' %
len(frag_0),
log_lines[0])
# dangling extra corrupt frag data
data = frag_0 + frag_1 + 'wtf' * 100
auditor_worker = do_test(data)
self.assertEqual(1, auditor_worker.quarantines)
log_lines = auditor_worker.logger.get_lines_for_level('error')
self.assertIn('failed audit and was quarantined: '
'Invalid EC metadata at offset 0x%x' %
len(frag_0 + frag_1),
log_lines[0])
# simulate bug https://bugs.launchpad.net/bugs/1631144 by writing start
# of an ssync subrequest into the diskfile
data = (
b'PUT /a/c/o\r\n' +
b'Content-Length: 999\r\n' +
b'Content-Type: image/jpeg\r\n' +
b'X-Object-Sysmeta-Ec-Content-Length: 1024\r\n' +
b'X-Object-Sysmeta-Ec-Etag: 1234bff7eb767cc6d19627c6b6f9edef\r\n' +
b'X-Object-Sysmeta-Ec-Frag-Index: 1\r\n' +
b'X-Object-Sysmeta-Ec-Scheme: ' + DEFAULT_TEST_EC_TYPE + '\r\n' +
b'X-Object-Sysmeta-Ec-Segment-Size: 1048576\r\n' +
b'X-Timestamp: 1471512345.17333\r\n\r\n'
)
data += frag_0[:disk_file.policy.fragment_size - len(data)]
auditor_worker = do_test(data)
self.assertEqual(1, auditor_worker.quarantines)
log_lines = auditor_worker.logger.get_lines_for_level('error')
self.assertIn('failed audit and was quarantined: '
'Invalid EC metadata at offset 0x0',
log_lines[0])
2010-12-28 14:54:00 -08:00
def test_object_audit_no_meta(self):
2011-01-24 17:12:38 -08:00
timestamp = str(normalize_timestamp(time.time()))
DiskFile API, with reference implementation Refactor on-disk knowledge out of the object server by pushing the async update pickle creation to the new DiskFileManager class (name is not the best, so suggestions welcome), along with the REPLICATOR method logic. We also move the mount checking and thread pool storage to the new ondisk.Devices object, which then also becomes the new home of the audit_location_generator method. For the object server, a new setup() method is now called at the end of the controller's construction, and the _diskfile() method has been renamed to get_diskfile(), to allow implementation specific behavior. We then hide the need for the REST API layer to know how and where quarantining needs to be performed. There are now two places it is checked internally, on open() where we verify the content-length, name, and x-timestamp metadata, and in the reader on close where the etag metadata is checked if the entire file was read. We add a reader class to allow implementations to isolate the WSGI handling code for that specific environment (it is used no-where else in the REST APIs). This simplifies the caller's code to just use a "with" statement once open to avoid multiple points where close needs to be called. For a full historical comparison, including the usage patterns see: https://gist.github.com/portante/5488238 (as of master, 2b639f5, Merge "Fix 500 from account-quota This Commit middleware") --------------------------------+------------------------------------ DiskFileManager(conf) Methods: .pickle_async_update() .get_diskfile() .get_hashes() Attributes: .devices .logger .disk_chunk_size .keep_cache_size .bytes_per_sync DiskFile(a,c,o,keep_data_fp=) DiskFile(a,c,o) Methods: Methods: *.__iter__() .close(verify_file=) .is_deleted() .is_expired() .quarantine() .get_data_file_size() .open() .read_metadata() .create() .create() .write_metadata() .delete() .delete() Attributes: Attributes: .quarantined_dir .keep_cache .metadata *DiskFileReader() Methods: .__iter__() .close() Attributes: +.was_quarantined DiskWriter() DiskFileWriter() Methods: Methods: .write() .write() .put() .put() * Note that the DiskFile class * Note that the DiskReader() object implements all the methods returned by the necessary for a WSGI app DiskFileOpened.reader() method iterator implements all the methods necessary for a WSGI app iterator + Note that if the auditor is refactored to not use the DiskFile class, see https://review.openstack.org/44787 then we don't need the was_quarantined attribute A reference "in-memory" object server implementation of a backend DiskFile class in swift/obj/mem_server.py and swift/obj/mem_diskfile.py. One can also reference https://github.com/portante/gluster-swift/commits/diskfile for the proposed integration with the gluster-swift code based on these changes. Change-Id: I44e153fdb405a5743e9c05349008f94136764916 Signed-off-by: Peter Portante <peter.portante@redhat.com>
2013-09-12 19:51:18 -04:00
path = os.path.join(self.disk_file._datadir, timestamp + '.data')
mkdirs(self.disk_file._datadir)
fp = open(path, 'wb')
fp.write(b'0' * 1024)
2011-01-24 17:12:38 -08:00
fp.close()
DiskFile API, with reference implementation Refactor on-disk knowledge out of the object server by pushing the async update pickle creation to the new DiskFileManager class (name is not the best, so suggestions welcome), along with the REPLICATOR method logic. We also move the mount checking and thread pool storage to the new ondisk.Devices object, which then also becomes the new home of the audit_location_generator method. For the object server, a new setup() method is now called at the end of the controller's construction, and the _diskfile() method has been renamed to get_diskfile(), to allow implementation specific behavior. We then hide the need for the REST API layer to know how and where quarantining needs to be performed. There are now two places it is checked internally, on open() where we verify the content-length, name, and x-timestamp metadata, and in the reader on close where the etag metadata is checked if the entire file was read. We add a reader class to allow implementations to isolate the WSGI handling code for that specific environment (it is used no-where else in the REST APIs). This simplifies the caller's code to just use a "with" statement once open to avoid multiple points where close needs to be called. For a full historical comparison, including the usage patterns see: https://gist.github.com/portante/5488238 (as of master, 2b639f5, Merge "Fix 500 from account-quota This Commit middleware") --------------------------------+------------------------------------ DiskFileManager(conf) Methods: .pickle_async_update() .get_diskfile() .get_hashes() Attributes: .devices .logger .disk_chunk_size .keep_cache_size .bytes_per_sync DiskFile(a,c,o,keep_data_fp=) DiskFile(a,c,o) Methods: Methods: *.__iter__() .close(verify_file=) .is_deleted() .is_expired() .quarantine() .get_data_file_size() .open() .read_metadata() .create() .create() .write_metadata() .delete() .delete() Attributes: Attributes: .quarantined_dir .keep_cache .metadata *DiskFileReader() Methods: .__iter__() .close() Attributes: +.was_quarantined DiskWriter() DiskFileWriter() Methods: Methods: .write() .write() .put() .put() * Note that the DiskFile class * Note that the DiskReader() object implements all the methods returned by the necessary for a WSGI app DiskFileOpened.reader() method iterator implements all the methods necessary for a WSGI app iterator + Note that if the auditor is refactored to not use the DiskFile class, see https://review.openstack.org/44787 then we don't need the was_quarantined attribute A reference "in-memory" object server implementation of a backend DiskFile class in swift/obj/mem_server.py and swift/obj/mem_diskfile.py. One can also reference https://github.com/portante/gluster-swift/commits/diskfile for the proposed integration with the gluster-swift code based on these changes. Change-Id: I44e153fdb405a5743e9c05349008f94136764916 Signed-off-by: Peter Portante <peter.portante@redhat.com>
2013-09-12 19:51:18 -04:00
invalidate_hash(os.path.dirname(self.disk_file._datadir))
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
pre_quarantines = auditor_worker.quarantines
auditor_worker.object_audit(
AuditLocation(self.disk_file._datadir, 'sda', '0',
policy=POLICIES.legacy))
self.assertEqual(auditor_worker.quarantines, pre_quarantines + 1)
2010-12-28 14:54:00 -08:00
def test_object_audit_will_not_swallow_errors_in_tests(self):
timestamp = str(normalize_timestamp(time.time()))
DiskFile API, with reference implementation Refactor on-disk knowledge out of the object server by pushing the async update pickle creation to the new DiskFileManager class (name is not the best, so suggestions welcome), along with the REPLICATOR method logic. We also move the mount checking and thread pool storage to the new ondisk.Devices object, which then also becomes the new home of the audit_location_generator method. For the object server, a new setup() method is now called at the end of the controller's construction, and the _diskfile() method has been renamed to get_diskfile(), to allow implementation specific behavior. We then hide the need for the REST API layer to know how and where quarantining needs to be performed. There are now two places it is checked internally, on open() where we verify the content-length, name, and x-timestamp metadata, and in the reader on close where the etag metadata is checked if the entire file was read. We add a reader class to allow implementations to isolate the WSGI handling code for that specific environment (it is used no-where else in the REST APIs). This simplifies the caller's code to just use a "with" statement once open to avoid multiple points where close needs to be called. For a full historical comparison, including the usage patterns see: https://gist.github.com/portante/5488238 (as of master, 2b639f5, Merge "Fix 500 from account-quota This Commit middleware") --------------------------------+------------------------------------ DiskFileManager(conf) Methods: .pickle_async_update() .get_diskfile() .get_hashes() Attributes: .devices .logger .disk_chunk_size .keep_cache_size .bytes_per_sync DiskFile(a,c,o,keep_data_fp=) DiskFile(a,c,o) Methods: Methods: *.__iter__() .close(verify_file=) .is_deleted() .is_expired() .quarantine() .get_data_file_size() .open() .read_metadata() .create() .create() .write_metadata() .delete() .delete() Attributes: Attributes: .quarantined_dir .keep_cache .metadata *DiskFileReader() Methods: .__iter__() .close() Attributes: +.was_quarantined DiskWriter() DiskFileWriter() Methods: Methods: .write() .write() .put() .put() * Note that the DiskFile class * Note that the DiskReader() object implements all the methods returned by the necessary for a WSGI app DiskFileOpened.reader() method iterator implements all the methods necessary for a WSGI app iterator + Note that if the auditor is refactored to not use the DiskFile class, see https://review.openstack.org/44787 then we don't need the was_quarantined attribute A reference "in-memory" object server implementation of a backend DiskFile class in swift/obj/mem_server.py and swift/obj/mem_diskfile.py. One can also reference https://github.com/portante/gluster-swift/commits/diskfile for the proposed integration with the gluster-swift code based on these changes. Change-Id: I44e153fdb405a5743e9c05349008f94136764916 Signed-off-by: Peter Portante <peter.portante@redhat.com>
2013-09-12 19:51:18 -04:00
path = os.path.join(self.disk_file._datadir, timestamp + '.data')
mkdirs(self.disk_file._datadir)
with open(path, 'w') as f:
write_metadata(f, {'name': '/a/c/o'})
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
def blowup(*args):
raise NameError('tpyo')
Alternate DiskFile constructor for efficient auditing. Before, to audit an object, the auditor: - calls listdir(object-hash-dir) - picks out the .data file from the listing - pulls out all N of its user.swift.metadata* xattrs - unpickles them - pulls out the value for 'name' - splits the name into a/c/o - then instantiates and opens a DiskFile(a, c, o), which does the following - joins a/c/o back into a name - hashes the name - calls listdir(object-hash-dir) (AGAIN) - picks out the .data file (and maybe .meta) from the listing (AGAIN) - pulls out all N of its user.swift.metadata* xattrs (AGAIN) - unpickles them (AGAIN) - starts reading object's contents off disk Now, the auditor simply locates the hash dir on the filesystem (saving one listdir) and then hands it off to DiskFileManager.get_diskfile_from_audit_location, which then instantiates a DiskFile in a way that lazy-loads the name later (saving one xattr reading). As part of this, DiskFile.open() will now quarantine a hash "directory" that's actually a file. Before, the audit location generator would skip those, but now they make it clear into DiskFile(). It's better to quarantine them anyway, as they're not doing any good the way they are. Also, removed the was_quarantined attribute on DiskFileReader. Now you can pass in a quarantine_hook callable to DiskFile.reader() that gets called if the file was quarantined. Default is to log quarantines, but otherwise do nothing. Change-Id: I04fc14569982a17fcc89e00832725ae71009335a
2013-10-28 14:57:18 -07:00
with mock.patch.object(DiskFileManager,
'get_diskfile_from_audit_location', blowup):
self.assertRaises(NameError, auditor_worker.object_audit,
AuditLocation(os.path.dirname(path), 'sda', '0',
policy=POLICIES.legacy))
def test_failsafe_object_audit_will_swallow_errors_in_tests(self):
timestamp = str(normalize_timestamp(time.time()))
DiskFile API, with reference implementation Refactor on-disk knowledge out of the object server by pushing the async update pickle creation to the new DiskFileManager class (name is not the best, so suggestions welcome), along with the REPLICATOR method logic. We also move the mount checking and thread pool storage to the new ondisk.Devices object, which then also becomes the new home of the audit_location_generator method. For the object server, a new setup() method is now called at the end of the controller's construction, and the _diskfile() method has been renamed to get_diskfile(), to allow implementation specific behavior. We then hide the need for the REST API layer to know how and where quarantining needs to be performed. There are now two places it is checked internally, on open() where we verify the content-length, name, and x-timestamp metadata, and in the reader on close where the etag metadata is checked if the entire file was read. We add a reader class to allow implementations to isolate the WSGI handling code for that specific environment (it is used no-where else in the REST APIs). This simplifies the caller's code to just use a "with" statement once open to avoid multiple points where close needs to be called. For a full historical comparison, including the usage patterns see: https://gist.github.com/portante/5488238 (as of master, 2b639f5, Merge "Fix 500 from account-quota This Commit middleware") --------------------------------+------------------------------------ DiskFileManager(conf) Methods: .pickle_async_update() .get_diskfile() .get_hashes() Attributes: .devices .logger .disk_chunk_size .keep_cache_size .bytes_per_sync DiskFile(a,c,o,keep_data_fp=) DiskFile(a,c,o) Methods: Methods: *.__iter__() .close(verify_file=) .is_deleted() .is_expired() .quarantine() .get_data_file_size() .open() .read_metadata() .create() .create() .write_metadata() .delete() .delete() Attributes: Attributes: .quarantined_dir .keep_cache .metadata *DiskFileReader() Methods: .__iter__() .close() Attributes: +.was_quarantined DiskWriter() DiskFileWriter() Methods: Methods: .write() .write() .put() .put() * Note that the DiskFile class * Note that the DiskReader() object implements all the methods returned by the necessary for a WSGI app DiskFileOpened.reader() method iterator implements all the methods necessary for a WSGI app iterator + Note that if the auditor is refactored to not use the DiskFile class, see https://review.openstack.org/44787 then we don't need the was_quarantined attribute A reference "in-memory" object server implementation of a backend DiskFile class in swift/obj/mem_server.py and swift/obj/mem_diskfile.py. One can also reference https://github.com/portante/gluster-swift/commits/diskfile for the proposed integration with the gluster-swift code based on these changes. Change-Id: I44e153fdb405a5743e9c05349008f94136764916 Signed-off-by: Peter Portante <peter.portante@redhat.com>
2013-09-12 19:51:18 -04:00
path = os.path.join(self.disk_file._datadir, timestamp + '.data')
mkdirs(self.disk_file._datadir)
with open(path, 'w') as f:
write_metadata(f, {'name': '/a/c/o'})
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
def blowup(*args):
raise NameError('tpyo')
with mock.patch('swift.obj.diskfile.DiskFileManager.diskfile_cls',
blowup):
Alternate DiskFile constructor for efficient auditing. Before, to audit an object, the auditor: - calls listdir(object-hash-dir) - picks out the .data file from the listing - pulls out all N of its user.swift.metadata* xattrs - unpickles them - pulls out the value for 'name' - splits the name into a/c/o - then instantiates and opens a DiskFile(a, c, o), which does the following - joins a/c/o back into a name - hashes the name - calls listdir(object-hash-dir) (AGAIN) - picks out the .data file (and maybe .meta) from the listing (AGAIN) - pulls out all N of its user.swift.metadata* xattrs (AGAIN) - unpickles them (AGAIN) - starts reading object's contents off disk Now, the auditor simply locates the hash dir on the filesystem (saving one listdir) and then hands it off to DiskFileManager.get_diskfile_from_audit_location, which then instantiates a DiskFile in a way that lazy-loads the name later (saving one xattr reading). As part of this, DiskFile.open() will now quarantine a hash "directory" that's actually a file. Before, the audit location generator would skip those, but now they make it clear into DiskFile(). It's better to quarantine them anyway, as they're not doing any good the way they are. Also, removed the was_quarantined attribute on DiskFileReader. Now you can pass in a quarantine_hook callable to DiskFile.reader() that gets called if the file was quarantined. Default is to log quarantines, but otherwise do nothing. Change-Id: I04fc14569982a17fcc89e00832725ae71009335a
2013-10-28 14:57:18 -07:00
auditor_worker.failsafe_object_audit(
AuditLocation(os.path.dirname(path), 'sda', '0',
policy=POLICIES.legacy))
self.assertEqual(auditor_worker.errors, 1)
def test_audit_location_gets_quarantined(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
location = AuditLocation(self.disk_file._datadir, 'sda', '0',
policy=self.disk_file.policy)
# instead of a datadir, we'll make a file!
mkdirs(os.path.dirname(self.disk_file._datadir))
open(self.disk_file._datadir, 'w')
# after we turn the crank ...
auditor_worker.object_audit(location)
# ... it should get quarantined
self.assertFalse(os.path.exists(self.disk_file._datadir))
self.assertEqual(1, auditor_worker.quarantines)
def test_rsync_tempfile_timeout_auto_option(self):
# if we don't have access to the replicator config section we'll use
# our default
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400)
# if the rsync_tempfile_timeout option is set explicitly we use that
self.conf['rsync_tempfile_timeout'] = '1800'
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 1800)
# if we have a real config we can be a little smarter
config_path = os.path.join(self.testdir, 'objserver.conf')
stub_config = """
[object-auditor]
rsync_tempfile_timeout = auto
"""
with open(config_path, 'w') as f:
f.write(textwrap.dedent(stub_config))
# the Daemon loader will hand the object-auditor config to the
# auditor who will build the workers from it
conf = readconf(config_path, 'object-auditor')
auditor_worker = auditor.AuditorWorker(conf, self.logger,
self.rcache, self.devices)
# if there is no object-replicator section we still have to fall back
# to default because we can't parse the config for that section!
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400)
stub_config = """
[object-replicator]
[object-auditor]
rsync_tempfile_timeout = auto
"""
with open(config_path, 'w') as f:
f.write(textwrap.dedent(stub_config))
conf = readconf(config_path, 'object-auditor')
auditor_worker = auditor.AuditorWorker(conf, self.logger,
self.rcache, self.devices)
# if the object-replicator section will parse but does not override
# the default rsync_timeout we assume the default rsync_timeout value
# and add 15mins
self.assertEqual(auditor_worker.rsync_tempfile_timeout,
replicator.DEFAULT_RSYNC_TIMEOUT + 900)
stub_config = """
[DEFAULT]
reclaim_age = 1209600
[object-replicator]
rsync_timeout = 3600
[object-auditor]
rsync_tempfile_timeout = auto
"""
with open(config_path, 'w') as f:
f.write(textwrap.dedent(stub_config))
conf = readconf(config_path, 'object-auditor')
auditor_worker = auditor.AuditorWorker(conf, self.logger,
self.rcache, self.devices)
# if there is an object-replicator section with a rsync_timeout
# configured we'll use that value (3600) + 900
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 3600 + 900)
def test_inprogress_rsync_tempfiles_get_cleaned_up(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
location = AuditLocation(self.disk_file._datadir, 'sda', '0',
policy=self.disk_file.policy)
data = b'VERIFY'
etag = md5()
timestamp = str(normalize_timestamp(time.time()))
with self.disk_file.create() as writer:
writer.write(data)
etag.update(data)
metadata = {
'ETag': etag.hexdigest(),
'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(writer._fd).st_size),
}
writer.put(metadata)
writer.commit(Timestamp(timestamp))
datafilename = None
datadir_files = os.listdir(self.disk_file._datadir)
for filename in datadir_files:
if filename.endswith('.data'):
datafilename = filename
break
else:
self.fail('Did not find .data file in %r: %r' %
(self.disk_file._datadir, datadir_files))
rsynctempfile_path = os.path.join(self.disk_file._datadir,
'.%s.9ILVBL' % datafilename)
open(rsynctempfile_path, 'w')
# sanity check we have an extra file
rsync_files = os.listdir(self.disk_file._datadir)
self.assertEqual(len(datadir_files) + 1, len(rsync_files))
# and after we turn the crank ...
auditor_worker.object_audit(location)
# ... we've still got the rsync file
self.assertEqual(rsync_files, os.listdir(self.disk_file._datadir))
# and we'll keep it - depending on the rsync_tempfile_timeout
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400)
self.conf['rsync_tempfile_timeout'] = '3600'
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 3600)
now = time.time() + 1900
with mock.patch('swift.obj.auditor.time.time',
return_value=now):
auditor_worker.object_audit(location)
self.assertEqual(rsync_files, os.listdir(self.disk_file._datadir))
# but *tomorrow* when we run
tomorrow = time.time() + 86400
with mock.patch('swift.obj.auditor.time.time',
return_value=tomorrow):
auditor_worker.object_audit(location)
# ... we'll totally clean that stuff up!
self.assertEqual(datadir_files, os.listdir(self.disk_file._datadir))
# but if we have some random crazy file in there
random_crazy_file_path = os.path.join(self.disk_file._datadir,
'.random.crazy.file')
open(random_crazy_file_path, 'w')
tomorrow = time.time() + 86400
with mock.patch('swift.obj.auditor.time.time',
return_value=tomorrow):
auditor_worker.object_audit(location)
# that's someone elses problem
self.assertIn(os.path.basename(random_crazy_file_path),
os.listdir(self.disk_file._datadir))
def test_generic_exception_handling(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
# pretend that we logged (and reset counters) just now
auditor_worker.last_logged = time.time()
timestamp = str(normalize_timestamp(time.time()))
pre_errors = auditor_worker.errors
data = b'0' * 1024
etag = md5()
with self.disk_file.create() as writer:
writer.write(data)
etag.update(data)
etag = etag.hexdigest()
metadata = {
'ETag': etag,
'X-Timestamp': timestamp,
DiskFile API, with reference implementation Refactor on-disk knowledge out of the object server by pushing the async update pickle creation to the new DiskFileManager class (name is not the best, so suggestions welcome), along with the REPLICATOR method logic. We also move the mount checking and thread pool storage to the new ondisk.Devices object, which then also becomes the new home of the audit_location_generator method. For the object server, a new setup() method is now called at the end of the controller's construction, and the _diskfile() method has been renamed to get_diskfile(), to allow implementation specific behavior. We then hide the need for the REST API layer to know how and where quarantining needs to be performed. There are now two places it is checked internally, on open() where we verify the content-length, name, and x-timestamp metadata, and in the reader on close where the etag metadata is checked if the entire file was read. We add a reader class to allow implementations to isolate the WSGI handling code for that specific environment (it is used no-where else in the REST APIs). This simplifies the caller's code to just use a "with" statement once open to avoid multiple points where close needs to be called. For a full historical comparison, including the usage patterns see: https://gist.github.com/portante/5488238 (as of master, 2b639f5, Merge "Fix 500 from account-quota This Commit middleware") --------------------------------+------------------------------------ DiskFileManager(conf) Methods: .pickle_async_update() .get_diskfile() .get_hashes() Attributes: .devices .logger .disk_chunk_size .keep_cache_size .bytes_per_sync DiskFile(a,c,o,keep_data_fp=) DiskFile(a,c,o) Methods: Methods: *.__iter__() .close(verify_file=) .is_deleted() .is_expired() .quarantine() .get_data_file_size() .open() .read_metadata() .create() .create() .write_metadata() .delete() .delete() Attributes: Attributes: .quarantined_dir .keep_cache .metadata *DiskFileReader() Methods: .__iter__() .close() Attributes: +.was_quarantined DiskWriter() DiskFileWriter() Methods: Methods: .write() .write() .put() .put() * Note that the DiskFile class * Note that the DiskReader() object implements all the methods returned by the necessary for a WSGI app DiskFileOpened.reader() method iterator implements all the methods necessary for a WSGI app iterator + Note that if the auditor is refactored to not use the DiskFile class, see https://review.openstack.org/44787 then we don't need the was_quarantined attribute A reference "in-memory" object server implementation of a backend DiskFile class in swift/obj/mem_server.py and swift/obj/mem_diskfile.py. One can also reference https://github.com/portante/gluster-swift/commits/diskfile for the proposed integration with the gluster-swift code based on these changes. Change-Id: I44e153fdb405a5743e9c05349008f94136764916 Signed-off-by: Peter Portante <peter.portante@redhat.com>
2013-09-12 19:51:18 -04:00
'Content-Length': str(os.fstat(writer._fd).st_size),
}
writer.put(metadata)
writer.commit(Timestamp(timestamp))
with mock.patch('swift.obj.diskfile.DiskFileManager.diskfile_cls',
lambda *_: 1 / 0):
auditor_worker.audit_all_objects()
self.assertEqual(auditor_worker.errors, pre_errors + 1)
2010-12-28 14:54:00 -08:00
def test_object_run_once_pass(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
auditor_worker.log_time = 0
timestamp = str(normalize_timestamp(time.time()))
pre_quarantines = auditor_worker.quarantines
data = b'0' * 1024
def write_file(df):
with df.create() as writer:
writer.write(data)
metadata = {
'ETag': md5(data).hexdigest(),
'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(writer._fd).st_size),
}
writer.put(metadata)
writer.commit(Timestamp(timestamp))
# policy 0
write_file(self.disk_file)
# policy 1
write_file(self.disk_file_p1)
# policy 2
write_file(self.disk_file_ec)
auditor_worker.audit_all_objects()
self.assertEqual(auditor_worker.quarantines, pre_quarantines)
# 1 object per policy falls into 1024 bucket
self.assertEqual(auditor_worker.stats_buckets[1024], 3)
self.assertEqual(auditor_worker.stats_buckets[10240], 0)
# pick up some additional code coverage, large file
data = b'0' * 1024 * 1024
for df in (self.disk_file, self.disk_file_ec):
with df.create() as writer:
writer.write(data)
metadata = {
'ETag': md5(data).hexdigest(),
'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(writer._fd).st_size),
}
writer.put(metadata)
writer.commit(Timestamp(timestamp))
auditor_worker.audit_all_objects(device_dirs=['sda', 'sdb'])
self.assertEqual(auditor_worker.quarantines, pre_quarantines)
# still have the 1024 byte object left in policy-1 (plus the
# stats from the original 3)
self.assertEqual(auditor_worker.stats_buckets[1024], 4)
self.assertEqual(auditor_worker.stats_buckets[10240], 0)
# and then policy-0 disk_file was re-written as a larger object
self.assertEqual(auditor_worker.stats_buckets['OVER'], 2)
# pick up even more additional code coverage, misc paths
auditor_worker.log_time = -1
auditor_worker.stats_sizes = []
auditor_worker.audit_all_objects(device_dirs=['sda', 'sdb'])
self.assertEqual(auditor_worker.quarantines, pre_quarantines)
self.assertEqual(auditor_worker.stats_buckets[1024], 4)
self.assertEqual(auditor_worker.stats_buckets[10240], 0)
self.assertEqual(auditor_worker.stats_buckets['OVER'], 2)
def test_object_run_logging(self):
logger = FakeLogger()
auditor_worker = auditor.AuditorWorker(self.conf, logger,
self.rcache, self.devices)
auditor_worker.audit_all_objects(device_dirs=['sda'])
log_lines = logger.get_lines_for_level('info')
self.assertTrue(len(log_lines) > 0)
self.assertTrue(log_lines[0].index('ALL - parallel, sda'))
logger = FakeLogger()
auditor_worker = auditor.AuditorWorker(self.conf, logger,
self.rcache, self.devices,
zero_byte_only_at_fps=50)
auditor_worker.audit_all_objects(device_dirs=['sda'])
log_lines = logger.get_lines_for_level('info')
self.assertTrue(len(log_lines) > 0)
self.assertTrue(log_lines[0].index('ZBF - sda'))
2010-12-28 14:54:00 -08:00
def test_object_run_once_no_sda(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
timestamp = str(normalize_timestamp(time.time()))
pre_quarantines = auditor_worker.quarantines
# pretend that we logged (and reset counters) just now
auditor_worker.last_logged = time.time()
data = b'0' * 1024
etag = md5()
with self.disk_file.create() as writer:
writer.write(data)
etag.update(data)
etag = etag.hexdigest()
metadata = {
'ETag': etag,
'X-Timestamp': timestamp,
DiskFile API, with reference implementation Refactor on-disk knowledge out of the object server by pushing the async update pickle creation to the new DiskFileManager class (name is not the best, so suggestions welcome), along with the REPLICATOR method logic. We also move the mount checking and thread pool storage to the new ondisk.Devices object, which then also becomes the new home of the audit_location_generator method. For the object server, a new setup() method is now called at the end of the controller's construction, and the _diskfile() method has been renamed to get_diskfile(), to allow implementation specific behavior. We then hide the need for the REST API layer to know how and where quarantining needs to be performed. There are now two places it is checked internally, on open() where we verify the content-length, name, and x-timestamp metadata, and in the reader on close where the etag metadata is checked if the entire file was read. We add a reader class to allow implementations to isolate the WSGI handling code for that specific environment (it is used no-where else in the REST APIs). This simplifies the caller's code to just use a "with" statement once open to avoid multiple points where close needs to be called. For a full historical comparison, including the usage patterns see: https://gist.github.com/portante/5488238 (as of master, 2b639f5, Merge "Fix 500 from account-quota This Commit middleware") --------------------------------+------------------------------------ DiskFileManager(conf) Methods: .pickle_async_update() .get_diskfile() .get_hashes() Attributes: .devices .logger .disk_chunk_size .keep_cache_size .bytes_per_sync DiskFile(a,c,o,keep_data_fp=) DiskFile(a,c,o) Methods: Methods: *.__iter__() .close(verify_file=) .is_deleted() .is_expired() .quarantine() .get_data_file_size() .open() .read_metadata() .create() .create() .write_metadata() .delete() .delete() Attributes: Attributes: .quarantined_dir .keep_cache .metadata *DiskFileReader() Methods: .__iter__() .close() Attributes: +.was_quarantined DiskWriter() DiskFileWriter() Methods: Methods: .write() .write() .put() .put() * Note that the DiskFile class * Note that the DiskReader() object implements all the methods returned by the necessary for a WSGI app DiskFileOpened.reader() method iterator implements all the methods necessary for a WSGI app iterator + Note that if the auditor is refactored to not use the DiskFile class, see https://review.openstack.org/44787 then we don't need the was_quarantined attribute A reference "in-memory" object server implementation of a backend DiskFile class in swift/obj/mem_server.py and swift/obj/mem_diskfile.py. One can also reference https://github.com/portante/gluster-swift/commits/diskfile for the proposed integration with the gluster-swift code based on these changes. Change-Id: I44e153fdb405a5743e9c05349008f94136764916 Signed-off-by: Peter Portante <peter.portante@redhat.com>
2013-09-12 19:51:18 -04:00
'Content-Length': str(os.fstat(writer._fd).st_size),
}
writer.put(metadata)
os.write(writer._fd, b'extra_data')
writer.commit(Timestamp(timestamp))
auditor_worker.audit_all_objects()
self.assertEqual(auditor_worker.quarantines, pre_quarantines + 1)
2010-12-28 14:54:00 -08:00
def test_object_run_once_multi_devices(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
# pretend that we logged (and reset counters) just now
auditor_worker.last_logged = time.time()
2010-12-28 14:54:00 -08:00
timestamp = str(normalize_timestamp(time.time()))
pre_quarantines = auditor_worker.quarantines
data = b'0' * 10
2010-12-28 14:54:00 -08:00
etag = md5()
with self.disk_file.create() as writer:
writer.write(data)
2010-12-28 14:54:00 -08:00
etag.update(data)
etag = etag.hexdigest()
metadata = {
'ETag': etag,
'X-Timestamp': timestamp,
DiskFile API, with reference implementation Refactor on-disk knowledge out of the object server by pushing the async update pickle creation to the new DiskFileManager class (name is not the best, so suggestions welcome), along with the REPLICATOR method logic. We also move the mount checking and thread pool storage to the new ondisk.Devices object, which then also becomes the new home of the audit_location_generator method. For the object server, a new setup() method is now called at the end of the controller's construction, and the _diskfile() method has been renamed to get_diskfile(), to allow implementation specific behavior. We then hide the need for the REST API layer to know how and where quarantining needs to be performed. There are now two places it is checked internally, on open() where we verify the content-length, name, and x-timestamp metadata, and in the reader on close where the etag metadata is checked if the entire file was read. We add a reader class to allow implementations to isolate the WSGI handling code for that specific environment (it is used no-where else in the REST APIs). This simplifies the caller's code to just use a "with" statement once open to avoid multiple points where close needs to be called. For a full historical comparison, including the usage patterns see: https://gist.github.com/portante/5488238 (as of master, 2b639f5, Merge "Fix 500 from account-quota This Commit middleware") --------------------------------+------------------------------------ DiskFileManager(conf) Methods: .pickle_async_update() .get_diskfile() .get_hashes() Attributes: .devices .logger .disk_chunk_size .keep_cache_size .bytes_per_sync DiskFile(a,c,o,keep_data_fp=) DiskFile(a,c,o) Methods: Methods: *.__iter__() .close(verify_file=) .is_deleted() .is_expired() .quarantine() .get_data_file_size() .open() .read_metadata() .create() .create() .write_metadata() .delete() .delete() Attributes: Attributes: .quarantined_dir .keep_cache .metadata *DiskFileReader() Methods: .__iter__() .close() Attributes: +.was_quarantined DiskWriter() DiskFileWriter() Methods: Methods: .write() .write() .put() .put() * Note that the DiskFile class * Note that the DiskReader() object implements all the methods returned by the necessary for a WSGI app DiskFileOpened.reader() method iterator implements all the methods necessary for a WSGI app iterator + Note that if the auditor is refactored to not use the DiskFile class, see https://review.openstack.org/44787 then we don't need the was_quarantined attribute A reference "in-memory" object server implementation of a backend DiskFile class in swift/obj/mem_server.py and swift/obj/mem_diskfile.py. One can also reference https://github.com/portante/gluster-swift/commits/diskfile for the proposed integration with the gluster-swift code based on these changes. Change-Id: I44e153fdb405a5743e9c05349008f94136764916 Signed-off-by: Peter Portante <peter.portante@redhat.com>
2013-09-12 19:51:18 -04:00
'Content-Length': str(os.fstat(writer._fd).st_size),
2010-12-28 14:54:00 -08:00
}
writer.put(metadata)
writer.commit(Timestamp(timestamp))
auditor_worker.audit_all_objects()
self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'ob',
policy=POLICIES.legacy)
data = b'1' * 10
2010-12-28 14:54:00 -08:00
etag = md5()
with self.disk_file.create() as writer:
writer.write(data)
2010-12-28 14:54:00 -08:00
etag.update(data)
etag = etag.hexdigest()
metadata = {
'ETag': etag,
'X-Timestamp': timestamp,
DiskFile API, with reference implementation Refactor on-disk knowledge out of the object server by pushing the async update pickle creation to the new DiskFileManager class (name is not the best, so suggestions welcome), along with the REPLICATOR method logic. We also move the mount checking and thread pool storage to the new ondisk.Devices object, which then also becomes the new home of the audit_location_generator method. For the object server, a new setup() method is now called at the end of the controller's construction, and the _diskfile() method has been renamed to get_diskfile(), to allow implementation specific behavior. We then hide the need for the REST API layer to know how and where quarantining needs to be performed. There are now two places it is checked internally, on open() where we verify the content-length, name, and x-timestamp metadata, and in the reader on close where the etag metadata is checked if the entire file was read. We add a reader class to allow implementations to isolate the WSGI handling code for that specific environment (it is used no-where else in the REST APIs). This simplifies the caller's code to just use a "with" statement once open to avoid multiple points where close needs to be called. For a full historical comparison, including the usage patterns see: https://gist.github.com/portante/5488238 (as of master, 2b639f5, Merge "Fix 500 from account-quota This Commit middleware") --------------------------------+------------------------------------ DiskFileManager(conf) Methods: .pickle_async_update() .get_diskfile() .get_hashes() Attributes: .devices .logger .disk_chunk_size .keep_cache_size .bytes_per_sync DiskFile(a,c,o,keep_data_fp=) DiskFile(a,c,o) Methods: Methods: *.__iter__() .close(verify_file=) .is_deleted() .is_expired() .quarantine() .get_data_file_size() .open() .read_metadata() .create() .create() .write_metadata() .delete() .delete() Attributes: Attributes: .quarantined_dir .keep_cache .metadata *DiskFileReader() Methods: .__iter__() .close() Attributes: +.was_quarantined DiskWriter() DiskFileWriter() Methods: Methods: .write() .write() .put() .put() * Note that the DiskFile class * Note that the DiskReader() object implements all the methods returned by the necessary for a WSGI app DiskFileOpened.reader() method iterator implements all the methods necessary for a WSGI app iterator + Note that if the auditor is refactored to not use the DiskFile class, see https://review.openstack.org/44787 then we don't need the was_quarantined attribute A reference "in-memory" object server implementation of a backend DiskFile class in swift/obj/mem_server.py and swift/obj/mem_diskfile.py. One can also reference https://github.com/portante/gluster-swift/commits/diskfile for the proposed integration with the gluster-swift code based on these changes. Change-Id: I44e153fdb405a5743e9c05349008f94136764916 Signed-off-by: Peter Portante <peter.portante@redhat.com>
2013-09-12 19:51:18 -04:00
'Content-Length': str(os.fstat(writer._fd).st_size),
2010-12-28 14:54:00 -08:00
}
writer.put(metadata)
writer.commit(Timestamp(timestamp))
os.write(writer._fd, b'extra_data')
auditor_worker.audit_all_objects()
self.assertEqual(auditor_worker.quarantines, pre_quarantines + 1)
2010-12-28 14:54:00 -08:00
def test_object_run_fast_track_non_zero(self):
self.auditor = auditor.ObjectAuditor(self.conf)
self.auditor.log_time = 0
data = b'0' * 1024
etag = md5()
with self.disk_file.create() as writer:
writer.write(data)
etag.update(data)
etag = etag.hexdigest()
timestamp = str(normalize_timestamp(time.time()))
metadata = {
'ETag': etag,
'X-Timestamp': timestamp,
DiskFile API, with reference implementation Refactor on-disk knowledge out of the object server by pushing the async update pickle creation to the new DiskFileManager class (name is not the best, so suggestions welcome), along with the REPLICATOR method logic. We also move the mount checking and thread pool storage to the new ondisk.Devices object, which then also becomes the new home of the audit_location_generator method. For the object server, a new setup() method is now called at the end of the controller's construction, and the _diskfile() method has been renamed to get_diskfile(), to allow implementation specific behavior. We then hide the need for the REST API layer to know how and where quarantining needs to be performed. There are now two places it is checked internally, on open() where we verify the content-length, name, and x-timestamp metadata, and in the reader on close where the etag metadata is checked if the entire file was read. We add a reader class to allow implementations to isolate the WSGI handling code for that specific environment (it is used no-where else in the REST APIs). This simplifies the caller's code to just use a "with" statement once open to avoid multiple points where close needs to be called. For a full historical comparison, including the usage patterns see: https://gist.github.com/portante/5488238 (as of master, 2b639f5, Merge "Fix 500 from account-quota This Commit middleware") --------------------------------+------------------------------------ DiskFileManager(conf) Methods: .pickle_async_update() .get_diskfile() .get_hashes() Attributes: .devices .logger .disk_chunk_size .keep_cache_size .bytes_per_sync DiskFile(a,c,o,keep_data_fp=) DiskFile(a,c,o) Methods: Methods: *.__iter__() .close(verify_file=) .is_deleted() .is_expired() .quarantine() .get_data_file_size() .open() .read_metadata() .create() .create() .write_metadata() .delete() .delete() Attributes: Attributes: .quarantined_dir .keep_cache .metadata *DiskFileReader() Methods: .__iter__() .close() Attributes: +.was_quarantined DiskWriter() DiskFileWriter() Methods: Methods: .write() .write() .put() .put() * Note that the DiskFile class * Note that the DiskReader() object implements all the methods returned by the necessary for a WSGI app DiskFileOpened.reader() method iterator implements all the methods necessary for a WSGI app iterator + Note that if the auditor is refactored to not use the DiskFile class, see https://review.openstack.org/44787 then we don't need the was_quarantined attribute A reference "in-memory" object server implementation of a backend DiskFile class in swift/obj/mem_server.py and swift/obj/mem_diskfile.py. One can also reference https://github.com/portante/gluster-swift/commits/diskfile for the proposed integration with the gluster-swift code based on these changes. Change-Id: I44e153fdb405a5743e9c05349008f94136764916 Signed-off-by: Peter Portante <peter.portante@redhat.com>
2013-09-12 19:51:18 -04:00
'Content-Length': str(os.fstat(writer._fd).st_size),
}
writer.put(metadata)
writer.commit(Timestamp(timestamp))
etag = md5()
etag.update(b'1' + b'0' * 1023)
etag = etag.hexdigest()
metadata['ETag'] = etag
DiskFile API, with reference implementation Refactor on-disk knowledge out of the object server by pushing the async update pickle creation to the new DiskFileManager class (name is not the best, so suggestions welcome), along with the REPLICATOR method logic. We also move the mount checking and thread pool storage to the new ondisk.Devices object, which then also becomes the new home of the audit_location_generator method. For the object server, a new setup() method is now called at the end of the controller's construction, and the _diskfile() method has been renamed to get_diskfile(), to allow implementation specific behavior. We then hide the need for the REST API layer to know how and where quarantining needs to be performed. There are now two places it is checked internally, on open() where we verify the content-length, name, and x-timestamp metadata, and in the reader on close where the etag metadata is checked if the entire file was read. We add a reader class to allow implementations to isolate the WSGI handling code for that specific environment (it is used no-where else in the REST APIs). This simplifies the caller's code to just use a "with" statement once open to avoid multiple points where close needs to be called. For a full historical comparison, including the usage patterns see: https://gist.github.com/portante/5488238 (as of master, 2b639f5, Merge "Fix 500 from account-quota This Commit middleware") --------------------------------+------------------------------------ DiskFileManager(conf) Methods: .pickle_async_update() .get_diskfile() .get_hashes() Attributes: .devices .logger .disk_chunk_size .keep_cache_size .bytes_per_sync DiskFile(a,c,o,keep_data_fp=) DiskFile(a,c,o) Methods: Methods: *.__iter__() .close(verify_file=) .is_deleted() .is_expired() .quarantine() .get_data_file_size() .open() .read_metadata() .create() .create() .write_metadata() .delete() .delete() Attributes: Attributes: .quarantined_dir .keep_cache .metadata *DiskFileReader() Methods: .__iter__() .close() Attributes: +.was_quarantined DiskWriter() DiskFileWriter() Methods: Methods: .write() .write() .put() .put() * Note that the DiskFile class * Note that the DiskReader() object implements all the methods returned by the necessary for a WSGI app DiskFileOpened.reader() method iterator implements all the methods necessary for a WSGI app iterator + Note that if the auditor is refactored to not use the DiskFile class, see https://review.openstack.org/44787 then we don't need the was_quarantined attribute A reference "in-memory" object server implementation of a backend DiskFile class in swift/obj/mem_server.py and swift/obj/mem_diskfile.py. One can also reference https://github.com/portante/gluster-swift/commits/diskfile for the proposed integration with the gluster-swift code based on these changes. Change-Id: I44e153fdb405a5743e9c05349008f94136764916 Signed-off-by: Peter Portante <peter.portante@redhat.com>
2013-09-12 19:51:18 -04:00
write_metadata(writer._fd, metadata)
quarantine_path = os.path.join(self.devices,
'sda', 'quarantined', 'objects')
kwargs = {'mode': 'once'}
kwargs['zero_byte_fps'] = 50
self.auditor.run_audit(**kwargs)
self.assertFalse(os.path.isdir(quarantine_path))
del(kwargs['zero_byte_fps'])
clear_auditor_status(self.devices)
self.auditor.run_audit(**kwargs)
self.assertTrue(os.path.isdir(quarantine_path))
def setup_bad_zero_byte(self, timestamp=None):
if timestamp is None:
timestamp = Timestamp(time.time())
self.auditor = auditor.ObjectAuditor(self.conf)
self.auditor.log_time = 0
etag = md5()
with self.disk_file.create() as writer:
etag = etag.hexdigest()
metadata = {
'ETag': etag,
'X-Timestamp': timestamp.internal,
'Content-Length': 10,
}
writer.put(metadata)
writer.commit(Timestamp(timestamp))
etag = md5()
etag = etag.hexdigest()
metadata['ETag'] = etag
DiskFile API, with reference implementation Refactor on-disk knowledge out of the object server by pushing the async update pickle creation to the new DiskFileManager class (name is not the best, so suggestions welcome), along with the REPLICATOR method logic. We also move the mount checking and thread pool storage to the new ondisk.Devices object, which then also becomes the new home of the audit_location_generator method. For the object server, a new setup() method is now called at the end of the controller's construction, and the _diskfile() method has been renamed to get_diskfile(), to allow implementation specific behavior. We then hide the need for the REST API layer to know how and where quarantining needs to be performed. There are now two places it is checked internally, on open() where we verify the content-length, name, and x-timestamp metadata, and in the reader on close where the etag metadata is checked if the entire file was read. We add a reader class to allow implementations to isolate the WSGI handling code for that specific environment (it is used no-where else in the REST APIs). This simplifies the caller's code to just use a "with" statement once open to avoid multiple points where close needs to be called. For a full historical comparison, including the usage patterns see: https://gist.github.com/portante/5488238 (as of master, 2b639f5, Merge "Fix 500 from account-quota This Commit middleware") --------------------------------+------------------------------------ DiskFileManager(conf) Methods: .pickle_async_update() .get_diskfile() .get_hashes() Attributes: .devices .logger .disk_chunk_size .keep_cache_size .bytes_per_sync DiskFile(a,c,o,keep_data_fp=) DiskFile(a,c,o) Methods: Methods: *.__iter__() .close(verify_file=) .is_deleted() .is_expired() .quarantine() .get_data_file_size() .open() .read_metadata() .create() .create() .write_metadata() .delete() .delete() Attributes: Attributes: .quarantined_dir .keep_cache .metadata *DiskFileReader() Methods: .__iter__() .close() Attributes: +.was_quarantined DiskWriter() DiskFileWriter() Methods: Methods: .write() .write() .put() .put() * Note that the DiskFile class * Note that the DiskReader() object implements all the methods returned by the necessary for a WSGI app DiskFileOpened.reader() method iterator implements all the methods necessary for a WSGI app iterator + Note that if the auditor is refactored to not use the DiskFile class, see https://review.openstack.org/44787 then we don't need the was_quarantined attribute A reference "in-memory" object server implementation of a backend DiskFile class in swift/obj/mem_server.py and swift/obj/mem_diskfile.py. One can also reference https://github.com/portante/gluster-swift/commits/diskfile for the proposed integration with the gluster-swift code based on these changes. Change-Id: I44e153fdb405a5743e9c05349008f94136764916 Signed-off-by: Peter Portante <peter.portante@redhat.com>
2013-09-12 19:51:18 -04:00
write_metadata(writer._fd, metadata)
2011-02-21 16:37:12 -08:00
def test_object_run_fast_track_all(self):
self.setup_bad_zero_byte()
kwargs = {'mode': 'once'}
self.auditor.run_audit(**kwargs)
quarantine_path = os.path.join(self.devices,
'sda', 'quarantined', 'objects')
self.assertTrue(os.path.isdir(quarantine_path))
2011-02-21 16:37:12 -08:00
def test_object_run_fast_track_zero(self):
self.setup_bad_zero_byte()
kwargs = {'mode': 'once'}
kwargs['zero_byte_fps'] = 50
called_args = [0]
def mock_get_auditor_status(path, logger, audit_type):
called_args[0] = audit_type
return get_auditor_status(path, logger, audit_type)
with mock.patch('swift.obj.diskfile.get_auditor_status',
mock_get_auditor_status):
self.auditor.run_audit(**kwargs)
2011-02-21 16:37:12 -08:00
quarantine_path = os.path.join(self.devices,
'sda', 'quarantined', 'objects')
self.assertTrue(os.path.isdir(quarantine_path))
self.assertEqual('ZBF', called_args[0])
2010-07-12 17:03:45 -05:00
2011-08-30 14:29:19 -07:00
def test_object_run_fast_track_zero_check_closed(self):
rat = [False]
class FakeFile(DiskFile):
DiskFile API, with reference implementation Refactor on-disk knowledge out of the object server by pushing the async update pickle creation to the new DiskFileManager class (name is not the best, so suggestions welcome), along with the REPLICATOR method logic. We also move the mount checking and thread pool storage to the new ondisk.Devices object, which then also becomes the new home of the audit_location_generator method. For the object server, a new setup() method is now called at the end of the controller's construction, and the _diskfile() method has been renamed to get_diskfile(), to allow implementation specific behavior. We then hide the need for the REST API layer to know how and where quarantining needs to be performed. There are now two places it is checked internally, on open() where we verify the content-length, name, and x-timestamp metadata, and in the reader on close where the etag metadata is checked if the entire file was read. We add a reader class to allow implementations to isolate the WSGI handling code for that specific environment (it is used no-where else in the REST APIs). This simplifies the caller's code to just use a "with" statement once open to avoid multiple points where close needs to be called. For a full historical comparison, including the usage patterns see: https://gist.github.com/portante/5488238 (as of master, 2b639f5, Merge "Fix 500 from account-quota This Commit middleware") --------------------------------+------------------------------------ DiskFileManager(conf) Methods: .pickle_async_update() .get_diskfile() .get_hashes() Attributes: .devices .logger .disk_chunk_size .keep_cache_size .bytes_per_sync DiskFile(a,c,o,keep_data_fp=) DiskFile(a,c,o) Methods: Methods: *.__iter__() .close(verify_file=) .is_deleted() .is_expired() .quarantine() .get_data_file_size() .open() .read_metadata() .create() .create() .write_metadata() .delete() .delete() Attributes: Attributes: .quarantined_dir .keep_cache .metadata *DiskFileReader() Methods: .__iter__() .close() Attributes: +.was_quarantined DiskWriter() DiskFileWriter() Methods: Methods: .write() .write() .put() .put() * Note that the DiskFile class * Note that the DiskReader() object implements all the methods returned by the necessary for a WSGI app DiskFileOpened.reader() method iterator implements all the methods necessary for a WSGI app iterator + Note that if the auditor is refactored to not use the DiskFile class, see https://review.openstack.org/44787 then we don't need the was_quarantined attribute A reference "in-memory" object server implementation of a backend DiskFile class in swift/obj/mem_server.py and swift/obj/mem_diskfile.py. One can also reference https://github.com/portante/gluster-swift/commits/diskfile for the proposed integration with the gluster-swift code based on these changes. Change-Id: I44e153fdb405a5743e9c05349008f94136764916 Signed-off-by: Peter Portante <peter.portante@redhat.com>
2013-09-12 19:51:18 -04:00
def _quarantine(self, data_file, msg):
2011-08-30 14:29:19 -07:00
rat[0] = True
DiskFile API, with reference implementation Refactor on-disk knowledge out of the object server by pushing the async update pickle creation to the new DiskFileManager class (name is not the best, so suggestions welcome), along with the REPLICATOR method logic. We also move the mount checking and thread pool storage to the new ondisk.Devices object, which then also becomes the new home of the audit_location_generator method. For the object server, a new setup() method is now called at the end of the controller's construction, and the _diskfile() method has been renamed to get_diskfile(), to allow implementation specific behavior. We then hide the need for the REST API layer to know how and where quarantining needs to be performed. There are now two places it is checked internally, on open() where we verify the content-length, name, and x-timestamp metadata, and in the reader on close where the etag metadata is checked if the entire file was read. We add a reader class to allow implementations to isolate the WSGI handling code for that specific environment (it is used no-where else in the REST APIs). This simplifies the caller's code to just use a "with" statement once open to avoid multiple points where close needs to be called. For a full historical comparison, including the usage patterns see: https://gist.github.com/portante/5488238 (as of master, 2b639f5, Merge "Fix 500 from account-quota This Commit middleware") --------------------------------+------------------------------------ DiskFileManager(conf) Methods: .pickle_async_update() .get_diskfile() .get_hashes() Attributes: .devices .logger .disk_chunk_size .keep_cache_size .bytes_per_sync DiskFile(a,c,o,keep_data_fp=) DiskFile(a,c,o) Methods: Methods: *.__iter__() .close(verify_file=) .is_deleted() .is_expired() .quarantine() .get_data_file_size() .open() .read_metadata() .create() .create() .write_metadata() .delete() .delete() Attributes: Attributes: .quarantined_dir .keep_cache .metadata *DiskFileReader() Methods: .__iter__() .close() Attributes: +.was_quarantined DiskWriter() DiskFileWriter() Methods: Methods: .write() .write() .put() .put() * Note that the DiskFile class * Note that the DiskReader() object implements all the methods returned by the necessary for a WSGI app DiskFileOpened.reader() method iterator implements all the methods necessary for a WSGI app iterator + Note that if the auditor is refactored to not use the DiskFile class, see https://review.openstack.org/44787 then we don't need the was_quarantined attribute A reference "in-memory" object server implementation of a backend DiskFile class in swift/obj/mem_server.py and swift/obj/mem_diskfile.py. One can also reference https://github.com/portante/gluster-swift/commits/diskfile for the proposed integration with the gluster-swift code based on these changes. Change-Id: I44e153fdb405a5743e9c05349008f94136764916 Signed-off-by: Peter Portante <peter.portante@redhat.com>
2013-09-12 19:51:18 -04:00
DiskFile._quarantine(self, data_file, msg)
2011-08-30 14:29:19 -07:00
self.setup_bad_zero_byte()
with mock.patch('swift.obj.diskfile.DiskFileManager.diskfile_cls',
FakeFile):
kwargs = {'mode': 'once'}
kwargs['zero_byte_fps'] = 50
self.auditor.run_audit(**kwargs)
2011-08-31 07:28:36 -07:00
quarantine_path = os.path.join(self.devices,
'sda', 'quarantined', 'objects')
self.assertTrue(os.path.isdir(quarantine_path))
self.assertTrue(rat[0])
2011-08-30 14:29:19 -07:00
@mock.patch.object(auditor.ObjectAuditor, 'run_audit')
@mock.patch('os.fork', return_value=0)
def test_with_inaccessible_object_location(self, mock_os_fork,
mock_run_audit):
# Need to ensure that any failures in run_audit do
# not prevent sys.exit() from running. Otherwise we get
# zombie processes.
e = OSError('permission denied')
mock_run_audit.side_effect = e
self.auditor = auditor.ObjectAuditor(self.conf)
self.assertRaises(SystemExit, self.auditor.fork_child, self)
def test_with_only_tombstone(self):
# sanity check that auditor doesn't touch solitary tombstones
ts_iter = make_timestamp_iter()
self.setup_bad_zero_byte(timestamp=next(ts_iter))
self.disk_file.delete(next(ts_iter))
files = os.listdir(self.disk_file._datadir)
self.assertEqual(1, len(files))
self.assertTrue(files[0].endswith('ts'))
kwargs = {'mode': 'once'}
self.auditor.run_audit(**kwargs)
files_after = os.listdir(self.disk_file._datadir)
self.assertEqual(files, files_after)
def test_with_tombstone_and_data(self):
# rsync replication could leave a tombstone and data file in object
# dir - verify they are both removed during audit
ts_iter = make_timestamp_iter()
ts_tomb = next(ts_iter)
ts_data = next(ts_iter)
self.setup_bad_zero_byte(timestamp=ts_data)
tomb_file_path = os.path.join(self.disk_file._datadir,
'%s.ts' % ts_tomb.internal)
with open(tomb_file_path, 'wb') as fd:
write_metadata(fd, {'X-Timestamp': ts_tomb.internal})
files = os.listdir(self.disk_file._datadir)
self.assertEqual(2, len(files))
self.assertTrue(os.path.basename(tomb_file_path) in files, files)
kwargs = {'mode': 'once'}
self.auditor.run_audit(**kwargs)
self.assertFalse(os.path.exists(self.disk_file._datadir))
DiskFile API, with reference implementation Refactor on-disk knowledge out of the object server by pushing the async update pickle creation to the new DiskFileManager class (name is not the best, so suggestions welcome), along with the REPLICATOR method logic. We also move the mount checking and thread pool storage to the new ondisk.Devices object, which then also becomes the new home of the audit_location_generator method. For the object server, a new setup() method is now called at the end of the controller's construction, and the _diskfile() method has been renamed to get_diskfile(), to allow implementation specific behavior. We then hide the need for the REST API layer to know how and where quarantining needs to be performed. There are now two places it is checked internally, on open() where we verify the content-length, name, and x-timestamp metadata, and in the reader on close where the etag metadata is checked if the entire file was read. We add a reader class to allow implementations to isolate the WSGI handling code for that specific environment (it is used no-where else in the REST APIs). This simplifies the caller's code to just use a "with" statement once open to avoid multiple points where close needs to be called. For a full historical comparison, including the usage patterns see: https://gist.github.com/portante/5488238 (as of master, 2b639f5, Merge "Fix 500 from account-quota This Commit middleware") --------------------------------+------------------------------------ DiskFileManager(conf) Methods: .pickle_async_update() .get_diskfile() .get_hashes() Attributes: .devices .logger .disk_chunk_size .keep_cache_size .bytes_per_sync DiskFile(a,c,o,keep_data_fp=) DiskFile(a,c,o) Methods: Methods: *.__iter__() .close(verify_file=) .is_deleted() .is_expired() .quarantine() .get_data_file_size() .open() .read_metadata() .create() .create() .write_metadata() .delete() .delete() Attributes: Attributes: .quarantined_dir .keep_cache .metadata *DiskFileReader() Methods: .__iter__() .close() Attributes: +.was_quarantined DiskWriter() DiskFileWriter() Methods: Methods: .write() .write() .put() .put() * Note that the DiskFile class * Note that the DiskReader() object implements all the methods returned by the necessary for a WSGI app DiskFileOpened.reader() method iterator implements all the methods necessary for a WSGI app iterator + Note that if the auditor is refactored to not use the DiskFile class, see https://review.openstack.org/44787 then we don't need the was_quarantined attribute A reference "in-memory" object server implementation of a backend DiskFile class in swift/obj/mem_server.py and swift/obj/mem_diskfile.py. One can also reference https://github.com/portante/gluster-swift/commits/diskfile for the proposed integration with the gluster-swift code based on these changes. Change-Id: I44e153fdb405a5743e9c05349008f94136764916 Signed-off-by: Peter Portante <peter.portante@redhat.com>
2013-09-12 19:51:18 -04:00
def _audit_tombstone(self, conf, ts_tomb, zero_byte_fps=0):
self.auditor = auditor.ObjectAuditor(conf)
self.auditor.log_time = 0
# create tombstone and hashes.pkl file, ensuring the tombstone is not
# reclaimed by mocking time to be the tombstone time
with mock.patch('time.time', return_value=float(ts_tomb)):
self.disk_file.delete(ts_tomb)
self.disk_file.manager.get_hashes(
self.devices + '/sda', '0', [], self.disk_file.policy)
suffix = basename(dirname(self.disk_file._datadir))
part_dir = dirname(dirname(self.disk_file._datadir))
# sanity checks...
self.assertEqual(['%s.ts' % ts_tomb.internal],
os.listdir(self.disk_file._datadir))
self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
self.assertFalse(os.path.exists(
os.path.join(part_dir, HASH_INVALIDATIONS_FILE)))
# Run auditor
self.auditor.run_audit(mode='once', zero_byte_fps=zero_byte_fps)
# sanity check - auditor should not remove tombstone file
self.assertEqual(['%s.ts' % ts_tomb.internal],
os.listdir(self.disk_file._datadir))
return part_dir, suffix
def test_non_reclaimable_tombstone(self):
# audit with a recent tombstone
ts_tomb = Timestamp(time.time() - 55)
part_dir, suffix = self._audit_tombstone(self.conf, ts_tomb)
self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
self.assertFalse(os.path.exists(
os.path.join(part_dir, HASH_INVALIDATIONS_FILE)))
def test_reclaimable_tombstone(self):
# audit with a reclaimable tombstone
ts_tomb = Timestamp(time.time() - 604800)
part_dir, suffix = self._audit_tombstone(self.conf, ts_tomb)
self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
hash_invalid = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
self.assertTrue(os.path.exists(hash_invalid))
with open(hash_invalid, 'rb') as fp:
hash_val = fp.read()
self.assertEqual(suffix, hash_val.strip('\n'))
def test_non_reclaimable_tombstone_with_custom_reclaim_age(self):
# audit with a tombstone newer than custom reclaim age
ts_tomb = Timestamp(time.time() - 604800)
conf = dict(self.conf)
conf['reclaim_age'] = 2 * 604800
part_dir, suffix = self._audit_tombstone(conf, ts_tomb)
self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
self.assertFalse(os.path.exists(
os.path.join(part_dir, HASH_INVALIDATIONS_FILE)))
def test_reclaimable_tombstone_with_custom_reclaim_age(self):
# audit with a tombstone older than custom reclaim age
ts_tomb = Timestamp(time.time() - 55)
conf = dict(self.conf)
conf['reclaim_age'] = 10
part_dir, suffix = self._audit_tombstone(conf, ts_tomb)
self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
hash_invalid = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
self.assertTrue(os.path.exists(hash_invalid))
with open(hash_invalid, 'rb') as fp:
hash_val = fp.read()
self.assertEqual(suffix, hash_val.strip('\n'))
def test_reclaimable_tombstone_with_zero_byte_fps(self):
# audit with a tombstone older than reclaim age by a zero_byte_fps
# worker does not invalidate the hash
ts_tomb = Timestamp(time.time() - 604800)
part_dir, suffix = self._audit_tombstone(
self.conf, ts_tomb, zero_byte_fps=50)
self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
self.assertFalse(os.path.exists(
os.path.join(part_dir, HASH_INVALIDATIONS_FILE)))
def _test_expired_object_is_ignored(self, zero_byte_fps):
# verify that an expired object does not get mistaken for a tombstone
audit = auditor.ObjectAuditor(self.conf)
audit.logger = FakeLogger()
audit.log_time = 0
now = time.time()
write_diskfile(self.disk_file, Timestamp(now - 20),
extra_metadata={'X-Delete-At': now - 10})
files = os.listdir(self.disk_file._datadir)
self.assertTrue([f for f in files if f.endswith('.data')]) # sanity
with mock.patch.object(auditor, 'dump_recon_cache'):
audit.run_audit(mode='once', zero_byte_fps=zero_byte_fps)
self.assertTrue(os.path.exists(self.disk_file._datadir))
part_dir = dirname(dirname(self.disk_file._datadir))
self.assertFalse(os.path.exists(
os.path.join(part_dir, HASH_INVALIDATIONS_FILE)))
self.assertEqual(files, os.listdir(self.disk_file._datadir))
self.assertFalse(audit.logger.get_lines_for_level('error'))
self.assertFalse(audit.logger.get_lines_for_level('warning'))
def test_expired_object_is_ignored(self):
self._test_expired_object_is_ignored(0)
def test_expired_object_is_ignored_with_zero_byte_fps(self):
self._test_expired_object_is_ignored(50)
def test_auditor_reclaim_age(self):
# if we don't have access to the replicator config section we'll use
# diskfile's default
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
router = auditor_worker.diskfile_router
for policy in POLICIES:
self.assertEqual(router[policy].reclaim_age, 86400 * 7)
# if the reclaim_age option is set explicitly we use that
self.conf['reclaim_age'] = '1800'
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
router = auditor_worker.diskfile_router
for policy in POLICIES:
self.assertEqual(router[policy].reclaim_age, 1800)
# if we have a real config we can be a little smarter
config_path = os.path.join(self.testdir, 'objserver.conf')
# if there is no object-replicator section we still have to fall back
# to default because we can't parse the config for that section!
stub_config = """
[object-auditor]
"""
with open(config_path, 'w') as f:
f.write(textwrap.dedent(stub_config))
conf = readconf(config_path, 'object-auditor')
auditor_worker = auditor.AuditorWorker(conf, self.logger,
self.rcache, self.devices)
router = auditor_worker.diskfile_router
for policy in POLICIES:
self.assertEqual(router[policy].reclaim_age, 86400 * 7)
# verify reclaim_age is of auditor config value
stub_config = """
[object-replicator]
[object-auditor]
reclaim_age = 60
"""
with open(config_path, 'w') as f:
f.write(textwrap.dedent(stub_config))
conf = readconf(config_path, 'object-auditor')
auditor_worker = auditor.AuditorWorker(conf, self.logger,
self.rcache, self.devices)
router = auditor_worker.diskfile_router
for policy in POLICIES:
self.assertEqual(router[policy].reclaim_age, 60)
# verify reclaim_age falls back to replicator config value
# if there is no auditor config value
config_path = os.path.join(self.testdir, 'objserver.conf')
stub_config = """
[object-replicator]
reclaim_age = 60
[object-auditor]
"""
with open(config_path, 'w') as f:
f.write(textwrap.dedent(stub_config))
conf = readconf(config_path, 'object-auditor')
auditor_worker = auditor.AuditorWorker(conf, self.logger,
self.rcache, self.devices)
router = auditor_worker.diskfile_router
for policy in POLICIES:
self.assertEqual(router[policy].reclaim_age, 60)
# we'll prefer our own DEFAULT section to the replicator though
self.assertEqual(auditor_worker.rsync_tempfile_timeout,
replicator.DEFAULT_RSYNC_TIMEOUT + 900)
stub_config = """
[DEFAULT]
reclaim_age = 1209600
[object-replicator]
reclaim_age = 1800
[object-auditor]
"""
with open(config_path, 'w') as f:
f.write(textwrap.dedent(stub_config))
conf = readconf(config_path, 'object-auditor')
auditor_worker = auditor.AuditorWorker(conf, self.logger,
self.rcache, self.devices)
router = auditor_worker.diskfile_router
for policy in POLICIES:
self.assertEqual(router[policy].reclaim_age, 1209600)
DiskFile API, with reference implementation Refactor on-disk knowledge out of the object server by pushing the async update pickle creation to the new DiskFileManager class (name is not the best, so suggestions welcome), along with the REPLICATOR method logic. We also move the mount checking and thread pool storage to the new ondisk.Devices object, which then also becomes the new home of the audit_location_generator method. For the object server, a new setup() method is now called at the end of the controller's construction, and the _diskfile() method has been renamed to get_diskfile(), to allow implementation specific behavior. We then hide the need for the REST API layer to know how and where quarantining needs to be performed. There are now two places it is checked internally, on open() where we verify the content-length, name, and x-timestamp metadata, and in the reader on close where the etag metadata is checked if the entire file was read. We add a reader class to allow implementations to isolate the WSGI handling code for that specific environment (it is used no-where else in the REST APIs). This simplifies the caller's code to just use a "with" statement once open to avoid multiple points where close needs to be called. For a full historical comparison, including the usage patterns see: https://gist.github.com/portante/5488238 (as of master, 2b639f5, Merge "Fix 500 from account-quota This Commit middleware") --------------------------------+------------------------------------ DiskFileManager(conf) Methods: .pickle_async_update() .get_diskfile() .get_hashes() Attributes: .devices .logger .disk_chunk_size .keep_cache_size .bytes_per_sync DiskFile(a,c,o,keep_data_fp=) DiskFile(a,c,o) Methods: Methods: *.__iter__() .close(verify_file=) .is_deleted() .is_expired() .quarantine() .get_data_file_size() .open() .read_metadata() .create() .create() .write_metadata() .delete() .delete() Attributes: Attributes: .quarantined_dir .keep_cache .metadata *DiskFileReader() Methods: .__iter__() .close() Attributes: +.was_quarantined DiskWriter() DiskFileWriter() Methods: Methods: .write() .write() .put() .put() * Note that the DiskFile class * Note that the DiskReader() object implements all the methods returned by the necessary for a WSGI app DiskFileOpened.reader() method iterator implements all the methods necessary for a WSGI app iterator + Note that if the auditor is refactored to not use the DiskFile class, see https://review.openstack.org/44787 then we don't need the was_quarantined attribute A reference "in-memory" object server implementation of a backend DiskFile class in swift/obj/mem_server.py and swift/obj/mem_diskfile.py. One can also reference https://github.com/portante/gluster-swift/commits/diskfile for the proposed integration with the gluster-swift code based on these changes. Change-Id: I44e153fdb405a5743e9c05349008f94136764916 Signed-off-by: Peter Portante <peter.portante@redhat.com>
2013-09-12 19:51:18 -04:00
def test_sleeper(self):
with mock.patch(
'time.sleep', mock.MagicMock()) as mock_sleep:
my_auditor = auditor.ObjectAuditor(self.conf)
my_auditor._sleep()
mock_sleep.assert_called_with(30)
my_conf = dict(interval=2)
my_conf.update(self.conf)
my_auditor = auditor.ObjectAuditor(my_conf)
my_auditor._sleep()
mock_sleep.assert_called_with(2)
my_auditor = auditor.ObjectAuditor(self.conf)
my_auditor.interval = 2
my_auditor._sleep()
mock_sleep.assert_called_with(2)
DiskFile API, with reference implementation Refactor on-disk knowledge out of the object server by pushing the async update pickle creation to the new DiskFileManager class (name is not the best, so suggestions welcome), along with the REPLICATOR method logic. We also move the mount checking and thread pool storage to the new ondisk.Devices object, which then also becomes the new home of the audit_location_generator method. For the object server, a new setup() method is now called at the end of the controller's construction, and the _diskfile() method has been renamed to get_diskfile(), to allow implementation specific behavior. We then hide the need for the REST API layer to know how and where quarantining needs to be performed. There are now two places it is checked internally, on open() where we verify the content-length, name, and x-timestamp metadata, and in the reader on close where the etag metadata is checked if the entire file was read. We add a reader class to allow implementations to isolate the WSGI handling code for that specific environment (it is used no-where else in the REST APIs). This simplifies the caller's code to just use a "with" statement once open to avoid multiple points where close needs to be called. For a full historical comparison, including the usage patterns see: https://gist.github.com/portante/5488238 (as of master, 2b639f5, Merge "Fix 500 from account-quota This Commit middleware") --------------------------------+------------------------------------ DiskFileManager(conf) Methods: .pickle_async_update() .get_diskfile() .get_hashes() Attributes: .devices .logger .disk_chunk_size .keep_cache_size .bytes_per_sync DiskFile(a,c,o,keep_data_fp=) DiskFile(a,c,o) Methods: Methods: *.__iter__() .close(verify_file=) .is_deleted() .is_expired() .quarantine() .get_data_file_size() .open() .read_metadata() .create() .create() .write_metadata() .delete() .delete() Attributes: Attributes: .quarantined_dir .keep_cache .metadata *DiskFileReader() Methods: .__iter__() .close() Attributes: +.was_quarantined DiskWriter() DiskFileWriter() Methods: Methods: .write() .write() .put() .put() * Note that the DiskFile class * Note that the DiskReader() object implements all the methods returned by the necessary for a WSGI app DiskFileOpened.reader() method iterator implements all the methods necessary for a WSGI app iterator + Note that if the auditor is refactored to not use the DiskFile class, see https://review.openstack.org/44787 then we don't need the was_quarantined attribute A reference "in-memory" object server implementation of a backend DiskFile class in swift/obj/mem_server.py and swift/obj/mem_diskfile.py. One can also reference https://github.com/portante/gluster-swift/commits/diskfile for the proposed integration with the gluster-swift code based on these changes. Change-Id: I44e153fdb405a5743e9c05349008f94136764916 Signed-off-by: Peter Portante <peter.portante@redhat.com>
2013-09-12 19:51:18 -04:00
def test_run_parallel_audit(self):
2011-02-24 12:27:20 -08:00
class StopForever(Exception):
pass
class Bogus(Exception):
pass
loop_error = Bogus('exception')
class LetMeOut(BaseException):
pass
2011-02-24 12:27:20 -08:00
class ObjectAuditorMock(object):
check_args = ()
check_kwargs = {}
check_device_dir = None
2011-02-24 12:27:20 -08:00
fork_called = 0
master = 0
wait_called = 0
2011-02-24 12:27:20 -08:00
def mock_run(self, *args, **kwargs):
self.check_args = args
self.check_kwargs = kwargs
if 'zero_byte_fps' in kwargs:
self.check_device_dir = kwargs.get('device_dirs')
2011-02-24 12:27:20 -08:00
def mock_sleep_stop(self):
2011-02-24 12:27:20 -08:00
raise StopForever('stop')
def mock_sleep_continue(self):
return
def mock_audit_loop_error(self, parent, zbo_fps,
override_devices=None, **kwargs):
raise loop_error
2011-02-24 12:27:20 -08:00
def mock_fork(self):
self.fork_called += 1
if self.master:
return self.fork_called
else:
return 0
def mock_wait(self):
self.wait_called += 1
return (self.wait_called, 0)
for i in string.ascii_letters[2:26]:
mkdirs(os.path.join(self.devices, 'sd%s' % i))
2011-02-24 12:27:20 -08:00
my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
mount_check='false',
zero_byte_files_per_second=89,
concurrency=1))
2011-02-24 12:27:20 -08:00
mocker = ObjectAuditorMock()
my_auditor.logger.exception = mock.MagicMock()
real_audit_loop = my_auditor.audit_loop
my_auditor.audit_loop = mocker.mock_audit_loop_error
my_auditor.run_audit = mocker.mock_run
2011-02-24 12:27:20 -08:00
was_fork = os.fork
was_wait = os.wait
os.fork = mocker.mock_fork
os.wait = mocker.mock_wait
2011-02-24 12:27:20 -08:00
try:
my_auditor._sleep = mocker.mock_sleep_stop
my_auditor.run_once(zero_byte_fps=50)
my_auditor.logger.exception.assert_called_once_with(
'ERROR auditing: %s', loop_error)
my_auditor.logger.exception.reset_mock()
self.assertRaises(StopForever, my_auditor.run_forever)
my_auditor.logger.exception.assert_called_once_with(
'ERROR auditing: %s', loop_error)
my_auditor.audit_loop = real_audit_loop
2011-02-24 12:27:20 -08:00
self.assertRaises(StopForever,
my_auditor.run_forever, zero_byte_fps=50)
self.assertEqual(mocker.check_kwargs['zero_byte_fps'], 50)
self.assertEqual(mocker.fork_called, 0)
2011-02-24 12:27:20 -08:00
self.assertRaises(SystemExit, my_auditor.run_once)
self.assertEqual(mocker.fork_called, 1)
self.assertEqual(mocker.check_kwargs['zero_byte_fps'], 89)
self.assertEqual(mocker.check_device_dir, [])
self.assertEqual(mocker.check_args, ())
2011-02-24 12:27:20 -08:00
device_list = ['sd%s' % i for i in string.ascii_letters[2:10]]
device_string = ','.join(device_list)
device_string_bogus = device_string + ',bogus'
mocker.fork_called = 0
self.assertRaises(SystemExit, my_auditor.run_once,
devices=device_string_bogus)
self.assertEqual(mocker.fork_called, 1)
self.assertEqual(mocker.check_kwargs['zero_byte_fps'], 89)
self.assertEqual(sorted(mocker.check_device_dir), device_list)
mocker.master = 1
mocker.fork_called = 0
self.assertRaises(StopForever, my_auditor.run_forever)
# Fork is called 2 times since the zbf process is forked just
# once before self._sleep() is called and StopForever is raised
# Also wait is called just once before StopForever is raised
self.assertEqual(mocker.fork_called, 2)
self.assertEqual(mocker.wait_called, 1)
my_auditor._sleep = mocker.mock_sleep_continue
my_auditor.audit_loop = works_only_once(my_auditor.audit_loop,
LetMeOut())
my_auditor.concurrency = 2
mocker.fork_called = 0
mocker.wait_called = 0
self.assertRaises(LetMeOut, my_auditor.run_forever)
# Fork is called no. of devices + (no. of devices)/2 + 1 times
# since zbf process is forked (no.of devices)/2 + 1 times
no_devices = len(os.listdir(self.devices))
self.assertEqual(mocker.fork_called, no_devices + no_devices / 2
+ 1)
self.assertEqual(mocker.wait_called, no_devices + no_devices / 2
+ 1)
2011-02-24 12:27:20 -08:00
finally:
os.fork = was_fork
os.wait = was_wait
2011-02-24 12:27:20 -08:00
def test_run_audit_once(self):
my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
mount_check='false',
zero_byte_files_per_second=89,
concurrency=1))
forked_pids = []
next_zbf_pid = [2]
next_normal_pid = [1001]
outstanding_pids = [[]]
def fake_fork_child(**kwargs):
if len(forked_pids) > 10:
# something's gone horribly wrong
raise BaseException("forking too much")
# ZBF pids are all smaller than the normal-audit pids; this way
# we can return them first.
#
# Also, ZBF pids are even and normal-audit pids are odd; this is
# so humans seeing this test fail can better tell what's happening.
if kwargs.get('zero_byte_fps'):
pid = next_zbf_pid[0]
next_zbf_pid[0] += 2
else:
pid = next_normal_pid[0]
next_normal_pid[0] += 2
outstanding_pids[0].append(pid)
forked_pids.append(pid)
return pid
def fake_os_wait():
# Smallest pid first; that's ZBF if we have one, else normal
outstanding_pids[0].sort()
pid = outstanding_pids[0].pop(0)
return (pid, 0) # (pid, status)
with mock.patch("swift.obj.auditor.os.wait", fake_os_wait), \
mock.patch.object(my_auditor, 'fork_child', fake_fork_child), \
mock.patch.object(my_auditor, '_sleep', lambda *a: None):
my_auditor.run_once()
self.assertEqual(sorted(forked_pids), [2, 1001])
def test_run_parallel_audit_once(self):
my_auditor = auditor.ObjectAuditor(
dict(devices=self.devices, mount_check='false',
zero_byte_files_per_second=89, concurrency=2))
# ZBF pids are smaller than the normal-audit pids; this way we can
# return them first from our mocked os.wait().
#
# Also, ZBF pids are even and normal-audit pids are odd; this is so
# humans seeing this test fail can better tell what's happening.
forked_pids = []
next_zbf_pid = [2]
next_normal_pid = [1001]
outstanding_pids = [[]]
def fake_fork_child(**kwargs):
if len(forked_pids) > 10:
# something's gone horribly wrong; try not to hang the test
# run because of it
raise BaseException("forking too much")
if kwargs.get('zero_byte_fps'):
pid = next_zbf_pid[0]
next_zbf_pid[0] += 2
else:
pid = next_normal_pid[0]
next_normal_pid[0] += 2
outstanding_pids[0].append(pid)
forked_pids.append(pid)
return pid
def fake_os_wait():
if not outstanding_pids[0]:
raise BaseException("nobody waiting")
# ZBF auditor finishes first
outstanding_pids[0].sort()
pid = outstanding_pids[0].pop(0)
return (pid, 0) # (pid, status)
# make sure we've got enough devs that the ZBF auditor can finish
# before all the normal auditors have been started
mkdirs(os.path.join(self.devices, 'sdc'))
mkdirs(os.path.join(self.devices, 'sdd'))
with mock.patch("swift.obj.auditor.os.wait", fake_os_wait), \
mock.patch.object(my_auditor, 'fork_child', fake_fork_child), \
mock.patch.object(my_auditor, '_sleep', lambda *a: None):
my_auditor.run_once()
self.assertEqual(sorted(forked_pids), [2, 1001, 1003, 1005, 1007])
def test_run_parallel_audit_once_failed_fork(self):
my_auditor = auditor.ObjectAuditor(
dict(devices=self.devices, mount_check='false',
concurrency=2))
start_pid = [1001]
outstanding_pids = []
failed_once = [False]
def failing_fork(**kwargs):
# this fork fails only on the 2nd call
# it's enough to cause the growth of orphaned child processes
if len(outstanding_pids) > 0 and not failed_once[0]:
failed_once[0] = True
raise OSError
start_pid[0] += 2
pid = start_pid[0]
outstanding_pids.append(pid)
return pid
def fake_wait():
return outstanding_pids.pop(0), 0
with mock.patch("swift.obj.auditor.os.wait", fake_wait), \
mock.patch.object(my_auditor, 'fork_child', failing_fork), \
mock.patch.object(my_auditor, '_sleep', lambda *a: None):
for i in range(3):
my_auditor.run_once()
self.assertEqual(len(outstanding_pids), 0,
"orphaned children left {0}, expected 0."
.format(outstanding_pids))
2010-07-12 17:03:45 -05:00
if __name__ == '__main__':
unittest.main()