Merge "py3: port expirer"
This commit is contained in:
commit
720979e3f1
@ -13,7 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from six.moves import urllib
|
||||
import six
|
||||
|
||||
from random import random
|
||||
from time import time
|
||||
@ -31,6 +31,7 @@ from swift.common.utils import get_logger, dump_recon_cache, split_path, \
|
||||
Timestamp, config_true_value
|
||||
from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \
|
||||
HTTP_PRECONDITION_FAILED
|
||||
from swift.common.swob import wsgi_quote, str_to_wsgi
|
||||
|
||||
from swift.container.reconciler import direct_delete_container_entry
|
||||
|
||||
@ -182,6 +183,8 @@ class ObjectExpirer(Daemon):
|
||||
:param divisor: a divisor number
|
||||
:return: an integer to decide which expirer is assigned to the task
|
||||
"""
|
||||
if not isinstance(name, bytes):
|
||||
name = name.encode('utf8')
|
||||
# md5 is only used for shuffling mod
|
||||
return int(hashlib.md5(name).hexdigest(), 16) % divisor
|
||||
|
||||
@ -229,7 +232,10 @@ class ObjectExpirer(Daemon):
|
||||
"""
|
||||
for task_account, task_container in task_account_container_list:
|
||||
for o in self.swift.iter_objects(task_account, task_container):
|
||||
if six.PY2:
|
||||
task_object = o['name'].encode('utf8')
|
||||
else:
|
||||
task_object = o['name']
|
||||
try:
|
||||
delete_timestamp, target_account, target_container, \
|
||||
target_object = self.parse_task_obj(task_object)
|
||||
@ -439,7 +445,7 @@ class ObjectExpirer(Daemon):
|
||||
:raises UnexpectedResponse: if the delete was unsuccessful and
|
||||
should be retried later
|
||||
"""
|
||||
path = '/v1/' + urllib.parse.quote(actual_obj.lstrip('/'))
|
||||
path = '/v1/' + wsgi_quote(str_to_wsgi(actual_obj.lstrip('/')))
|
||||
self.swift.make_request(
|
||||
'DELETE', path,
|
||||
{'X-If-Delete-At': timestamp.normal,
|
||||
|
@ -69,8 +69,9 @@ class FakeInternalClient(object):
|
||||
|
||||
def iter_containers(self, account, prefix=''):
|
||||
acc_dict = self.aco_dict[account]
|
||||
return sorted([{'name': six.text_type(container)} for container in
|
||||
acc_dict if container.startswith(prefix)])
|
||||
return [{'name': six.text_type(container)}
|
||||
for container in sorted(acc_dict)
|
||||
if container.startswith(prefix)]
|
||||
|
||||
def delete_container(*a, **kw):
|
||||
pass
|
||||
@ -131,9 +132,11 @@ class TestObjectExpirer(TestCase):
|
||||
|
||||
# target object paths which should be expirerd now
|
||||
self.expired_target_path_list = [
|
||||
swob.wsgi_to_str(tgt) for tgt in (
|
||||
'a0/c0/o0', 'a1/c1/o1', 'a2/c2/o2', 'a3/c3/o3', 'a4/c4/o4',
|
||||
'a5/c5/o5', 'a6/c6/o6', 'a7/c7/o7',
|
||||
'a8/c8/o8\xe2\x99\xa1', 'a9/c9/o9\xc3\xb8',
|
||||
)
|
||||
]
|
||||
|
||||
def tearDown(self):
|
||||
@ -656,23 +659,23 @@ class TestObjectExpirer(TestCase):
|
||||
self.expirer.run_once()
|
||||
self.assertEqual(self.expirer.report_objects, 10)
|
||||
|
||||
def test_delete_actual_object_does_not_get_unicode(self):
|
||||
got_unicode = [False]
|
||||
def test_delete_actual_object_gets_native_string(self):
|
||||
got_str = [False]
|
||||
|
||||
def delete_actual_object_test_for_unicode(actual_obj, timestamp):
|
||||
if isinstance(actual_obj, six.text_type):
|
||||
got_unicode[0] = True
|
||||
def delete_actual_object_test_for_string(actual_obj, timestamp):
|
||||
if isinstance(actual_obj, str):
|
||||
got_str[0] = True
|
||||
|
||||
self.assertEqual(self.expirer.report_objects, 0)
|
||||
|
||||
with mock.patch.object(self.expirer, 'delete_actual_object',
|
||||
delete_actual_object_test_for_unicode), \
|
||||
delete_actual_object_test_for_string), \
|
||||
mock.patch.object(self.expirer, 'pop_queue',
|
||||
lambda a, c, o: None):
|
||||
self.expirer.run_once()
|
||||
|
||||
self.assertEqual(self.expirer.report_objects, 10)
|
||||
self.assertFalse(got_unicode[0])
|
||||
self.assertTrue(got_str[0])
|
||||
|
||||
def test_failed_delete_continues_on(self):
|
||||
def fail_delete_container(*a, **kw):
|
||||
@ -713,19 +716,12 @@ class TestObjectExpirer(TestCase):
|
||||
interval = 1234
|
||||
x = expirer.ObjectExpirer({'__file__': 'unit_test',
|
||||
'interval': interval})
|
||||
orig_random = expirer.random
|
||||
orig_sleep = expirer.sleep
|
||||
try:
|
||||
expirer.random = not_random
|
||||
expirer.sleep = not_sleep
|
||||
with mock.patch.object(expirer, 'random', not_random), \
|
||||
mock.patch.object(expirer, 'sleep', not_sleep), \
|
||||
self.assertRaises(SystemExit) as caught:
|
||||
x.run_once = raise_system_exit
|
||||
x.run_forever()
|
||||
except SystemExit as err:
|
||||
pass
|
||||
finally:
|
||||
expirer.random = orig_random
|
||||
expirer.sleep = orig_sleep
|
||||
self.assertEqual(str(err), 'test_run_forever')
|
||||
self.assertEqual(str(caught.exception), 'test_run_forever')
|
||||
self.assertEqual(last_not_sleep, 0.5 * interval)
|
||||
|
||||
def test_run_forever_catches_usual_exceptions(self):
|
||||
@ -872,7 +868,8 @@ class TestObjectExpirer(TestCase):
|
||||
with mocked_http_conn(
|
||||
200, 200, 200, give_connect=capture_requests) as fake_conn:
|
||||
x.pop_queue('a', 'c', 'o')
|
||||
self.assertRaises(StopIteration, fake_conn.code_iter.next)
|
||||
with self.assertRaises(StopIteration):
|
||||
next(fake_conn.code_iter)
|
||||
for method, path in requests:
|
||||
self.assertEqual(method, 'DELETE')
|
||||
device, part, account, container, obj = utils.split_path(
|
||||
|
Loading…
x
Reference in New Issue
Block a user