Merge "Support delayed queues for swift"

This commit is contained in:
Zuul
2017-12-14 06:24:09 +00:00
committed by Gerrit Code Review
2 changed files with 21 additions and 5 deletions

View File

@@ -103,8 +103,12 @@ class ClaimController(storage.Claim):
dlq = True if ('_max_claim_count' in queue_meta and dlq = True if ('_max_claim_count' in queue_meta and
'_dead_letter_queue' in queue_meta) else False '_dead_letter_queue' in queue_meta) else False
include_delayed = False if queue_meta.get('_default_message_delay',
0) else True
messages, marker = message_ctrl._list(queue, project, limit=limit, messages, marker = message_ctrl._list(queue, project, limit=limit,
include_claimed=False) include_claimed=False,
include_delayed=include_delayed)
claimed = [] claimed = []
for msg in messages: for msg in messages:

View File

@@ -49,6 +49,8 @@ class MessageController(storage.Message):
+--------------+-----------------------------------------+ +--------------+-----------------------------------------+
| Claim ID | Object content 'claim_id' | | Claim ID | Object content 'claim_id' |
+--------------+-----------------------------------------+ +--------------+-----------------------------------------+
| Delay Expires| Object content 'delay_expires' |
+--------------+-----------------------------------------+
| Expires | Object Delete-After header | | Expires | Object Delete-After header |
+--------------------------------------------------------+ +--------------------------------------------------------+
""" """
@@ -90,7 +92,8 @@ class MessageController(storage.Message):
def _list(self, queue, project=None, marker=None, def _list(self, queue, project=None, marker=None,
limit=storage.DEFAULT_MESSAGES_PER_PAGE, limit=storage.DEFAULT_MESSAGES_PER_PAGE,
echo=False, client_uuid=None, echo=False, client_uuid=None,
include_claimed=False, sort=1): include_claimed=False, include_delayed=False,
sort=1):
"""List messages in the queue, oldest first(ish) """List messages in the queue, oldest first(ish)
Time ordering and message inclusion in lists are soft, there is no Time ordering and message inclusion in lists are soft, there is no
@@ -128,6 +131,12 @@ class MessageController(storage.Message):
queue, msg['claim_id'], project) queue, msg['claim_id'], project)
return claim_obj is not None and claim_obj['ttl'] > 0 return claim_obj is not None and claim_obj['ttl'] > 0
def is_delayed(msg, headers):
if include_delayed:
return False
now = timeutils.utcnow_ts()
return msg.get('delay_expires', 0) > now
def is_echo(msg, headers): def is_echo(msg, headers):
if echo: if echo:
return False return False
@@ -136,6 +145,7 @@ class MessageController(storage.Message):
filters = [ filters = [
is_echo, is_echo,
is_claimed, is_claimed,
is_delayed,
] ]
marker = {} marker = {}
get_object = functools.partial(client.get_object, container) get_object = functools.partial(client.get_object, container)
@@ -149,9 +159,9 @@ class MessageController(storage.Message):
def list(self, queue, project=None, marker=None, def list(self, queue, project=None, marker=None,
limit=storage.DEFAULT_MESSAGES_PER_PAGE, limit=storage.DEFAULT_MESSAGES_PER_PAGE,
echo=False, client_uuid=None, echo=False, client_uuid=None,
include_claimed=False): include_claimed=False, include_delayed=False,):
return self._list(queue, project, marker, limit, echo, return self._list(queue, project, marker, limit, echo,
client_uuid, include_claimed) client_uuid, include_claimed, include_delayed)
def first(self, queue, project=None, sort=1): def first(self, queue, project=None, sort=1):
if sort not in (1, -1): if sort not in (1, -1):
@@ -212,9 +222,11 @@ class MessageController(storage.Message):
def _create_msg(self, queue, msg, client_uuid, project): def _create_msg(self, queue, msg, client_uuid, project):
slug = str(uuid.uuid1()) slug = str(uuid.uuid1())
now = timeutils.utcnow_ts()
contents = jsonutils.dumps( contents = jsonutils.dumps(
{'body': msg.get('body', {}), 'claim_id': None, {'body': msg.get('body', {}), 'claim_id': None,
'ttl': msg['ttl'], 'claim_count': 0}) 'ttl': msg['ttl'], 'claim_count': 0,
'delay_expires': now + msg.get('delay', 0)})
utils._put_or_create_container( utils._put_or_create_container(
self._client, self._client,
utils._message_container(queue, project), utils._message_container(queue, project),