Remove executor callback
Revert the change I556b112371bec2ec29cea4dc254bb3f9c6d2c07a: the executor callback API was only used by the aioeventlet executor which was just removed. Change-Id: I1223f32594d8c1be28cc43fdd9bf102c86d75325
This commit is contained in:
parent
d260744773
commit
22fea728cc
oslo_messaging
@ -20,9 +20,6 @@ import six
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class ExecutorBase(object):
|
||||
|
||||
# Executor can override how we run the application callback
|
||||
_executor_callback = None
|
||||
|
||||
def __init__(self, conf, listener, dispatcher):
|
||||
self.conf = conf
|
||||
self.listener = listener
|
||||
|
@ -99,7 +99,7 @@ class PooledExecutor(base.ExecutorBase):
|
||||
|
||||
if not incoming:
|
||||
continue
|
||||
callback = self.dispatcher(incoming, self._executor_callback)
|
||||
callback = self.dispatcher(incoming)
|
||||
was_submitted = self._do_submit(callback)
|
||||
if not was_submitted:
|
||||
break
|
||||
|
@ -43,13 +43,11 @@ class DispatcherExecutorContext(object):
|
||||
thread.run(callback.run)
|
||||
|
||||
"""
|
||||
def __init__(self, incoming, dispatch, executor_callback=None,
|
||||
post=None):
|
||||
def __init__(self, incoming, dispatch, post=None):
|
||||
self._result = None
|
||||
self._incoming = incoming
|
||||
self._dispatch = dispatch
|
||||
self._post = post
|
||||
self._executor_callback = executor_callback
|
||||
|
||||
def run(self):
|
||||
"""The incoming message dispath itself
|
||||
@ -58,8 +56,7 @@ class DispatcherExecutorContext(object):
|
||||
able to do it.
|
||||
"""
|
||||
try:
|
||||
self._result = self._dispatch(self._incoming,
|
||||
self._executor_callback)
|
||||
self._result = self._dispatch(self._incoming)
|
||||
except Exception:
|
||||
msg = _('The dispatcher method must catches all exceptions')
|
||||
LOG.exception(msg)
|
||||
@ -104,7 +101,7 @@ class DispatcherBase(object):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def __call__(self, incoming, executor_callback=None):
|
||||
def __call__(self, incoming):
|
||||
"""Called by the executor to get the DispatcherExecutorContext
|
||||
|
||||
:param incoming: list of messages
|
||||
|
@ -61,10 +61,9 @@ class _NotificationDispatcherBase(dispatcher.DispatcherBase):
|
||||
return transport._listen_for_notifications(self._targets_priorities,
|
||||
pool=self.pool)
|
||||
|
||||
def __call__(self, incoming, executor_callback=None):
|
||||
def __call__(self, incoming):
|
||||
return dispatcher.DispatcherExecutorContext(
|
||||
incoming, self._dispatch_and_handle_error,
|
||||
executor_callback=executor_callback,
|
||||
post=self._post_dispatch)
|
||||
|
||||
def _post_dispatch(self, incoming, requeues):
|
||||
@ -77,18 +76,18 @@ class _NotificationDispatcherBase(dispatcher.DispatcherBase):
|
||||
except Exception:
|
||||
LOG.error(_LE("Fail to ack/requeue message"), exc_info=True)
|
||||
|
||||
def _dispatch_and_handle_error(self, incoming, executor_callback):
|
||||
def _dispatch_and_handle_error(self, incoming):
|
||||
"""Dispatch a notification message to the appropriate endpoint method.
|
||||
|
||||
:param incoming: the incoming notification message
|
||||
:type ctxt: IncomingMessage
|
||||
"""
|
||||
try:
|
||||
return self._dispatch(incoming, executor_callback)
|
||||
return self._dispatch(incoming)
|
||||
except Exception:
|
||||
LOG.error(_LE('Exception during message handling'), exc_info=True)
|
||||
|
||||
def _dispatch(self, incoming, executor_callback=None):
|
||||
def _dispatch(self, incoming):
|
||||
"""Dispatch notification messages to the appropriate endpoint method.
|
||||
"""
|
||||
|
||||
@ -120,18 +119,14 @@ class _NotificationDispatcherBase(dispatcher.DispatcherBase):
|
||||
if not filtered_messages:
|
||||
continue
|
||||
|
||||
ret = self._exec_callback(executor_callback, callback,
|
||||
filtered_messages)
|
||||
ret = self._exec_callback(callback, filtered_messages)
|
||||
if self.allow_requeue and ret == NotificationResult.REQUEUE:
|
||||
requeues.update(raw_messages)
|
||||
break
|
||||
return requeues
|
||||
|
||||
def _exec_callback(self, executor_callback, callback, *args):
|
||||
if executor_callback:
|
||||
ret = executor_callback(callback, *args)
|
||||
else:
|
||||
ret = callback(*args)
|
||||
def _exec_callback(self, callback, *args):
|
||||
ret = callback(*args)
|
||||
return NotificationResult.HANDLED if ret is None else ret
|
||||
|
||||
def _extract_user_message(self, incoming):
|
||||
@ -161,12 +156,12 @@ class NotificationDispatcher(_NotificationDispatcherBase):
|
||||
which is invoked with context and message dictionaries each time a message
|
||||
is received.
|
||||
"""
|
||||
def _exec_callback(self, executor_callback, callback, messages):
|
||||
def _exec_callback(self, callback, messages):
|
||||
localcontext._set_local_context(
|
||||
messages[0]["ctxt"])
|
||||
try:
|
||||
return super(NotificationDispatcher, self)._exec_callback(
|
||||
executor_callback, callback,
|
||||
callback,
|
||||
messages[0]["ctxt"],
|
||||
messages[0]["publisher_id"],
|
||||
messages[0]["event_type"],
|
||||
|
@ -118,29 +118,24 @@ class RPCDispatcher(dispatcher.DispatcherBase):
|
||||
endpoint_version = target.version or '1.0'
|
||||
return utils.version_is_compatible(endpoint_version, version)
|
||||
|
||||
def _do_dispatch(self, endpoint, method, ctxt, args, executor_callback):
|
||||
def _do_dispatch(self, endpoint, method, ctxt, args):
|
||||
ctxt = self.serializer.deserialize_context(ctxt)
|
||||
new_args = dict()
|
||||
for argname, arg in six.iteritems(args):
|
||||
new_args[argname] = self.serializer.deserialize_entity(ctxt, arg)
|
||||
func = getattr(endpoint, method)
|
||||
if executor_callback:
|
||||
result = executor_callback(func, ctxt, **new_args)
|
||||
else:
|
||||
result = func(ctxt, **new_args)
|
||||
result = func(ctxt, **new_args)
|
||||
return self.serializer.serialize_entity(ctxt, result)
|
||||
|
||||
def __call__(self, incoming, executor_callback=None):
|
||||
def __call__(self, incoming):
|
||||
incoming[0].acknowledge()
|
||||
return dispatcher.DispatcherExecutorContext(
|
||||
incoming[0], self._dispatch_and_reply,
|
||||
executor_callback=executor_callback)
|
||||
incoming[0], self._dispatch_and_reply)
|
||||
|
||||
def _dispatch_and_reply(self, incoming, executor_callback):
|
||||
def _dispatch_and_reply(self, incoming):
|
||||
try:
|
||||
incoming.reply(self._dispatch(incoming.ctxt,
|
||||
incoming.message,
|
||||
executor_callback))
|
||||
incoming.message))
|
||||
except ExpectedException as e:
|
||||
LOG.debug(u'Expected exception during message handling (%s)',
|
||||
e.exc_info[1])
|
||||
@ -158,7 +153,7 @@ class RPCDispatcher(dispatcher.DispatcherBase):
|
||||
# exc_info.
|
||||
del exc_info
|
||||
|
||||
def _dispatch(self, ctxt, message, executor_callback=None):
|
||||
def _dispatch(self, ctxt, message):
|
||||
"""Dispatch an RPC message to the appropriate endpoint method.
|
||||
|
||||
:param ctxt: the request context
|
||||
@ -185,8 +180,7 @@ class RPCDispatcher(dispatcher.DispatcherBase):
|
||||
if hasattr(endpoint, method):
|
||||
localcontext._set_local_context(ctxt)
|
||||
try:
|
||||
return self._do_dispatch(endpoint, method, ctxt, args,
|
||||
executor_callback)
|
||||
return self._do_dispatch(endpoint, method, ctxt, args)
|
||||
finally:
|
||||
localcontext._clear_local_context()
|
||||
|
||||
|
@ -82,20 +82,15 @@ class TestExecutor(test_utils.BaseTestCase):
|
||||
def _listen(self, transport):
|
||||
pass
|
||||
|
||||
def callback(self, incoming, executor_callback):
|
||||
if executor_callback is None:
|
||||
result = self.endpoint(incoming.ctxt,
|
||||
incoming.message)
|
||||
else:
|
||||
result = executor_callback(self.endpoint,
|
||||
incoming.ctxt,
|
||||
incoming.message)
|
||||
def callback(self, incoming):
|
||||
result = self.endpoint(incoming.ctxt,
|
||||
incoming.message)
|
||||
self.result = result
|
||||
return result
|
||||
|
||||
def __call__(self, incoming, executor_callback=None):
|
||||
def __call__(self, incoming):
|
||||
return dispatcher_base.DispatcherExecutorContext(
|
||||
incoming[0], self.callback, executor_callback)
|
||||
incoming[0], self.callback)
|
||||
|
||||
return Dispatcher(endpoint), endpoint, event, run_executor
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user