Make queues lazy in subscriptions

Currently queues are lazy on operations with messages, but not with
subscriptions. It means the user is forced to pre-create a queue before
creating a subscription to the queue, which is not very convenient.

Also currently if a queue has a subscription, the subscription will
stay on even if the user will delete the queue. But even if so Zaqar
will still work normally. This is strange that subscriptions can exist
with deleted corresponding queues, but it's impossible to create a
subscription to yet unexising queue.

This patch makes queues lazy also on operations with subscriptions, so
the user will be able to subscribe to yet unexisting queue.

Also this patch modifies tests, to make sure Zaqar's subscriptions
work both with pre-created queues and non-existing queues.

APIImpact
DocImpact
Implements: blueprint lazy-queues-in-subscriptions

Change-Id: I814b503243c4e06e74acc6b709bda4269df889e9
This commit is contained in:
Eva Balycheva 2016-05-25 05:19:25 +03:00
parent d01946c40f
commit 4c2b7e04db
8 changed files with 75 additions and 41 deletions

View File

@ -0,0 +1,8 @@
---
features:
- Queues now behave lazy in subscriptions also. So there is no need for
the user to pre-create a queue before creating a subscription for this
queue. Zaqar will create the queue automatically on the subscription
creation request. As before, all subscriptions will continue to stay
active even if the corresponding queue was deleted.

View File

@ -809,6 +809,9 @@ class Endpoints(object):
'options': req._body.get('options'), 'options': req._body.get('options'),
'ttl': req._body.get('ttl')} 'ttl': req._body.get('ttl')}
self._validate.subscription_posting(data) self._validate.subscription_posting(data)
self._validate.queue_identification(queue_name, project_id)
if not self._queue_controller.exists(queue_name, project_id):
self._queue_controller.create(queue_name, project=project_id)
created = self._subscription_controller.create(queue_name, created = self._subscription_controller.create(queue_name,
subscriber, subscriber,
data['ttl'], data['ttl'],
@ -818,11 +821,6 @@ class Endpoints(object):
LOG.debug(ex) LOG.debug(ex)
headers = {'status': 400} headers = {'status': 400}
return api_utils.error_response(req, ex, headers) return api_utils.error_response(req, ex, headers)
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
error = _('Queue %s does not exist.') % queue_name
headers = {'status': 404}
return api_utils.error_response(req, ex, headers, error)
except storage_errors.ExceptionBase as ex: except storage_errors.ExceptionBase as ex:
LOG.exception(ex) LOG.exception(ex)
error = _('Subscription %s could not be created.') % queue_name error = _('Subscription %s could not be created.') % queue_name

View File

@ -53,7 +53,6 @@ class SubscriptionController(base.Subscription):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(SubscriptionController, self).__init__(*args, **kwargs) super(SubscriptionController, self).__init__(*args, **kwargs)
self._collection = self.driver.subscriptions_database.subscriptions self._collection = self.driver.subscriptions_database.subscriptions
self._queue_ctrl = self.driver.queue_controller
self._collection.ensure_index(SUBSCRIPTIONS_INDEX, unique=True) self._collection.ensure_index(SUBSCRIPTIONS_INDEX, unique=True)
# NOTE(flwang): MongoDB will automatically delete the subscription # NOTE(flwang): MongoDB will automatically delete the subscription
# from the subscriptions collection when the subscription's 'e' value # from the subscriptions collection when the subscription's 'e' value
@ -109,8 +108,6 @@ class SubscriptionController(base.Subscription):
now_dt = datetime.datetime.utcfromtimestamp(now) now_dt = datetime.datetime.utcfromtimestamp(now)
expires = now_dt + datetime.timedelta(seconds=ttl) expires = now_dt + datetime.timedelta(seconds=ttl)
if not self._queue_ctrl.exists(source, project):
raise errors.QueueDoesNotExist(source, project)
try: try:
subscription_id = self._collection.insert({'s': source, subscription_id = self._collection.insert({'s': source,
'u': subscriber, 'u': subscriber,

View File

@ -19,7 +19,6 @@ import msgpack
from oslo_utils import timeutils from oslo_utils import timeutils
import redis import redis
from zaqar.common import decorators
from zaqar.common import utils as common_utils from zaqar.common import utils as common_utils
from zaqar.storage import base from zaqar.storage import base
from zaqar.storage import errors from zaqar.storage import errors
@ -53,10 +52,6 @@ class SubscriptionController(base.Subscription):
use_bin_type=True).pack use_bin_type=True).pack
self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8') self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')
@decorators.lazy_property(write=False)
def _queue_ctrl(self):
return self.driver.queue_controller
@utils.raises_conn_error @utils.raises_conn_error
@utils.retries_on_connection_error @utils.retries_on_connection_error
def list(self, queue, project=None, marker=None, limit=10): def list(self, queue, project=None, marker=None, limit=10):
@ -117,8 +112,6 @@ class SubscriptionController(base.Subscription):
'o': self._packer(options), 'o': self._packer(options),
'p': project} 'p': project}
if not self._queue_ctrl.exists(queue, project):
raise errors.QueueDoesNotExist(queue, project)
try: try:
# Pipeline ensures atomic inserts. # Pipeline ensures atomic inserts.
with self._client.pipeline() as pipe: with self._client.pipeline() as pipe:

View File

@ -964,6 +964,7 @@ class ClaimControllerTest(ControllerBaseTest):
project=self.project) project=self.project)
@ddt.ddt
class SubscriptionControllerTest(ControllerBaseTest): class SubscriptionControllerTest(ControllerBaseTest):
"""Subscriptions Controller base tests. """Subscriptions Controller base tests.
@ -974,10 +975,7 @@ class SubscriptionControllerTest(ControllerBaseTest):
def setUp(self): def setUp(self):
super(SubscriptionControllerTest, self).setUp() super(SubscriptionControllerTest, self).setUp()
self.subscription_controller = self.driver.subscription_controller self.subscription_controller = self.driver.subscription_controller
# Lets create a queue as the source of subscription
self.queue_controller = self.driver.queue_controller self.queue_controller = self.driver.queue_controller
self.queue_controller.create(self.queue_name, project=self.project)
self.source = self.queue_name self.source = self.queue_name
self.subscriber = 'http://trigger.me' self.subscriber = 'http://trigger.me'
@ -988,7 +986,16 @@ class SubscriptionControllerTest(ControllerBaseTest):
self.queue_controller.delete(self.queue_name, project=self.project) self.queue_controller.delete(self.queue_name, project=self.project)
super(SubscriptionControllerTest, self).tearDown() super(SubscriptionControllerTest, self).tearDown()
def test_list(self): # NOTE(Eva-i): this method helps to test cases when the queue is
# pre-created and when it's not.
def _precreate_queue(self, precreate_queue):
if precreate_queue:
# Let's create a queue as the source of subscription
self.queue_controller.create(self.queue_name, project=self.project)
@ddt.data(True, False)
def test_list(self, precreate_queue):
self._precreate_queue(precreate_queue)
for s in six.moves.xrange(15): for s in six.moves.xrange(15):
subscriber = 'http://fake_{0}'.format(s) subscriber = 'http://fake_{0}'.format(s)
s_id = self.subscription_controller.create( s_id = self.subscription_controller.create(
@ -1019,14 +1026,18 @@ class SubscriptionControllerTest(ControllerBaseTest):
subscriptions))) subscriptions)))
self.assertEqual(5, len(subscriptions)) self.assertEqual(5, len(subscriptions))
def test_get_raises_if_subscription_does_not_exist(self): @ddt.data(True, False)
def test_get_raises_if_subscription_does_not_exist(self, precreate_queue):
self._precreate_queue(precreate_queue)
self.assertRaises(errors.SubscriptionDoesNotExist, self.assertRaises(errors.SubscriptionDoesNotExist,
self.subscription_controller.get, self.subscription_controller.get,
self.queue_name, self.queue_name,
'notexists', 'notexists',
project=self.project) project=self.project)
def test_lifecycle(self): @ddt.data(True, False)
def test_lifecycle(self, precreate_queue):
self._precreate_queue(precreate_queue)
s_id = self.subscription_controller.create(self.source, s_id = self.subscription_controller.create(self.source,
self.subscriber, self.subscriber,
self.ttl, self.ttl,
@ -1068,7 +1079,9 @@ class SubscriptionControllerTest(ControllerBaseTest):
self.subscription_controller.get, self.subscription_controller.get,
self.queue_name, s_id) self.queue_name, s_id)
def test_create_existed(self): @ddt.data(True, False)
def test_create_existed(self, precreate_queue):
self._precreate_queue(precreate_queue)
s_id = self.subscription_controller.create( s_id = self.subscription_controller.create(
self.source, self.source,
self.subscriber, self.subscriber,
@ -1087,15 +1100,23 @@ class SubscriptionControllerTest(ControllerBaseTest):
self.assertIsNone(s_id) self.assertIsNone(s_id)
def test_nonexist_source(self): def test_nonexist_source(self):
self.assertRaises(errors.QueueDoesNotExist, try:
self.subscription_controller.create, s_id = self.subscription_controller.create('fake_queue_name',
'fake_queue_name', self.subscriber,
self.subscriber, self.ttl,
self.ttl, self.options,
self.options, self.project)
self.project) except Exception:
self.fail("Subscription controller should not raise an exception "
"in case of non-existing queue.")
self.addCleanup(self.subscription_controller.delete, self.source, s_id,
self.project)
def test_update_raises_if_try_to_update_to_existing_subscription(self): @ddt.data(True, False)
def test_update_raises_if_try_to_update_to_existing_subscription(
self,
precreate_queue):
self._precreate_queue(precreate_queue)
# create two subscriptions: fake_0 and fake_1 # create two subscriptions: fake_0 and fake_1
ids = [] ids = []
for s in six.moves.xrange(2): for s in six.moves.xrange(2):
@ -1125,7 +1146,10 @@ class SubscriptionControllerTest(ControllerBaseTest):
project=self.project, project=self.project,
**update_fields) **update_fields)
def test_update_raises_if_subscription_does_not_exist(self): @ddt.data(True, False)
def test_update_raises_if_subscription_does_not_exist(self,
precreate_queue):
self._precreate_queue(precreate_queue)
update_fields = { update_fields = {
'subscriber': 'http://fake' 'subscriber': 'http://fake'
} }
@ -1612,7 +1636,6 @@ class FlavorsControllerTest(ControllerBaseTest):
def _insert_fixtures(controller, queue_name, project=None, def _insert_fixtures(controller, queue_name, project=None,
client_uuid=None, num=4, ttl=120): client_uuid=None, num=4, ttl=120):
def messages(): def messages():
for n in six.moves.xrange(num): for n in six.moves.xrange(num):
yield { yield {

View File

@ -155,10 +155,24 @@ class SubscriptionTest(base.V1_1Base):
req = test_utils.create_request(action, body, self.headers) req = test_utils.create_request(action, body, self.headers)
self.protocol.onMessage(req, False) self.protocol.onMessage(req, False)
[subscriber] = list(
next(
self.boot.storage.subscription_controller.list(
'shuffle', self.project_id)))
self.addCleanup(
self.boot.storage.subscription_controller.delete, 'shuffle',
subscriber['id'], project=self.project_id)
response = {
'body': {'message': 'Subscription shuffle created.',
'subscription_id': subscriber['id']},
'headers': {'status': 201},
'request': {'action': 'subscription_create',
'body': {'queue_name': 'shuffle', 'ttl': 600},
'api': 'v2', 'headers': self.headers}}
self.assertEqual(1, sender.call_count) self.assertEqual(1, sender.call_count)
self.assertEqual( self.assertEqual(response, json.loads(sender.call_args[0][0]))
'Queue shuffle does not exist.',
json.loads(sender.call_args[0][0])['body']['error'])
def test_subscription_get(self): def test_subscription_get(self):
sub = self.boot.storage.subscription_controller.create( sub = self.boot.storage.subscription_controller.create(

View File

@ -100,7 +100,8 @@ def public_endpoints(driver, conf):
('/queues/{queue_name}/subscriptions', ('/queues/{queue_name}/subscriptions',
subscriptions.CollectionResource(driver._validate, subscriptions.CollectionResource(driver._validate,
subscription_controller, subscription_controller,
defaults.subscription_ttl)), defaults.subscription_ttl,
queue_controller)),
('/queues/{queue_name}/subscriptions/{subscription_id}', ('/queues/{queue_name}/subscriptions/{subscription_id}',
subscriptions.ItemResource(driver._validate, subscriptions.ItemResource(driver._validate,

View File

@ -109,13 +109,14 @@ class ItemResource(object):
class CollectionResource(object): class CollectionResource(object):
__slots__ = ('_subscription_controller', '_validate', __slots__ = ('_subscription_controller', '_validate',
'_default_subscription_ttl') '_default_subscription_ttl', '_queue_controller')
def __init__(self, validate, subscription_controller, def __init__(self, validate, subscription_controller,
default_subscription_ttl): default_subscription_ttl, queue_controller):
self._subscription_controller = subscription_controller self._subscription_controller = subscription_controller
self._validate = validate self._validate = validate
self._default_subscription_ttl = default_subscription_ttl self._default_subscription_ttl = default_subscription_ttl
self._queue_controller = queue_controller
@decorators.TransportLog("Subscription collection") @decorators.TransportLog("Subscription collection")
@acl.enforce("subscription:get_all") @acl.enforce("subscription:get_all")
@ -171,6 +172,8 @@ class CollectionResource(object):
document = {} document = {}
try: try:
if not self._queue_controller.exists(queue_name, project_id):
self._queue_controller.create(queue_name, project=project_id)
self._validate.subscription_posting(document) self._validate.subscription_posting(document)
subscriber = document['subscriber'] subscriber = document['subscriber']
ttl = document.get('ttl', self._default_subscription_ttl) ttl = document.get('ttl', self._default_subscription_ttl)
@ -181,9 +184,6 @@ class CollectionResource(object):
options, options,
project=project_id) project=project_id)
except storage_errors.QueueDoesNotExist as ex:
LOG.exception(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except validation.ValidationFailed as ex: except validation.ValidationFailed as ex:
LOG.debug(ex) LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))