From 4c2b7e04dbca0ae1e5d3480c8bad60dcbfac8ff8 Mon Sep 17 00:00:00 2001 From: Eva Balycheva Date: Wed, 25 May 2016 05:19:25 +0300 Subject: [PATCH] Make queues lazy in subscriptions Currently queues are lazy on operations with messages, but not with subscriptions. It means the user is forced to pre-create a queue before creating a subscription to the queue, which is not very convenient. Also currently if a queue has a subscription, the subscription will stay on even if the user will delete the queue. But even if so Zaqar will still work normally. This is strange that subscriptions can exist with deleted corresponding queues, but it's impossible to create a subscription to yet unexising queue. This patch makes queues lazy also on operations with subscriptions, so the user will be able to subscribe to yet unexisting queue. Also this patch modifies tests, to make sure Zaqar's subscriptions work both with pre-created queues and non-existing queues. APIImpact DocImpact Implements: blueprint lazy-queues-in-subscriptions Change-Id: I814b503243c4e06e74acc6b709bda4269df889e9 --- ...ues-in-subscriptions-6bade4a1b8eca3e5.yaml | 8 +++ zaqar/api/v2/endpoints.py | 8 +-- zaqar/storage/mongodb/subscriptions.py | 3 - zaqar/storage/redis/subscriptions.py | 7 --- zaqar/tests/unit/storage/base.py | 57 +++++++++++++------ .../websocket/v2/test_subscriptions.py | 20 ++++++- zaqar/transport/wsgi/v2_0/__init__.py | 3 +- zaqar/transport/wsgi/v2_0/subscriptions.py | 10 ++-- 8 files changed, 75 insertions(+), 41 deletions(-) create mode 100644 releasenotes/notes/lazy-queues-in-subscriptions-6bade4a1b8eca3e5.yaml diff --git a/releasenotes/notes/lazy-queues-in-subscriptions-6bade4a1b8eca3e5.yaml b/releasenotes/notes/lazy-queues-in-subscriptions-6bade4a1b8eca3e5.yaml new file mode 100644 index 000000000..2d19cec12 --- /dev/null +++ b/releasenotes/notes/lazy-queues-in-subscriptions-6bade4a1b8eca3e5.yaml @@ -0,0 +1,8 @@ +--- +features: + - Queues now behave lazy in subscriptions also. So there is no need for + the user to pre-create a queue before creating a subscription for this + queue. Zaqar will create the queue automatically on the subscription + creation request. As before, all subscriptions will continue to stay + active even if the corresponding queue was deleted. + diff --git a/zaqar/api/v2/endpoints.py b/zaqar/api/v2/endpoints.py index 22f966247..547b97dd5 100644 --- a/zaqar/api/v2/endpoints.py +++ b/zaqar/api/v2/endpoints.py @@ -809,6 +809,9 @@ class Endpoints(object): 'options': req._body.get('options'), 'ttl': req._body.get('ttl')} self._validate.subscription_posting(data) + self._validate.queue_identification(queue_name, project_id) + if not self._queue_controller.exists(queue_name, project_id): + self._queue_controller.create(queue_name, project=project_id) created = self._subscription_controller.create(queue_name, subscriber, data['ttl'], @@ -818,11 +821,6 @@ class Endpoints(object): LOG.debug(ex) headers = {'status': 400} return api_utils.error_response(req, ex, headers) - except storage_errors.DoesNotExist as ex: - LOG.debug(ex) - error = _('Queue %s does not exist.') % queue_name - headers = {'status': 404} - return api_utils.error_response(req, ex, headers, error) except storage_errors.ExceptionBase as ex: LOG.exception(ex) error = _('Subscription %s could not be created.') % queue_name diff --git a/zaqar/storage/mongodb/subscriptions.py b/zaqar/storage/mongodb/subscriptions.py index 66447eaf1..447cdf654 100644 --- a/zaqar/storage/mongodb/subscriptions.py +++ b/zaqar/storage/mongodb/subscriptions.py @@ -53,7 +53,6 @@ class SubscriptionController(base.Subscription): def __init__(self, *args, **kwargs): super(SubscriptionController, self).__init__(*args, **kwargs) self._collection = self.driver.subscriptions_database.subscriptions - self._queue_ctrl = self.driver.queue_controller self._collection.ensure_index(SUBSCRIPTIONS_INDEX, unique=True) # NOTE(flwang): MongoDB will automatically delete the subscription # from the subscriptions collection when the subscription's 'e' value @@ -109,8 +108,6 @@ class SubscriptionController(base.Subscription): now_dt = datetime.datetime.utcfromtimestamp(now) expires = now_dt + datetime.timedelta(seconds=ttl) - if not self._queue_ctrl.exists(source, project): - raise errors.QueueDoesNotExist(source, project) try: subscription_id = self._collection.insert({'s': source, 'u': subscriber, diff --git a/zaqar/storage/redis/subscriptions.py b/zaqar/storage/redis/subscriptions.py index c8dd46771..36ae3e9f3 100644 --- a/zaqar/storage/redis/subscriptions.py +++ b/zaqar/storage/redis/subscriptions.py @@ -19,7 +19,6 @@ import msgpack from oslo_utils import timeutils import redis -from zaqar.common import decorators from zaqar.common import utils as common_utils from zaqar.storage import base from zaqar.storage import errors @@ -53,10 +52,6 @@ class SubscriptionController(base.Subscription): use_bin_type=True).pack self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8') - @decorators.lazy_property(write=False) - def _queue_ctrl(self): - return self.driver.queue_controller - @utils.raises_conn_error @utils.retries_on_connection_error def list(self, queue, project=None, marker=None, limit=10): @@ -117,8 +112,6 @@ class SubscriptionController(base.Subscription): 'o': self._packer(options), 'p': project} - if not self._queue_ctrl.exists(queue, project): - raise errors.QueueDoesNotExist(queue, project) try: # Pipeline ensures atomic inserts. with self._client.pipeline() as pipe: diff --git a/zaqar/tests/unit/storage/base.py b/zaqar/tests/unit/storage/base.py index 90028baf8..9d8b3ce13 100644 --- a/zaqar/tests/unit/storage/base.py +++ b/zaqar/tests/unit/storage/base.py @@ -964,6 +964,7 @@ class ClaimControllerTest(ControllerBaseTest): project=self.project) +@ddt.ddt class SubscriptionControllerTest(ControllerBaseTest): """Subscriptions Controller base tests. @@ -974,10 +975,7 @@ class SubscriptionControllerTest(ControllerBaseTest): def setUp(self): super(SubscriptionControllerTest, self).setUp() self.subscription_controller = self.driver.subscription_controller - - # Lets create a queue as the source of subscription self.queue_controller = self.driver.queue_controller - self.queue_controller.create(self.queue_name, project=self.project) self.source = self.queue_name self.subscriber = 'http://trigger.me' @@ -988,7 +986,16 @@ class SubscriptionControllerTest(ControllerBaseTest): self.queue_controller.delete(self.queue_name, project=self.project) super(SubscriptionControllerTest, self).tearDown() - def test_list(self): + # NOTE(Eva-i): this method helps to test cases when the queue is + # pre-created and when it's not. + def _precreate_queue(self, precreate_queue): + if precreate_queue: + # Let's create a queue as the source of subscription + self.queue_controller.create(self.queue_name, project=self.project) + + @ddt.data(True, False) + def test_list(self, precreate_queue): + self._precreate_queue(precreate_queue) for s in six.moves.xrange(15): subscriber = 'http://fake_{0}'.format(s) s_id = self.subscription_controller.create( @@ -1019,14 +1026,18 @@ class SubscriptionControllerTest(ControllerBaseTest): subscriptions))) self.assertEqual(5, len(subscriptions)) - def test_get_raises_if_subscription_does_not_exist(self): + @ddt.data(True, False) + def test_get_raises_if_subscription_does_not_exist(self, precreate_queue): + self._precreate_queue(precreate_queue) self.assertRaises(errors.SubscriptionDoesNotExist, self.subscription_controller.get, self.queue_name, 'notexists', project=self.project) - def test_lifecycle(self): + @ddt.data(True, False) + def test_lifecycle(self, precreate_queue): + self._precreate_queue(precreate_queue) s_id = self.subscription_controller.create(self.source, self.subscriber, self.ttl, @@ -1068,7 +1079,9 @@ class SubscriptionControllerTest(ControllerBaseTest): self.subscription_controller.get, self.queue_name, s_id) - def test_create_existed(self): + @ddt.data(True, False) + def test_create_existed(self, precreate_queue): + self._precreate_queue(precreate_queue) s_id = self.subscription_controller.create( self.source, self.subscriber, @@ -1087,15 +1100,23 @@ class SubscriptionControllerTest(ControllerBaseTest): self.assertIsNone(s_id) def test_nonexist_source(self): - self.assertRaises(errors.QueueDoesNotExist, - self.subscription_controller.create, - 'fake_queue_name', - self.subscriber, - self.ttl, - self.options, - self.project) + try: + s_id = self.subscription_controller.create('fake_queue_name', + self.subscriber, + self.ttl, + self.options, + self.project) + except Exception: + self.fail("Subscription controller should not raise an exception " + "in case of non-existing queue.") + self.addCleanup(self.subscription_controller.delete, self.source, s_id, + self.project) - def test_update_raises_if_try_to_update_to_existing_subscription(self): + @ddt.data(True, False) + def test_update_raises_if_try_to_update_to_existing_subscription( + self, + precreate_queue): + self._precreate_queue(precreate_queue) # create two subscriptions: fake_0 and fake_1 ids = [] for s in six.moves.xrange(2): @@ -1125,7 +1146,10 @@ class SubscriptionControllerTest(ControllerBaseTest): project=self.project, **update_fields) - def test_update_raises_if_subscription_does_not_exist(self): + @ddt.data(True, False) + def test_update_raises_if_subscription_does_not_exist(self, + precreate_queue): + self._precreate_queue(precreate_queue) update_fields = { 'subscriber': 'http://fake' } @@ -1612,7 +1636,6 @@ class FlavorsControllerTest(ControllerBaseTest): def _insert_fixtures(controller, queue_name, project=None, client_uuid=None, num=4, ttl=120): - def messages(): for n in six.moves.xrange(num): yield { diff --git a/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py b/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py index 2a460558c..1a438fc3a 100644 --- a/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py +++ b/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py @@ -155,10 +155,24 @@ class SubscriptionTest(base.V1_1Base): req = test_utils.create_request(action, body, self.headers) self.protocol.onMessage(req, False) + [subscriber] = list( + next( + self.boot.storage.subscription_controller.list( + 'shuffle', self.project_id))) + self.addCleanup( + self.boot.storage.subscription_controller.delete, 'shuffle', + subscriber['id'], project=self.project_id) + + response = { + 'body': {'message': 'Subscription shuffle created.', + 'subscription_id': subscriber['id']}, + 'headers': {'status': 201}, + 'request': {'action': 'subscription_create', + 'body': {'queue_name': 'shuffle', 'ttl': 600}, + 'api': 'v2', 'headers': self.headers}} + self.assertEqual(1, sender.call_count) - self.assertEqual( - 'Queue shuffle does not exist.', - json.loads(sender.call_args[0][0])['body']['error']) + self.assertEqual(response, json.loads(sender.call_args[0][0])) def test_subscription_get(self): sub = self.boot.storage.subscription_controller.create( diff --git a/zaqar/transport/wsgi/v2_0/__init__.py b/zaqar/transport/wsgi/v2_0/__init__.py index 3445ba216..f5c3eec07 100644 --- a/zaqar/transport/wsgi/v2_0/__init__.py +++ b/zaqar/transport/wsgi/v2_0/__init__.py @@ -100,7 +100,8 @@ def public_endpoints(driver, conf): ('/queues/{queue_name}/subscriptions', subscriptions.CollectionResource(driver._validate, subscription_controller, - defaults.subscription_ttl)), + defaults.subscription_ttl, + queue_controller)), ('/queues/{queue_name}/subscriptions/{subscription_id}', subscriptions.ItemResource(driver._validate, diff --git a/zaqar/transport/wsgi/v2_0/subscriptions.py b/zaqar/transport/wsgi/v2_0/subscriptions.py index 9b4c1ece6..0510a36a3 100644 --- a/zaqar/transport/wsgi/v2_0/subscriptions.py +++ b/zaqar/transport/wsgi/v2_0/subscriptions.py @@ -109,13 +109,14 @@ class ItemResource(object): class CollectionResource(object): __slots__ = ('_subscription_controller', '_validate', - '_default_subscription_ttl') + '_default_subscription_ttl', '_queue_controller') def __init__(self, validate, subscription_controller, - default_subscription_ttl): + default_subscription_ttl, queue_controller): self._subscription_controller = subscription_controller self._validate = validate self._default_subscription_ttl = default_subscription_ttl + self._queue_controller = queue_controller @decorators.TransportLog("Subscription collection") @acl.enforce("subscription:get_all") @@ -171,6 +172,8 @@ class CollectionResource(object): document = {} try: + if not self._queue_controller.exists(queue_name, project_id): + self._queue_controller.create(queue_name, project=project_id) self._validate.subscription_posting(document) subscriber = document['subscriber'] ttl = document.get('ttl', self._default_subscription_ttl) @@ -181,9 +184,6 @@ class CollectionResource(object): options, project=project_id) - except storage_errors.QueueDoesNotExist as ex: - LOG.exception(ex) - raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) except validation.ValidationFailed as ex: LOG.debug(ex) raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))