Move server related logic from dispatchers
Dispatcher should be responsible for routing message to the callback method of endpoint object and returning result back to the server only. But now it is also responsible for sending reply, ack/reque messages etc. Also this patch makes small improvements: 1) Notification dispatcher now requeue message if endpoint raises exception 2) unstable behaviour of test_mask_passwords test is fixed Change-Id: I5f23e23644e90919cb67f81fc306ee85c5e09974
This commit is contained in:
parent
3802dd5e4b
commit
990d894eaf
@ -192,8 +192,11 @@ class PikaDriver(base.BaseDriver):
|
||||
# exchange which is not exists, we get ChannelClosed exception
|
||||
# and need to reconnect
|
||||
try:
|
||||
self._declare_rpc_exchange(exchange,
|
||||
expiration_time - time.time())
|
||||
self._declare_rpc_exchange(
|
||||
exchange,
|
||||
None if expiration_time is None else
|
||||
expiration_time - time.time()
|
||||
)
|
||||
except pika_drv_exc.ConnectionException as e:
|
||||
LOG.warning("Problem during declaring exchange. %s", e)
|
||||
return True
|
||||
|
@ -16,96 +16,22 @@ import logging
|
||||
|
||||
import six
|
||||
|
||||
from oslo_messaging._i18n import _
|
||||
|
||||
|
||||
__all__ = [
|
||||
"DispatcherBase",
|
||||
"DispatcherExecutorContext"
|
||||
"DispatcherBase"
|
||||
]
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DispatcherExecutorContext(object):
|
||||
"""Dispatcher executor context helper
|
||||
|
||||
A dispatcher can have work to do before and after the dispatch of the
|
||||
request in the main server thread while the dispatcher itself can be
|
||||
done in its own thread.
|
||||
|
||||
The executor can use the helper like this:
|
||||
|
||||
callback = dispatcher(incoming)
|
||||
callback.prepare()
|
||||
thread = MyWhateverThread()
|
||||
thread.on_done(callback.done)
|
||||
thread.run(callback.run)
|
||||
|
||||
"""
|
||||
def __init__(self, incoming, dispatch, post=None):
|
||||
self._result = None
|
||||
self._incoming = incoming
|
||||
self._dispatch = dispatch
|
||||
self._post = post
|
||||
|
||||
def run(self):
|
||||
"""The incoming message dispath itself
|
||||
|
||||
Can be run in an other thread/greenlet/corotine if the executor is
|
||||
able to do it.
|
||||
"""
|
||||
try:
|
||||
self._result = self._dispatch(self._incoming)
|
||||
except Exception:
|
||||
msg = _('The dispatcher method must catches all exceptions')
|
||||
LOG.exception(msg)
|
||||
raise RuntimeError(msg)
|
||||
|
||||
def done(self):
|
||||
"""Callback after the incoming message have been dispathed
|
||||
|
||||
Should be ran in the main executor thread/greenlet/corotine
|
||||
"""
|
||||
# FIXME(sileht): this is not currently true, this works only because
|
||||
# the driver connection used for polling write on the wire only to
|
||||
# ack/requeue message, but what if one day, the driver do something
|
||||
# else
|
||||
if self._post is not None:
|
||||
self._post(self._incoming, self._result)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class DispatcherBase(object):
|
||||
"Base class for dispatcher"
|
||||
|
||||
batch_size = 1
|
||||
"Number of messages to wait before calling endpoints callacks"
|
||||
|
||||
batch_timeout = None
|
||||
"Number of seconds to wait before calling endpoints callacks"
|
||||
|
||||
@abc.abstractmethod
|
||||
def _listen(self, transport):
|
||||
"""Initiate the driver Listener
|
||||
def dispatch(self, incoming):
|
||||
"""Dispatch incoming messages to the endpoints and return result
|
||||
|
||||
Usually the driver Listener is start with the transport helper methods:
|
||||
|
||||
* transport._listen()
|
||||
* transport._listen_for_notifications()
|
||||
|
||||
:param transport: the transport object
|
||||
:type transport: oslo_messaging.transport.Transport
|
||||
:returns: a driver Listener object
|
||||
:rtype: oslo_messaging._drivers.base.Listener
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def __call__(self, incoming):
|
||||
"""Called by the executor to get the DispatcherExecutorContext
|
||||
|
||||
:param incoming: list of messages
|
||||
:type incoming: oslo_messging._drivers.base.IncomingMessage
|
||||
:returns: DispatcherExecutorContext
|
||||
:rtype: DispatcherExecutorContext
|
||||
:param incoming: incoming object for dispatching to the endpoint
|
||||
:type incoming: object, depends on endpoint type
|
||||
"""
|
||||
|
@ -19,7 +19,7 @@ import logging
|
||||
|
||||
import six
|
||||
|
||||
from oslo_messaging._i18n import _LE, _LW
|
||||
from oslo_messaging._i18n import _LW
|
||||
from oslo_messaging import dispatcher
|
||||
from oslo_messaging import localcontext
|
||||
from oslo_messaging import serializer as msg_serializer
|
||||
@ -35,14 +35,11 @@ class NotificationResult(object):
|
||||
REQUEUE = 'requeue'
|
||||
|
||||
|
||||
class _NotificationDispatcherBase(dispatcher.DispatcherBase):
|
||||
def __init__(self, targets, endpoints, serializer, allow_requeue,
|
||||
pool=None):
|
||||
self.targets = targets
|
||||
class NotificationDispatcher(dispatcher.DispatcherBase):
|
||||
def __init__(self, endpoints, serializer):
|
||||
|
||||
self.endpoints = endpoints
|
||||
self.serializer = serializer or msg_serializer.NoOpSerializer()
|
||||
self.allow_requeue = allow_requeue
|
||||
self.pool = pool
|
||||
|
||||
self._callbacks_by_priority = {}
|
||||
for endpoint, prio in itertools.product(endpoints, PRIORITIES):
|
||||
@ -52,42 +49,77 @@ class _NotificationDispatcherBase(dispatcher.DispatcherBase):
|
||||
self._callbacks_by_priority.setdefault(prio, []).append(
|
||||
(screen, method))
|
||||
|
||||
priorities = self._callbacks_by_priority.keys()
|
||||
self._targets_priorities = set(itertools.product(self.targets,
|
||||
priorities))
|
||||
@property
|
||||
def supported_priorities(self):
|
||||
return self._callbacks_by_priority.keys()
|
||||
|
||||
def _listen(self, transport):
|
||||
transport._require_driver_features(requeue=self.allow_requeue)
|
||||
return transport._listen_for_notifications(self._targets_priorities,
|
||||
pool=self.pool)
|
||||
|
||||
def __call__(self, incoming):
|
||||
return dispatcher.DispatcherExecutorContext(
|
||||
incoming, self._dispatch_and_handle_error,
|
||||
post=self._post_dispatch)
|
||||
|
||||
def _post_dispatch(self, incoming, requeues):
|
||||
for m in incoming:
|
||||
try:
|
||||
if requeues and m in requeues:
|
||||
m.requeue()
|
||||
else:
|
||||
m.acknowledge()
|
||||
except Exception:
|
||||
LOG.error(_LE("Fail to ack/requeue message"), exc_info=True)
|
||||
|
||||
def _dispatch_and_handle_error(self, incoming):
|
||||
"""Dispatch a notification message to the appropriate endpoint method.
|
||||
|
||||
:param incoming: the incoming notification message
|
||||
:type ctxt: IncomingMessage
|
||||
def dispatch(self, incoming):
|
||||
"""Dispatch notification messages to the appropriate endpoint method.
|
||||
"""
|
||||
try:
|
||||
return self._dispatch(incoming)
|
||||
except Exception:
|
||||
LOG.error(_LE('Exception during message handling'), exc_info=True)
|
||||
priority, raw_message, message = self._extract_user_message(incoming)
|
||||
|
||||
def _dispatch(self, incoming):
|
||||
if priority not in PRIORITIES:
|
||||
LOG.warning(_LW('Unknown priority "%s"'), priority)
|
||||
return
|
||||
|
||||
for screen, callback in self._callbacks_by_priority.get(priority,
|
||||
[]):
|
||||
if screen and not screen.match(message["ctxt"],
|
||||
message["publisher_id"],
|
||||
message["event_type"],
|
||||
message["metadata"],
|
||||
message["payload"]):
|
||||
continue
|
||||
|
||||
ret = self._exec_callback(callback, message)
|
||||
if ret == NotificationResult.REQUEUE:
|
||||
return ret
|
||||
return NotificationResult.HANDLED
|
||||
|
||||
def _exec_callback(self, callback, message):
|
||||
localcontext._set_local_context(message["ctxt"])
|
||||
|
||||
try:
|
||||
return callback(message["ctxt"],
|
||||
message["publisher_id"],
|
||||
message["event_type"],
|
||||
message["payload"],
|
||||
message["metadata"])
|
||||
except Exception:
|
||||
LOG.exception("Callback raised an exception.")
|
||||
return NotificationResult.REQUEUE
|
||||
finally:
|
||||
localcontext._clear_local_context()
|
||||
|
||||
def _extract_user_message(self, incoming):
|
||||
ctxt = self.serializer.deserialize_context(incoming.ctxt)
|
||||
message = incoming.message
|
||||
|
||||
publisher_id = message.get('publisher_id')
|
||||
event_type = message.get('event_type')
|
||||
metadata = {
|
||||
'message_id': message.get('message_id'),
|
||||
'timestamp': message.get('timestamp')
|
||||
}
|
||||
priority = message.get('priority', '').lower()
|
||||
payload = self.serializer.deserialize_entity(ctxt,
|
||||
message.get('payload'))
|
||||
return priority, incoming, dict(ctxt=ctxt,
|
||||
publisher_id=publisher_id,
|
||||
event_type=event_type,
|
||||
payload=payload,
|
||||
metadata=metadata)
|
||||
|
||||
|
||||
class BatchNotificationDispatcher(NotificationDispatcher):
|
||||
"""A message dispatcher which understands Notification messages.
|
||||
|
||||
A MessageHandlingServer is constructed by passing a callable dispatcher
|
||||
which is invoked with a list of message dictionaries each time 'batch_size'
|
||||
messages are received or 'batch_timeout' seconds is reached.
|
||||
"""
|
||||
|
||||
def dispatch(self, incoming):
|
||||
"""Dispatch notification messages to the appropriate endpoint method.
|
||||
"""
|
||||
|
||||
@ -120,70 +152,14 @@ class _NotificationDispatcherBase(dispatcher.DispatcherBase):
|
||||
continue
|
||||
|
||||
ret = self._exec_callback(callback, filtered_messages)
|
||||
if self.allow_requeue and ret == NotificationResult.REQUEUE:
|
||||
if ret == NotificationResult.REQUEUE:
|
||||
requeues.update(raw_messages)
|
||||
break
|
||||
return requeues
|
||||
|
||||
def _exec_callback(self, callback, *args):
|
||||
ret = callback(*args)
|
||||
return NotificationResult.HANDLED if ret is None else ret
|
||||
|
||||
def _extract_user_message(self, incoming):
|
||||
ctxt = self.serializer.deserialize_context(incoming.ctxt)
|
||||
message = incoming.message
|
||||
|
||||
publisher_id = message.get('publisher_id')
|
||||
event_type = message.get('event_type')
|
||||
metadata = {
|
||||
'message_id': message.get('message_id'),
|
||||
'timestamp': message.get('timestamp')
|
||||
}
|
||||
priority = message.get('priority', '').lower()
|
||||
payload = self.serializer.deserialize_entity(ctxt,
|
||||
message.get('payload'))
|
||||
return priority, incoming, dict(ctxt=ctxt,
|
||||
publisher_id=publisher_id,
|
||||
event_type=event_type,
|
||||
payload=payload,
|
||||
metadata=metadata)
|
||||
|
||||
|
||||
class NotificationDispatcher(_NotificationDispatcherBase):
|
||||
"""A message dispatcher which understands Notification messages.
|
||||
|
||||
A MessageHandlingServer is constructed by passing a callable dispatcher
|
||||
which is invoked with context and message dictionaries each time a message
|
||||
is received.
|
||||
"""
|
||||
def _exec_callback(self, callback, messages):
|
||||
localcontext._set_local_context(
|
||||
messages[0]["ctxt"])
|
||||
try:
|
||||
return super(NotificationDispatcher, self)._exec_callback(
|
||||
callback,
|
||||
messages[0]["ctxt"],
|
||||
messages[0]["publisher_id"],
|
||||
messages[0]["event_type"],
|
||||
messages[0]["payload"],
|
||||
messages[0]["metadata"])
|
||||
finally:
|
||||
localcontext._clear_local_context()
|
||||
|
||||
|
||||
class BatchNotificationDispatcher(_NotificationDispatcherBase):
|
||||
"""A message dispatcher which understands Notification messages.
|
||||
|
||||
A MessageHandlingServer is constructed by passing a callable dispatcher
|
||||
which is invoked with a list of message dictionaries each time 'batch_size'
|
||||
messages are received or 'batch_timeout' seconds is reached.
|
||||
"""
|
||||
|
||||
def __init__(self, targets, endpoints, serializer, allow_requeue,
|
||||
pool=None, batch_size=None, batch_timeout=None):
|
||||
super(BatchNotificationDispatcher, self).__init__(targets, endpoints,
|
||||
serializer,
|
||||
allow_requeue,
|
||||
pool)
|
||||
self.batch_size = batch_size
|
||||
self.batch_timeout = batch_timeout
|
||||
return callback(messages)
|
||||
except Exception:
|
||||
LOG.exception("Callback raised an exception.")
|
||||
return NotificationResult.REQUEUE
|
||||
|
@ -103,10 +103,89 @@ by passing allow_requeue=True to get_notification_listener(). If the driver
|
||||
does not support requeueing, it will raise NotImplementedError at this point.
|
||||
|
||||
"""
|
||||
import itertools
|
||||
import logging
|
||||
|
||||
from oslo_messaging._i18n import _LE
|
||||
from oslo_messaging.notify import dispatcher as notify_dispatcher
|
||||
from oslo_messaging import server as msg_server
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NotificationServer(msg_server.MessageHandlingServer):
|
||||
def __init__(self, transport, targets, dispatcher, executor='blocking',
|
||||
allow_requeue=True, pool=None):
|
||||
super(NotificationServer, self).__init__(transport, dispatcher,
|
||||
executor)
|
||||
self._allow_requeue = allow_requeue
|
||||
self._pool = pool
|
||||
self.targets = targets
|
||||
self._targets_priorities = set(
|
||||
itertools.product(self.targets,
|
||||
self.dispatcher.supported_priorities)
|
||||
)
|
||||
|
||||
def _create_listener(self):
|
||||
return msg_server.SingleMessageListenerAdapter(
|
||||
self.transport._listen_for_notifications(
|
||||
self._targets_priorities, self._pool
|
||||
)
|
||||
)
|
||||
|
||||
def _process_incoming(self, incoming):
|
||||
res = notify_dispatcher.NotificationResult.REQUEUE
|
||||
try:
|
||||
res = self.dispatcher.dispatch(incoming)
|
||||
except Exception:
|
||||
LOG.error(_LE('Exception during message handling'), exc_info=True)
|
||||
|
||||
try:
|
||||
if (res == notify_dispatcher.NotificationResult.REQUEUE and
|
||||
self._allow_requeue):
|
||||
incoming.requeue()
|
||||
else:
|
||||
incoming.acknowledge()
|
||||
except Exception:
|
||||
LOG.error(_LE("Fail to ack/requeue message"), exc_info=True)
|
||||
|
||||
|
||||
class BatchNotificationServer(NotificationServer):
|
||||
def __init__(self, transport, targets, dispatcher, executor='blocking',
|
||||
allow_requeue=True, pool=None, batch_size=1,
|
||||
batch_timeout=None):
|
||||
super(BatchNotificationServer, self).__init__(
|
||||
transport=transport, targets=targets, dispatcher=dispatcher,
|
||||
executor=executor, allow_requeue=allow_requeue, pool=pool
|
||||
)
|
||||
|
||||
self._batch_size = batch_size
|
||||
self._batch_timeout = batch_timeout
|
||||
|
||||
def _create_listener(self):
|
||||
return msg_server.BatchMessageListenerAdapter(
|
||||
self.transport._listen_for_notifications(
|
||||
self._targets_priorities, self._pool
|
||||
),
|
||||
timeout=self._batch_timeout,
|
||||
batch_size=self._batch_size
|
||||
)
|
||||
|
||||
def _process_incoming(self, incoming):
|
||||
try:
|
||||
not_processed_messages = self.dispatcher.dispatch(incoming)
|
||||
except Exception:
|
||||
not_processed_messages = set(incoming)
|
||||
LOG.error(_LE('Exception during message handling'), exc_info=True)
|
||||
for m in incoming:
|
||||
try:
|
||||
if m in not_processed_messages and self._allow_requeue:
|
||||
m.requeue()
|
||||
else:
|
||||
m.acknowledge()
|
||||
except Exception:
|
||||
LOG.error(_LE("Fail to ack/requeue message"), exc_info=True)
|
||||
|
||||
|
||||
def get_notification_listener(transport, targets, endpoints,
|
||||
executor='blocking', serializer=None,
|
||||
@ -137,10 +216,10 @@ def get_notification_listener(transport, targets, endpoints,
|
||||
:type pool: str
|
||||
:raises: NotImplementedError
|
||||
"""
|
||||
dispatcher = notify_dispatcher.NotificationDispatcher(targets, endpoints,
|
||||
serializer,
|
||||
allow_requeue, pool)
|
||||
return msg_server.MessageHandlingServer(transport, dispatcher, executor)
|
||||
dispatcher = notify_dispatcher.NotificationDispatcher(endpoints,
|
||||
serializer)
|
||||
return NotificationServer(transport, targets, dispatcher, executor,
|
||||
allow_requeue, pool)
|
||||
|
||||
|
||||
def get_batch_notification_listener(transport, targets, endpoints,
|
||||
@ -180,6 +259,8 @@ def get_batch_notification_listener(transport, targets, endpoints,
|
||||
:raises: NotImplementedError
|
||||
"""
|
||||
dispatcher = notify_dispatcher.BatchNotificationDispatcher(
|
||||
targets, endpoints, serializer, allow_requeue, pool,
|
||||
batch_size, batch_timeout)
|
||||
return msg_server.MessageHandlingServer(transport, dispatcher, executor)
|
||||
endpoints, serializer)
|
||||
return BatchNotificationServer(
|
||||
transport, targets, dispatcher, executor, allow_requeue, pool,
|
||||
batch_size, batch_timeout
|
||||
)
|
||||
|
@ -29,7 +29,6 @@ import sys
|
||||
|
||||
import six
|
||||
|
||||
from oslo_messaging._i18n import _LE
|
||||
from oslo_messaging import _utils as utils
|
||||
from oslo_messaging import dispatcher
|
||||
from oslo_messaging import localcontext
|
||||
@ -94,20 +93,16 @@ class RPCDispatcher(dispatcher.DispatcherBase):
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, target, endpoints, serializer):
|
||||
def __init__(self, endpoints, serializer):
|
||||
"""Construct a rpc server dispatcher.
|
||||
|
||||
:param target: the exchange, topic and server to listen on
|
||||
:type target: Target
|
||||
:param endpoints: list of endpoint objects for dispatching to
|
||||
:param serializer: optional message serializer
|
||||
"""
|
||||
|
||||
self.endpoints = endpoints
|
||||
self.serializer = serializer or msg_serializer.NoOpSerializer()
|
||||
self._default_target = msg_target.Target()
|
||||
self._target = target
|
||||
|
||||
def _listen(self, transport):
|
||||
return transport._listen(self._target)
|
||||
|
||||
@staticmethod
|
||||
def _is_namespace(target, namespace):
|
||||
@ -127,43 +122,16 @@ class RPCDispatcher(dispatcher.DispatcherBase):
|
||||
result = func(ctxt, **new_args)
|
||||
return self.serializer.serialize_entity(ctxt, result)
|
||||
|
||||
def __call__(self, incoming):
|
||||
incoming[0].acknowledge()
|
||||
return dispatcher.DispatcherExecutorContext(
|
||||
incoming[0], self._dispatch_and_reply)
|
||||
|
||||
def _dispatch_and_reply(self, incoming):
|
||||
try:
|
||||
incoming.reply(self._dispatch(incoming.ctxt,
|
||||
incoming.message))
|
||||
except ExpectedException as e:
|
||||
LOG.debug(u'Expected exception during message handling (%s)',
|
||||
e.exc_info[1])
|
||||
incoming.reply(failure=e.exc_info, log_failure=False)
|
||||
except Exception as e:
|
||||
# current sys.exc_info() content can be overriden
|
||||
# by another exception raise by a log handler during
|
||||
# LOG.exception(). So keep a copy and delete it later.
|
||||
exc_info = sys.exc_info()
|
||||
try:
|
||||
LOG.error(_LE('Exception during message handling: %s'), e,
|
||||
exc_info=exc_info)
|
||||
incoming.reply(failure=exc_info)
|
||||
finally:
|
||||
# NOTE(dhellmann): Remove circular object reference
|
||||
# between the current stack frame and the traceback in
|
||||
# exc_info.
|
||||
del exc_info
|
||||
|
||||
def _dispatch(self, ctxt, message):
|
||||
def dispatch(self, incoming):
|
||||
"""Dispatch an RPC message to the appropriate endpoint method.
|
||||
|
||||
:param ctxt: the request context
|
||||
:type ctxt: dict
|
||||
:param message: the message payload
|
||||
:type message: dict
|
||||
:param incoming: incoming message
|
||||
:type incoming: IncomingMessage
|
||||
:raises: NoSuchMethod, UnsupportedVersion
|
||||
"""
|
||||
message = incoming.message
|
||||
ctxt = incoming.ctxt
|
||||
|
||||
method = message.get('method')
|
||||
args = message.get('args', {})
|
||||
namespace = message.get('namespace')
|
||||
|
@ -102,9 +102,53 @@ __all__ = [
|
||||
'expected_exceptions',
|
||||
]
|
||||
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from oslo_messaging._i18n import _LE
|
||||
from oslo_messaging.rpc import dispatcher as rpc_dispatcher
|
||||
from oslo_messaging import server as msg_server
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RPCServer(msg_server.MessageHandlingServer):
|
||||
def __init__(self, transport, target, dispatcher, executor='blocking'):
|
||||
super(RPCServer, self).__init__(transport, dispatcher, executor)
|
||||
self._target = target
|
||||
|
||||
def _create_listener(self):
|
||||
return msg_server.SingleMessageListenerAdapter(
|
||||
self.transport._listen(self._target)
|
||||
)
|
||||
|
||||
def _process_incoming(self, incoming):
|
||||
incoming.acknowledge()
|
||||
try:
|
||||
res = self.dispatcher.dispatch(incoming)
|
||||
except rpc_dispatcher.ExpectedException as e:
|
||||
LOG.debug(u'Expected exception during message handling (%s)',
|
||||
e.exc_info[1])
|
||||
incoming.reply(failure=e.exc_info)
|
||||
except Exception as e:
|
||||
# current sys.exc_info() content can be overriden
|
||||
# by another exception raise by a log handler during
|
||||
# LOG.exception(). So keep a copy and delete it later.
|
||||
exc_info = sys.exc_info()
|
||||
try:
|
||||
LOG.exception(_LE('Exception during message handling: %s'), e)
|
||||
incoming.reply(failure=exc_info)
|
||||
finally:
|
||||
# NOTE(dhellmann): Remove circular object reference
|
||||
# between the current stack frame and the traceback in
|
||||
# exc_info.
|
||||
del exc_info
|
||||
else:
|
||||
try:
|
||||
incoming.reply(res)
|
||||
except Exception:
|
||||
LOG.Exception("Can not send reply for message %s", incoming)
|
||||
|
||||
|
||||
def get_rpc_server(transport, target, endpoints,
|
||||
executor='blocking', serializer=None):
|
||||
@ -129,8 +173,8 @@ def get_rpc_server(transport, target, endpoints,
|
||||
:param serializer: an optional entity serializer
|
||||
:type serializer: Serializer
|
||||
"""
|
||||
dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer)
|
||||
return msg_server.MessageHandlingServer(transport, dispatcher, executor)
|
||||
dispatcher = rpc_dispatcher.RPCDispatcher(endpoints, serializer)
|
||||
return RPCServer(transport, target, dispatcher, executor)
|
||||
|
||||
|
||||
def expected_exceptions(*exceptions):
|
||||
|
@ -23,6 +23,7 @@ __all__ = [
|
||||
'ServerListenError',
|
||||
]
|
||||
|
||||
import abc
|
||||
import functools
|
||||
import inspect
|
||||
import logging
|
||||
@ -34,6 +35,7 @@ from oslo_service import service
|
||||
from oslo_utils import eventletutils
|
||||
from oslo_utils import excutils
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
from stevedore import driver
|
||||
|
||||
from oslo_messaging._drivers import base as driver_base
|
||||
@ -295,6 +297,42 @@ def ordered(after=None, reset_after=None):
|
||||
return _ordered
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class MessageListenerAdapter(object):
|
||||
def __init__(self, driver_listener, timeout=None):
|
||||
self._driver_listener = driver_listener
|
||||
self._timeout = timeout
|
||||
|
||||
@abc.abstractmethod
|
||||
def poll(self):
|
||||
"""Poll incoming and return incoming request"""
|
||||
|
||||
def stop(self):
|
||||
self._driver_listener.stop()
|
||||
|
||||
def cleanup(self):
|
||||
self._driver_listener.cleanup()
|
||||
|
||||
|
||||
class SingleMessageListenerAdapter(MessageListenerAdapter):
|
||||
def poll(self):
|
||||
msgs = self._driver_listener.poll(prefetch_size=1,
|
||||
timeout=self._timeout)
|
||||
return msgs[0] if msgs else None
|
||||
|
||||
|
||||
class BatchMessageListenerAdapter(MessageListenerAdapter):
|
||||
def __init__(self, driver_listener, timeout=None, batch_size=1):
|
||||
super(BatchMessageListenerAdapter, self).__init__(driver_listener,
|
||||
timeout)
|
||||
self._batch_size = batch_size
|
||||
|
||||
def poll(self):
|
||||
return self._driver_listener.poll(prefetch_size=self._batch_size,
|
||||
timeout=self._timeout)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
|
||||
"""Server for handling messages.
|
||||
|
||||
@ -345,9 +383,18 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
|
||||
|
||||
super(MessageHandlingServer, self).__init__()
|
||||
|
||||
def _submit_work(self, callback):
|
||||
fut = self._work_executor.submit(callback.run)
|
||||
fut.add_done_callback(lambda f: callback.done())
|
||||
@abc.abstractmethod
|
||||
def _process_incoming(self, incoming):
|
||||
"""Process incoming request
|
||||
|
||||
:param incoming: incoming request.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def _create_listener(self):
|
||||
"""Creates listener object for polling requests
|
||||
:return: MessageListenerAdapter
|
||||
"""
|
||||
|
||||
@ordered(reset_after='stop')
|
||||
def start(self, override_pool_size=None):
|
||||
@ -374,7 +421,7 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
|
||||
self._started = True
|
||||
|
||||
try:
|
||||
self.listener = self.dispatcher._listen(self.transport)
|
||||
self.listener = self._create_listener()
|
||||
except driver_base.TransportDriverError as ex:
|
||||
raise ServerListenError(self.target, ex)
|
||||
|
||||
@ -412,22 +459,18 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
|
||||
@excutils.forever_retry_uncaught_exceptions
|
||||
def _runner(self):
|
||||
while self._started:
|
||||
incoming = self.listener.poll(
|
||||
timeout=self.dispatcher.batch_timeout,
|
||||
prefetch_size=self.dispatcher.batch_size)
|
||||
incoming = self.listener.poll()
|
||||
|
||||
if incoming:
|
||||
self._submit_work(self.dispatcher(incoming))
|
||||
self._work_executor.submit(self._process_incoming, incoming)
|
||||
|
||||
# listener is stopped but we need to process all already consumed
|
||||
# messages
|
||||
while True:
|
||||
incoming = self.listener.poll(
|
||||
timeout=self.dispatcher.batch_timeout,
|
||||
prefetch_size=self.dispatcher.batch_size)
|
||||
incoming = self.listener.poll()
|
||||
|
||||
if incoming:
|
||||
self._submit_work(self.dispatcher(incoming))
|
||||
self._work_executor.submit(self._process_incoming, incoming)
|
||||
else:
|
||||
return
|
||||
|
||||
|
@ -13,8 +13,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import itertools
|
||||
|
||||
from oslo_utils import timeutils
|
||||
import testscenarios
|
||||
|
||||
@ -25,7 +23,6 @@ from six.moves import mock
|
||||
|
||||
load_tests = testscenarios.load_tests_apply_scenarios
|
||||
|
||||
|
||||
notification_msg = dict(
|
||||
publisher_id="publisher_id",
|
||||
event_type="compute.start",
|
||||
@ -96,20 +93,21 @@ class TestDispatcher(test_utils.BaseTestCase):
|
||||
msg = notification_msg.copy()
|
||||
msg['priority'] = self.priority
|
||||
|
||||
targets = [oslo_messaging.Target(topic='notifications')]
|
||||
dispatcher = notify_dispatcher.NotificationDispatcher(
|
||||
targets, endpoints, None, allow_requeue=True, pool=None)
|
||||
|
||||
# check it listen on wanted topics
|
||||
self.assertEqual(sorted(set((targets[0], prio)
|
||||
for prio in itertools.chain.from_iterable(
|
||||
self.endpoints))),
|
||||
sorted(dispatcher._targets_priorities))
|
||||
dispatcher = notify_dispatcher.NotificationDispatcher(endpoints, None)
|
||||
|
||||
incoming = mock.Mock(ctxt={}, message=msg)
|
||||
callback = dispatcher([incoming])
|
||||
callback.run()
|
||||
callback.done()
|
||||
|
||||
res = dispatcher.dispatch(incoming)
|
||||
|
||||
expected_res = (
|
||||
notify_dispatcher.NotificationResult.REQUEUE
|
||||
if (self.return_value ==
|
||||
notify_dispatcher.NotificationResult.REQUEUE or
|
||||
self.ex is not None)
|
||||
else notify_dispatcher.NotificationResult.HANDLED
|
||||
)
|
||||
|
||||
self.assertEqual(expected_res, res)
|
||||
|
||||
# check endpoint callbacks are called or not
|
||||
for i, endpoint_methods in enumerate(self.endpoints):
|
||||
@ -127,26 +125,14 @@ class TestDispatcher(test_utils.BaseTestCase):
|
||||
else:
|
||||
self.assertEqual(0, endpoints[i].call_count)
|
||||
|
||||
if self.ex:
|
||||
self.assertEqual(1, incoming.acknowledge.call_count)
|
||||
self.assertEqual(0, incoming.requeue.call_count)
|
||||
elif self.return_value == oslo_messaging.NotificationResult.HANDLED \
|
||||
or self.return_value is None:
|
||||
self.assertEqual(1, incoming.acknowledge.call_count)
|
||||
self.assertEqual(0, incoming.requeue.call_count)
|
||||
elif self.return_value == oslo_messaging.NotificationResult.REQUEUE:
|
||||
self.assertEqual(0, incoming.acknowledge.call_count)
|
||||
self.assertEqual(1, incoming.requeue.call_count)
|
||||
|
||||
@mock.patch('oslo_messaging.notify.dispatcher.LOG')
|
||||
def test_dispatcher_unknown_prio(self, mylog):
|
||||
msg = notification_msg.copy()
|
||||
msg['priority'] = 'what???'
|
||||
dispatcher = notify_dispatcher.NotificationDispatcher(
|
||||
[mock.Mock()], [mock.Mock()], None, allow_requeue=True, pool=None)
|
||||
callback = dispatcher([mock.Mock(ctxt={}, message=msg)])
|
||||
callback.run()
|
||||
callback.done()
|
||||
[mock.Mock()], None)
|
||||
res = dispatcher.dispatch(mock.Mock(ctxt={}, message=msg))
|
||||
self.assertEqual(None, res)
|
||||
mylog.warning.assert_called_once_with('Unknown priority "%s"',
|
||||
'what???')
|
||||
|
||||
@ -236,9 +222,8 @@ class TestDispatcherFilter(test_utils.BaseTestCase):
|
||||
**self.filter_rule)
|
||||
endpoint = mock.Mock(spec=['info'], filter_rule=notification_filter)
|
||||
|
||||
targets = [oslo_messaging.Target(topic='notifications')]
|
||||
dispatcher = notify_dispatcher.NotificationDispatcher(
|
||||
targets, [endpoint], serializer=None, allow_requeue=True)
|
||||
[endpoint], serializer=None)
|
||||
message = {'payload': {'state': 'active'},
|
||||
'priority': 'info',
|
||||
'publisher_id': self.publisher_id,
|
||||
@ -246,9 +231,7 @@ class TestDispatcherFilter(test_utils.BaseTestCase):
|
||||
'timestamp': '2014-03-03 18:21:04.369234',
|
||||
'message_id': '99863dda-97f0-443a-a0c1-6ed317b7fd45'}
|
||||
incoming = mock.Mock(ctxt=self.context, message=message)
|
||||
callback = dispatcher([incoming])
|
||||
callback.run()
|
||||
callback.done()
|
||||
dispatcher.dispatch(incoming)
|
||||
|
||||
if self.match:
|
||||
self.assertEqual(1, endpoint.info.call_count)
|
||||
|
@ -351,8 +351,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
|
||||
logger = mock.MagicMock()
|
||||
logger.info = mock.MagicMock()
|
||||
message = {'password': 'passw0rd', 'event_type': 'foo'}
|
||||
json_str = jsonutils.dumps(message)
|
||||
mask_str = strutils.mask_password(json_str)
|
||||
mask_str = jsonutils.dumps(strutils.mask_dict_password(message))
|
||||
|
||||
with mock.patch.object(logging, 'getLogger') as gl:
|
||||
gl.return_value = logger
|
||||
|
@ -109,33 +109,29 @@ class TestDispatcher(test_utils.BaseTestCase):
|
||||
for e in self.endpoints]
|
||||
|
||||
serializer = None
|
||||
target = oslo_messaging.Target()
|
||||
dispatcher = oslo_messaging.RPCDispatcher(target, endpoints,
|
||||
serializer)
|
||||
|
||||
def check_reply(reply=None, failure=None, log_failure=True):
|
||||
if self.ex and failure is not None:
|
||||
ex = failure[1]
|
||||
self.assertFalse(self.success, ex)
|
||||
self.assertIsNotNone(self.ex, ex)
|
||||
self.assertIsInstance(ex, self.ex, ex)
|
||||
if isinstance(ex, oslo_messaging.NoSuchMethod):
|
||||
self.assertEqual(self.msg.get('method'), ex.method)
|
||||
elif isinstance(ex, oslo_messaging.UnsupportedVersion):
|
||||
self.assertEqual(self.msg.get('version', '1.0'),
|
||||
ex.version)
|
||||
if ex.method:
|
||||
self.assertEqual(self.msg.get('method'), ex.method)
|
||||
else:
|
||||
self.assertTrue(self.success, failure)
|
||||
self.assertIsNone(failure)
|
||||
dispatcher = oslo_messaging.RPCDispatcher(endpoints, serializer)
|
||||
|
||||
incoming = mock.Mock(ctxt=self.ctxt, message=self.msg)
|
||||
incoming.reply.side_effect = check_reply
|
||||
|
||||
callback = dispatcher([incoming])
|
||||
callback.run()
|
||||
callback.done()
|
||||
res = None
|
||||
|
||||
try:
|
||||
res = dispatcher.dispatch(incoming)
|
||||
except Exception as ex:
|
||||
self.assertFalse(self.success, ex)
|
||||
self.assertIsNotNone(self.ex, ex)
|
||||
self.assertIsInstance(ex, self.ex, ex)
|
||||
if isinstance(ex, oslo_messaging.NoSuchMethod):
|
||||
self.assertEqual(self.msg.get('method'), ex.method)
|
||||
elif isinstance(ex, oslo_messaging.UnsupportedVersion):
|
||||
self.assertEqual(self.msg.get('version', '1.0'),
|
||||
ex.version)
|
||||
if ex.method:
|
||||
self.assertEqual(self.msg.get('method'), ex.method)
|
||||
else:
|
||||
self.assertTrue(self.success,
|
||||
"Not expected success of operation durung testing")
|
||||
self.assertIsNotNone(res)
|
||||
|
||||
for n, endpoint in enumerate(endpoints):
|
||||
for method_name in ['foo', 'bar']:
|
||||
@ -147,8 +143,6 @@ class TestDispatcher(test_utils.BaseTestCase):
|
||||
else:
|
||||
self.assertEqual(0, method.call_count)
|
||||
|
||||
self.assertEqual(1, incoming.reply.call_count)
|
||||
|
||||
|
||||
class TestSerializer(test_utils.BaseTestCase):
|
||||
|
||||
@ -165,9 +159,7 @@ class TestSerializer(test_utils.BaseTestCase):
|
||||
def test_serializer(self):
|
||||
endpoint = _FakeEndpoint()
|
||||
serializer = msg_serializer.NoOpSerializer()
|
||||
target = oslo_messaging.Target()
|
||||
dispatcher = oslo_messaging.RPCDispatcher(target, [endpoint],
|
||||
serializer)
|
||||
dispatcher = oslo_messaging.RPCDispatcher([endpoint], serializer)
|
||||
|
||||
self.mox.StubOutWithMock(endpoint, 'foo')
|
||||
args = dict([(k, 'd' + v) for k, v in self.args.items()])
|
||||
@ -187,7 +179,9 @@ class TestSerializer(test_utils.BaseTestCase):
|
||||
|
||||
self.mox.ReplayAll()
|
||||
|
||||
retval = dispatcher._dispatch(self.ctxt, dict(method='foo',
|
||||
args=self.args))
|
||||
incoming = mock.Mock()
|
||||
incoming.ctxt = self.ctxt
|
||||
incoming.message = dict(method='foo', args=self.args)
|
||||
retval = dispatcher.dispatch(incoming)
|
||||
if self.retval is not None:
|
||||
self.assertEqual('s' + self.retval, retval)
|
||||
|
@ -149,7 +149,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
|
||||
serializer=serializer)
|
||||
# Mocking executor
|
||||
server._executor_cls = MagicMockIgnoreArgs
|
||||
server.listener = MagicMockIgnoreArgs()
|
||||
server._create_listener = MagicMockIgnoreArgs()
|
||||
server.dispatcher = MagicMockIgnoreArgs()
|
||||
# Here assigning executor's listener object to listener variable
|
||||
# before calling wait method, because in wait method we are
|
||||
@ -551,7 +551,6 @@ class TestServerLocking(test_utils.BaseTestCase):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._lock = threading.Lock()
|
||||
self._calls = []
|
||||
self.listener = mock.MagicMock()
|
||||
executors.append(self)
|
||||
|
||||
submit = _logmethod('submit')
|
||||
@ -559,9 +558,16 @@ class TestServerLocking(test_utils.BaseTestCase):
|
||||
|
||||
self.executors = executors
|
||||
|
||||
self.server = oslo_messaging.MessageHandlingServer(mock.Mock(),
|
||||
mock.Mock())
|
||||
class MessageHandlingServerImpl(oslo_messaging.MessageHandlingServer):
|
||||
def _create_listener(self):
|
||||
pass
|
||||
|
||||
def _process_incoming(self, incoming):
|
||||
pass
|
||||
|
||||
self.server = MessageHandlingServerImpl(mock.Mock(), mock.Mock())
|
||||
self.server._executor_cls = FakeExecutor
|
||||
self.server._create_listener = mock.Mock()
|
||||
|
||||
def test_start_stop_wait(self):
|
||||
# Test a simple execution of start, stop, wait in order
|
||||
|
@ -61,6 +61,14 @@ class OptsTestCase(test_utils.BaseTestCase):
|
||||
def test_defaults(self):
|
||||
transport = mock.Mock()
|
||||
transport.conf = self.conf
|
||||
server.MessageHandlingServer(transport, mock.Mock())
|
||||
|
||||
class MessageHandlingServerImpl(server.MessageHandlingServer):
|
||||
def _create_listener(self):
|
||||
pass
|
||||
|
||||
def _process_incoming(self, incoming):
|
||||
pass
|
||||
|
||||
MessageHandlingServerImpl(transport, mock.Mock())
|
||||
opts.set_defaults(self.conf, executor_thread_pool_size=100)
|
||||
self.assertEqual(100, self.conf.executor_thread_pool_size)
|
||||
|
Loading…
Reference in New Issue
Block a user