From 5bd31315c2d1ff492758c5717dd96ce237d823dd Mon Sep 17 00:00:00 2001 From: Mark McLoughlin <markmc@redhat.com> Date: Mon, 3 Mar 2014 07:35:29 -0800 Subject: [PATCH] notification listener: add allow_requeue param In commit d8d2ad9 we added support for notification listener endpoint methods to return REQUEUE, but if a driver does not support this we raise NotImplementedError when the application attempts to requeue a message. This requeuing behaviour might only be used by an application in unusual, exceptional circumstances and catch users by surprise. Instead, let's require the application to assert that it needs this feature in advance and raise NotImplementError at that point if the driver doesn't support it. Change-Id: Id0bb0e57d2dcc1ec7d752e98c9b1e8e48d99f35c --- oslo/messaging/_drivers/base.py | 5 ++++ oslo/messaging/_drivers/impl_fake.py | 3 +++ oslo/messaging/_drivers/impl_qpid.py | 3 +-- oslo/messaging/_drivers/impl_rabbit.py | 3 +++ oslo/messaging/_drivers/impl_zmq.py | 3 +-- oslo/messaging/notify/dispatcher.py | 5 ++-- oslo/messaging/notify/listener.py | 36 +++++++++++++------------- oslo/messaging/transport.py | 3 +++ tests/test_notify_dispatcher.py | 10 +++---- tests/test_notify_listener.py | 2 +- 10 files changed, 42 insertions(+), 31 deletions(-) diff --git a/oslo/messaging/_drivers/base.py b/oslo/messaging/_drivers/base.py index dcf08287e..82b36412e 100644 --- a/oslo/messaging/_drivers/base.py +++ b/oslo/messaging/_drivers/base.py @@ -67,6 +67,11 @@ class BaseDriver(object): self._default_exchange = default_exchange self._allowed_remote_exmods = allowed_remote_exmods + def require_features(self, requeue=False): + if requeue: + raise NotImplementedError('Message requeueing not supported by ' + 'this transport driver') + @abc.abstractmethod def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, envelope=False): diff --git a/oslo/messaging/_drivers/impl_fake.py b/oslo/messaging/_drivers/impl_fake.py index 2bfbe16b3..564dc21ce 100644 --- a/oslo/messaging/_drivers/impl_fake.py +++ b/oslo/messaging/_drivers/impl_fake.py @@ -112,6 +112,9 @@ class FakeDriver(base.BaseDriver): self._exchanges_lock = threading.Lock() self._exchanges = {} + def require_features(self, requeue=True): + pass + @staticmethod def _check_serialize(message): """Make sure a message intended for rpc can be serialized. diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py index c53ad8946..f9b275f70 100644 --- a/oslo/messaging/_drivers/impl_qpid.py +++ b/oslo/messaging/_drivers/impl_qpid.py @@ -100,8 +100,7 @@ class QpidMessage(dict): self._session.acknowledge(self._raw_message) def requeue(self): - raise NotImplementedError('The QPID driver does not yet support ' - 'requeuing messages') + pass class ConsumerBase(object): diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index be41a4ec1..4df6964b5 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -744,3 +744,6 @@ class RabbitDriver(amqpdriver.AMQPDriverBase): connection_pool, default_exchange, allowed_remote_exmods) + + def require_features(self, requeue=True): + pass diff --git a/oslo/messaging/_drivers/impl_zmq.py b/oslo/messaging/_drivers/impl_zmq.py index 6b2b53957..af5b612b2 100644 --- a/oslo/messaging/_drivers/impl_zmq.py +++ b/oslo/messaging/_drivers/impl_zmq.py @@ -844,8 +844,7 @@ class ZmqIncomingMessage(base.IncomingMessage): self.condition.notify() def requeue(self): - raise NotImplementedError('The ZeroMQ driver does not yet support ' - 'requeuing messages') + pass class ZmqListener(base.Listener): diff --git a/oslo/messaging/notify/dispatcher.py b/oslo/messaging/notify/dispatcher.py index 87deee854..ca8a2f01e 100644 --- a/oslo/messaging/notify/dispatcher.py +++ b/oslo/messaging/notify/dispatcher.py @@ -44,10 +44,11 @@ class NotificationDispatcher(object): message to the endpoints """ - def __init__(self, targets, endpoints, serializer): + def __init__(self, targets, endpoints, serializer, allow_requeue): self.targets = targets self.endpoints = endpoints self.serializer = serializer or msg_serializer.NoOpSerializer() + self.allow_requeue = allow_requeue self._callbacks_by_priority = {} for endpoint, prio in itertools.product(endpoints, PRIORITIES): @@ -114,7 +115,7 @@ class NotificationDispatcher(object): try: ret = callback(ctxt, publisher_id, event_type, payload) ret = NotificationResult.HANDLED if ret is None else ret - if ret != NotificationResult.HANDLED: + if self.allow_requeue and ret == NotificationResult.REQUEUE: return ret finally: localcontext.clear_local_context() diff --git a/oslo/messaging/notify/listener.py b/oslo/messaging/notify/listener.py index 06a2e24ce..c6c17df8b 100644 --- a/oslo/messaging/notify/listener.py +++ b/oslo/messaging/notify/listener.py @@ -73,26 +73,20 @@ priority Parameters to endpoint methods are the request context supplied by the client, the publisher_id of the notification message, the event_type, the payload. -An endpoint method can return explicitly messaging.NotificationResult.HANDLED +By supplying a serializer object, a listener can deserialize a request context +and arguments from - and serialize return values to - primitive types. + +An endpoint method can explicitly return messaging.NotificationResult.HANDLED to acknowledge a message or messaging.NotificationResult.REQUEUE to requeue the message. -The message is acknowledge only if all endpoints return -messaging.NotificationResult.HANDLED +The message is acknowledged only if all endpoints either return +messaging.NotificationResult.HANDLED or None. -If nothing is returned by an endpoint, this is considered like -messaging.NotificationResult.HANDLED - -messaging.NotificationResult values needs to be handled by drivers: - -* HANDLED: supported by all drivers -* REQUEUE: supported by drivers: fake://, rabbit:// - -In case of an unsupported driver nothing is done to the message and a -NotImplementedError is raised and logged. - -By supplying a serializer object, a listener can deserialize a request context -and arguments from - and serialize return values to - primitive types. +Note that not all transport drivers implement support for requeueing. In order +to use this feature, applications should assert that the feature is available +by passing allow_requeue=True to get_notification_listener(). If the driver +does not support requeueing, it will raise NotImplementedError at this point. """ from oslo.messaging.notify import dispatcher as notify_dispatcher @@ -100,7 +94,8 @@ from oslo.messaging import server as msg_server def get_notification_listener(transport, targets, endpoints, - executor='blocking', serializer=None): + executor='blocking', serializer=None, + allow_requeue=False): """Construct a notification listener The executor parameter controls how incoming messages will be received and @@ -117,7 +112,12 @@ def get_notification_listener(transport, targets, endpoints, :type executor: str :param serializer: an optional entity serializer :type serializer: Serializer + :param allow_requeue: whether NotificationResult.REQUEUE support is needed + :type allow_requeue: bool + :raises: NotImplementedError """ + transport._require_driver_features(requeue=allow_requeue) dispatcher = notify_dispatcher.NotificationDispatcher(targets, endpoints, - serializer) + serializer, + allow_requeue) return msg_server.MessageHandlingServer(transport, dispatcher, executor) diff --git a/oslo/messaging/transport.py b/oslo/messaging/transport.py index 7c8a3be97..16e6f49bd 100644 --- a/oslo/messaging/transport.py +++ b/oslo/messaging/transport.py @@ -78,6 +78,9 @@ class Transport(object): self.conf = driver.conf self._driver = driver + def _require_driver_features(self, requeue=False): + self._driver.require_features(requeue=requeue) + def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None): if not target.topic: raise exceptions.InvalidTarget('A topic is required to send', diff --git a/tests/test_notify_dispatcher.py b/tests/test_notify_dispatcher.py index 3e2c966a1..7b3531374 100644 --- a/tests/test_notify_dispatcher.py +++ b/tests/test_notify_dispatcher.py @@ -97,9 +97,8 @@ class TestDispatcher(test_utils.BaseTestCase): msg['priority'] = self.priority targets = [messaging.Target(topic='notifications')] - dispatcher = notify_dispatcher.NotificationDispatcher(targets, - endpoints, - None) + dispatcher = notify_dispatcher.NotificationDispatcher( + targets, endpoints, None, allow_requeue=True) # check it listen on wanted topics self.assertEqual(sorted(dispatcher._targets_priorities), @@ -138,9 +137,8 @@ class TestDispatcher(test_utils.BaseTestCase): def test_dispatcher_unknown_prio(self, mylog): msg = notification_msg.copy() msg['priority'] = 'what???' - dispatcher = notify_dispatcher.NotificationDispatcher([mock.Mock()], - [mock.Mock()], - None) + dispatcher = notify_dispatcher.NotificationDispatcher( + [mock.Mock()], [mock.Mock()], None, allow_requeue=True) with dispatcher(mock.Mock(ctxt={}, message=msg)) as callback: callback() mylog.warning.assert_called_once_with('Unknown priority "what???"') diff --git a/tests/test_notify_listener.py b/tests/test_notify_listener.py index 3b35a3bb1..0cf5f5405 100644 --- a/tests/test_notify_listener.py +++ b/tests/test_notify_listener.py @@ -35,7 +35,7 @@ class ListenerSetupMixin(object): self._expect_messages = expect_messages self._received_msgs = 0 self._listener = messaging.get_notification_listener( - transport, targets, endpoints + [self]) + transport, targets, endpoints + [self], allow_requeue=True) def info(self, ctxt, publisher_id, event_type, payload): self._received_msgs += 1