Merge "Remove usage of contentmanager for executors"
This commit is contained in:
commit
0055d79ac2
@ -35,7 +35,7 @@ from stevedore import driver
|
||||
|
||||
from oslo_messaging._drivers import base
|
||||
from oslo_messaging._drivers import common as rpc_common
|
||||
from oslo_messaging._executors import base as executor_base # FIXME(markmc)
|
||||
from oslo_messaging._executors import impl_pooledexecutor # FIXME(markmc)
|
||||
from oslo_messaging._i18n import _, _LE, _LW
|
||||
from oslo_messaging._drivers import pool
|
||||
|
||||
@ -1001,7 +1001,7 @@ class ZmqDriver(base.BaseDriver):
|
||||
if not zmq:
|
||||
raise ImportError("Failed to import eventlet.green.zmq")
|
||||
conf.register_opts(zmq_opts)
|
||||
conf.register_opts(executor_base._pool_opts)
|
||||
conf.register_opts(impl_pooledexecutor._pool_opts)
|
||||
conf.register_opts(base.base_opts)
|
||||
|
||||
super(ZmqDriver, self).__init__(conf, url, default_exchange,
|
||||
|
@ -14,19 +14,15 @@
|
||||
|
||||
import abc
|
||||
|
||||
from oslo_config import cfg
|
||||
import six
|
||||
|
||||
_pool_opts = [
|
||||
cfg.IntOpt('rpc_thread_pool_size',
|
||||
default=64,
|
||||
help='Size of RPC thread pool.'),
|
||||
]
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class ExecutorBase(object):
|
||||
|
||||
# Executor can override how we run the application callback
|
||||
_executor_callback = None
|
||||
|
||||
def __init__(self, conf, listener, dispatcher):
|
||||
self.conf = conf
|
||||
self.listener = listener
|
||||
@ -43,11 +39,3 @@ class ExecutorBase(object):
|
||||
@abc.abstractmethod
|
||||
def wait(self):
|
||||
"Wait until the executor has stopped polling."
|
||||
|
||||
|
||||
class PooledExecutorBase(ExecutorBase):
|
||||
"""An executor that uses a rpc thread pool of a given size."""
|
||||
|
||||
def __init__(self, conf, listener, callback):
|
||||
super(PooledExecutorBase, self).__init__(conf, listener, callback)
|
||||
self.conf.register_opts(_pool_opts)
|
||||
|
@ -70,6 +70,4 @@ class AsyncioEventletExecutor(impl_eventlet.EventletExecutor):
|
||||
result = aioeventlet.yield_future(result, loop=self._loop)
|
||||
return result
|
||||
|
||||
def _dispatch(self, incoming):
|
||||
ctx = self.dispatcher(incoming, self._coroutine_wrapper)
|
||||
impl_eventlet.spawn_with(ctxt=ctx, pool=self._greenpool)
|
||||
_executor_callback = _coroutine_wrapper
|
||||
|
@ -13,15 +13,28 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import futurist
|
||||
|
||||
from oslo_messaging._executors import base
|
||||
from oslo_messaging._i18n import _
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
from oslo_messaging._executors import impl_pooledexecutor
|
||||
|
||||
|
||||
class BlockingExecutor(base.ExecutorBase):
|
||||
class FakeBlockingThread(object):
|
||||
def __init__(self, target):
|
||||
self._target = target
|
||||
|
||||
def start(self):
|
||||
self._target()
|
||||
|
||||
@staticmethod
|
||||
def join():
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def stop():
|
||||
pass
|
||||
|
||||
|
||||
class BlockingExecutor(impl_pooledexecutor.PooledExecutor):
|
||||
|
||||
"""A message executor which blocks the current thread.
|
||||
|
||||
@ -34,24 +47,5 @@ class BlockingExecutor(base.ExecutorBase):
|
||||
for simple demo programs.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, listener, dispatcher):
|
||||
super(BlockingExecutor, self).__init__(conf, listener, dispatcher)
|
||||
self._running = False
|
||||
|
||||
def start(self):
|
||||
self._running = True
|
||||
while self._running:
|
||||
try:
|
||||
incoming = self.listener.poll()
|
||||
if incoming is not None:
|
||||
with self.dispatcher(incoming) as callback:
|
||||
callback()
|
||||
except Exception:
|
||||
LOG.exception(_("Unexpected exception occurred."))
|
||||
|
||||
def stop(self):
|
||||
self._running = False
|
||||
self.listener.stop()
|
||||
|
||||
def wait(self):
|
||||
pass
|
||||
_executor_cls = lambda __, ___: futurist.SynchronousExecutor()
|
||||
_thread_cls = FakeBlockingThread
|
||||
|
@ -14,50 +14,17 @@
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import sys
|
||||
|
||||
import eventlet
|
||||
from eventlet.green import threading as greenthreading
|
||||
from eventlet import greenpool
|
||||
import greenlet
|
||||
from oslo_utils import excutils
|
||||
import futurist
|
||||
|
||||
from oslo_messaging._executors import base
|
||||
from oslo_messaging._executors import impl_pooledexecutor
|
||||
from oslo_messaging import localcontext
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def spawn_with(ctxt, pool):
|
||||
"""This is the equivalent of a with statement
|
||||
but with the content of the BLOCK statement executed
|
||||
into a greenthread
|
||||
|
||||
exception path grab from:
|
||||
http://www.python.org/dev/peps/pep-0343/
|
||||
"""
|
||||
|
||||
def complete(thread, exit):
|
||||
exc = True
|
||||
try:
|
||||
try:
|
||||
thread.wait()
|
||||
except Exception:
|
||||
exc = False
|
||||
if not exit(*sys.exc_info()):
|
||||
raise
|
||||
finally:
|
||||
if exc:
|
||||
exit(None, None, None)
|
||||
|
||||
callback = ctxt.__enter__()
|
||||
thread = pool.spawn(callback)
|
||||
thread.link(complete, ctxt.__exit__)
|
||||
|
||||
return thread
|
||||
|
||||
|
||||
class EventletExecutor(base.PooledExecutorBase):
|
||||
class EventletExecutor(impl_pooledexecutor.PooledExecutor):
|
||||
|
||||
"""A message executor which integrates with eventlet.
|
||||
|
||||
@ -70,10 +37,6 @@ class EventletExecutor(base.PooledExecutorBase):
|
||||
|
||||
def __init__(self, conf, listener, dispatcher):
|
||||
super(EventletExecutor, self).__init__(conf, listener, dispatcher)
|
||||
self._thread = None
|
||||
self._greenpool = greenpool.GreenPool(self.conf.rpc_thread_pool_size)
|
||||
self._running = False
|
||||
|
||||
if not isinstance(localcontext._STORE, greenthreading.local):
|
||||
LOG.debug('eventlet executor in use but the threading module '
|
||||
'has not been monkeypatched or has been '
|
||||
@ -82,39 +45,7 @@ class EventletExecutor(base.PooledExecutorBase):
|
||||
'behavior. In the future, we will raise a '
|
||||
'RuntimeException in this case.')
|
||||
|
||||
def _dispatch(self, incoming):
|
||||
spawn_with(ctxt=self.dispatcher(incoming), pool=self._greenpool)
|
||||
|
||||
def start(self):
|
||||
if self._thread is not None:
|
||||
return
|
||||
|
||||
@excutils.forever_retry_uncaught_exceptions
|
||||
def _executor_thread():
|
||||
try:
|
||||
while self._running:
|
||||
incoming = self.listener.poll()
|
||||
if incoming is not None:
|
||||
self._dispatch(incoming)
|
||||
except greenlet.GreenletExit:
|
||||
return
|
||||
|
||||
self._running = True
|
||||
self._thread = eventlet.spawn(_executor_thread)
|
||||
|
||||
def stop(self):
|
||||
if self._thread is None:
|
||||
return
|
||||
self._running = False
|
||||
self.listener.stop()
|
||||
self._thread.cancel()
|
||||
|
||||
def wait(self):
|
||||
if self._thread is None:
|
||||
return
|
||||
self._greenpool.waitall()
|
||||
try:
|
||||
self._thread.wait()
|
||||
except greenlet.GreenletExit:
|
||||
pass
|
||||
self._thread = None
|
||||
_executor_cls = futurist.GreenThreadPoolExecutor
|
||||
_lock_cls = greenthreading.Lock
|
||||
_event_cls = greenthreading.Event
|
||||
_thread_cls = greenthreading.Thread
|
||||
|
112
oslo_messaging/_executors/impl_pooledexecutor.py
Normal file
112
oslo_messaging/_executors/impl_pooledexecutor.py
Normal file
@ -0,0 +1,112 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import threading
|
||||
|
||||
from concurrent import futures
|
||||
from oslo.config import cfg
|
||||
from oslo.utils import excutils
|
||||
|
||||
from oslo_messaging._executors import base
|
||||
|
||||
_pool_opts = [
|
||||
cfg.IntOpt('rpc_thread_pool_size',
|
||||
default=64,
|
||||
help='Size of RPC thread pool.'),
|
||||
]
|
||||
|
||||
|
||||
class PooledExecutor(base.ExecutorBase):
|
||||
"""A message executor which integrates with threads.
|
||||
|
||||
A message process that polls for messages from a dispatching thread and
|
||||
on reception of an incoming message places the message to be processed in
|
||||
a thread pool to be executed at a later time.
|
||||
"""
|
||||
|
||||
# NOTE(harlowja): if eventlet is being used and the thread module is monkey
|
||||
# patched this should/is supposed to work the same as the eventlet based
|
||||
# executor.
|
||||
|
||||
# NOTE(harlowja): Make it somewhat easy to change this via
|
||||
# inheritance (since there does exist other executor types that could be
|
||||
# used/tried here).
|
||||
_executor_cls = futures.ThreadPoolExecutor
|
||||
_event_cls = threading.Event
|
||||
_lock_cls = threading.Lock
|
||||
_thread_cls = threading.Thread
|
||||
|
||||
def __init__(self, conf, listener, dispatcher):
|
||||
super(PooledExecutor, self).__init__(conf, listener, dispatcher)
|
||||
self.conf.register_opts(_pool_opts)
|
||||
self._poller = None
|
||||
self._executor = None
|
||||
self._tombstone = self._event_cls()
|
||||
self._incomplete = collections.deque()
|
||||
self._mutator = self._lock_cls()
|
||||
|
||||
@excutils.forever_retry_uncaught_exceptions
|
||||
def _runner(self):
|
||||
while not self._tombstone.is_set():
|
||||
incoming = self.listener.poll()
|
||||
if incoming is None:
|
||||
continue
|
||||
callback = self.dispatcher(incoming, self._executor_callback)
|
||||
try:
|
||||
fut = self._executor.submit(callback.run)
|
||||
except RuntimeError:
|
||||
# This is triggered when the executor has been shutdown...
|
||||
#
|
||||
# TODO(harlowja): should we put whatever we pulled off back
|
||||
# since when this is thrown it means the executor has been
|
||||
# shutdown already??
|
||||
callback.done()
|
||||
return
|
||||
else:
|
||||
with self._mutator:
|
||||
self._incomplete.append(fut)
|
||||
# Run the other post processing of the callback when done...
|
||||
fut.add_done_callback(lambda f: callback.done())
|
||||
|
||||
def start(self):
|
||||
if self._executor is None:
|
||||
self._executor = self._executor_cls(self.conf.rpc_thread_pool_size)
|
||||
self._tombstone.clear()
|
||||
if self._poller is None or not self._poller.is_alive():
|
||||
self._poller = self._thread_cls(target=self._runner)
|
||||
self._poller.daemon = True
|
||||
self._poller.start()
|
||||
|
||||
def stop(self):
|
||||
if self._executor is not None:
|
||||
self._executor.shutdown(wait=False)
|
||||
self._tombstone.set()
|
||||
self.listener.stop()
|
||||
|
||||
def wait(self):
|
||||
# TODO(harlowja): this method really needs a timeout.
|
||||
if self._poller is not None:
|
||||
self._tombstone.wait()
|
||||
self._poller.join()
|
||||
self._poller = None
|
||||
if self._executor is not None:
|
||||
with self._mutator:
|
||||
incomplete_fs = list(self._incomplete)
|
||||
self._incomplete.clear()
|
||||
if incomplete_fs:
|
||||
futures.wait(incomplete_fs, return_when=futures.ALL_COMPLETED)
|
||||
self._executor = None
|
@ -14,118 +14,16 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import functools
|
||||
import sys
|
||||
import threading
|
||||
|
||||
from concurrent import futures
|
||||
from oslo_utils import excutils
|
||||
import six
|
||||
|
||||
from oslo_messaging._executors import base
|
||||
from oslo_messaging._executors import impl_pooledexecutor
|
||||
|
||||
|
||||
class ThreadExecutor(base.PooledExecutorBase):
|
||||
class ThreadExecutor(impl_pooledexecutor.PooledExecutor):
|
||||
"""A message executor which integrates with threads.
|
||||
|
||||
A message process that polls for messages from a dispatching thread and
|
||||
on reception of an incoming message places the message to be processed in
|
||||
a thread pool to be executed at a later time.
|
||||
"""
|
||||
|
||||
# NOTE(harlowja): if eventlet is being used and the thread module is monkey
|
||||
# patched this should/is supposed to work the same as the eventlet based
|
||||
# executor.
|
||||
|
||||
# NOTE(harlowja): Make it somewhat easy to change this via
|
||||
# inheritance (since there does exist other executor types that could be
|
||||
# used/tried here).
|
||||
_executor_cls = futures.ThreadPoolExecutor
|
||||
|
||||
def __init__(self, conf, listener, dispatcher):
|
||||
super(ThreadExecutor, self).__init__(conf, listener, dispatcher)
|
||||
self._poller = None
|
||||
self._executor = None
|
||||
self._tombstone = threading.Event()
|
||||
self._incomplete = collections.deque()
|
||||
self._mutator = threading.Lock()
|
||||
|
||||
def _completer(self, exit_method, fut):
|
||||
"""Completes futures."""
|
||||
try:
|
||||
exc = fut.exception()
|
||||
if exc is not None:
|
||||
exc_type = type(exc)
|
||||
# Not available on < 3.x due to this being an added feature
|
||||
# of pep-3134 (exception chaining and embedded tracebacks).
|
||||
if six.PY3:
|
||||
exc_tb = exc.__traceback__
|
||||
else:
|
||||
exc_tb = None
|
||||
if not exit_method(exc_type, exc, exc_tb):
|
||||
six.reraise(exc_type, exc, tb=exc_tb)
|
||||
else:
|
||||
exit_method(None, None, None)
|
||||
finally:
|
||||
with self._mutator:
|
||||
try:
|
||||
self._incomplete.remove(fut)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
@excutils.forever_retry_uncaught_exceptions
|
||||
def _runner(self):
|
||||
while not self._tombstone.is_set():
|
||||
incoming = self.listener.poll()
|
||||
if incoming is None:
|
||||
continue
|
||||
# This is hacky, needs to be fixed....
|
||||
context = self.dispatcher(incoming)
|
||||
enter_method = context.__enter__()
|
||||
exit_method = context.__exit__
|
||||
try:
|
||||
fut = self._executor.submit(enter_method)
|
||||
except RuntimeError:
|
||||
# This is triggered when the executor has been shutdown...
|
||||
#
|
||||
# TODO(harlowja): should we put whatever we pulled off back
|
||||
# since when this is thrown it means the executor has been
|
||||
# shutdown already??
|
||||
exit_method(*sys.exc_info())
|
||||
return
|
||||
else:
|
||||
with self._mutator:
|
||||
self._incomplete.append(fut)
|
||||
# Run the other half (__exit__) when done...
|
||||
fut.add_done_callback(functools.partial(self._completer,
|
||||
exit_method))
|
||||
|
||||
def start(self):
|
||||
if self._executor is None:
|
||||
self._executor = self._executor_cls(self.conf.rpc_thread_pool_size)
|
||||
self._tombstone.clear()
|
||||
if self._poller is None or not self._poller.is_alive():
|
||||
self._poller = threading.Thread(target=self._runner)
|
||||
self._poller.daemon = True
|
||||
self._poller.start()
|
||||
|
||||
def stop(self):
|
||||
if self._executor is not None:
|
||||
self._executor.shutdown(wait=False)
|
||||
self._tombstone.set()
|
||||
self.listener.stop()
|
||||
|
||||
def wait(self):
|
||||
# TODO(harlowja): this method really needs a timeout.
|
||||
if self._poller is not None:
|
||||
self._tombstone.wait()
|
||||
self._poller.join()
|
||||
self._poller = None
|
||||
if self._executor is not None:
|
||||
with self._mutator:
|
||||
incomplete_fs = list(self._incomplete)
|
||||
self._incomplete.clear()
|
||||
if incomplete_fs:
|
||||
futures.wait(incomplete_fs, return_when=futures.ALL_COMPLETED)
|
||||
self._executor = None
|
||||
|
@ -13,6 +13,10 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def version_is_compatible(imp_version, version):
|
||||
"""Determine whether versions are compatible.
|
||||
@ -39,3 +43,54 @@ def version_is_compatible(imp_version, version):
|
||||
int(rev) > int(imp_rev)): # Revision
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class DispatcherExecutorContext(object):
|
||||
"""Dispatcher executor context helper
|
||||
|
||||
A dispatcher can have work to do before and after the dispatch of the
|
||||
request in the main server thread while the dispatcher itself can be
|
||||
done in its own thread.
|
||||
|
||||
The executor can use the helper like this:
|
||||
|
||||
callback = dispatcher(incoming)
|
||||
callback.prepare()
|
||||
thread = MyWhateverThread()
|
||||
thread.on_done(callback.done)
|
||||
thread.run(callback.run)
|
||||
|
||||
"""
|
||||
def __init__(self, incoming, dispatch, executor_callback=None,
|
||||
post=None):
|
||||
self._result = None
|
||||
self._incoming = incoming
|
||||
self._dispatch = dispatch
|
||||
self._post = post
|
||||
self._executor_callback = executor_callback
|
||||
|
||||
def run(self):
|
||||
"""The incoming message dispath itself
|
||||
|
||||
Can be run in an other thread/greenlet/corotine if the executor is
|
||||
able to do it.
|
||||
"""
|
||||
try:
|
||||
self._result = self._dispatch(self._incoming,
|
||||
self._executor_callback)
|
||||
except Exception:
|
||||
msg = 'The dispatcher method must catches all exceptions'
|
||||
LOG.exception(msg)
|
||||
raise RuntimeError(msg)
|
||||
|
||||
def done(self):
|
||||
"""Callback after the incoming message have been dispathed
|
||||
|
||||
Should be runned in the main executor thread/greenlet/corotine
|
||||
"""
|
||||
# FIXME(sileht): this is not currently true, this works only because
|
||||
# the driver connection used for polling write on the wire only to
|
||||
# ack/requeue message, but what if one day, the driver do something
|
||||
# else
|
||||
if self._post is not None:
|
||||
self._post(self._incoming, self._result)
|
||||
|
@ -14,11 +14,11 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import itertools
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from oslo_messaging import _utils as utils
|
||||
from oslo_messaging import localcontext
|
||||
from oslo_messaging import serializer as msg_serializer
|
||||
|
||||
@ -68,14 +68,15 @@ class NotificationDispatcher(object):
|
||||
return transport._listen_for_notifications(self._targets_priorities,
|
||||
pool=self.pool)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def __call__(self, incoming, executor_callback=None):
|
||||
result_wrapper = []
|
||||
return utils.DispatcherExecutorContext(
|
||||
incoming, self._dispatch_and_handle_error,
|
||||
executor_callback=executor_callback,
|
||||
post=self._post_dispatch)
|
||||
|
||||
yield lambda: result_wrapper.append(
|
||||
self._dispatch_and_handle_error(incoming, executor_callback))
|
||||
|
||||
if result_wrapper[0] == NotificationResult.HANDLED:
|
||||
@staticmethod
|
||||
def _post_dispatch(incoming, result):
|
||||
if result == NotificationResult.HANDLED:
|
||||
incoming.acknowledge()
|
||||
else:
|
||||
incoming.requeue()
|
||||
|
@ -29,7 +29,7 @@ from oslo_messaging._drivers import matchmaker
|
||||
from oslo_messaging._drivers import matchmaker_redis
|
||||
from oslo_messaging._drivers import matchmaker_ring
|
||||
from oslo_messaging._drivers.protocols.amqp import opts as amqp_opts
|
||||
from oslo_messaging._executors import base
|
||||
from oslo_messaging._executors import impl_pooledexecutor
|
||||
from oslo_messaging.notify import notifier
|
||||
from oslo_messaging.rpc import client
|
||||
from oslo_messaging import transport
|
||||
@ -38,7 +38,7 @@ _global_opt_lists = [
|
||||
drivers_base.base_opts,
|
||||
impl_zmq.zmq_opts,
|
||||
matchmaker.matchmaker_opts,
|
||||
base._pool_opts,
|
||||
impl_pooledexecutor._pool_opts,
|
||||
notifier._notifier_opts,
|
||||
client._client_opts,
|
||||
transport._transport_opts,
|
||||
|
@ -24,7 +24,6 @@ __all__ = [
|
||||
'ExpectedException',
|
||||
]
|
||||
|
||||
import contextlib
|
||||
import logging
|
||||
import sys
|
||||
|
||||
@ -130,10 +129,11 @@ class RPCDispatcher(object):
|
||||
result = func(ctxt, **new_args)
|
||||
return self.serializer.serialize_entity(ctxt, result)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def __call__(self, incoming, executor_callback=None):
|
||||
incoming.acknowledge()
|
||||
yield lambda: self._dispatch_and_reply(incoming, executor_callback)
|
||||
return utils.DispatcherExecutorContext(
|
||||
incoming, self._dispatch_and_reply,
|
||||
executor_callback=executor_callback)
|
||||
|
||||
def _dispatch_and_reply(self, incoming, executor_callback):
|
||||
try:
|
||||
|
@ -14,7 +14,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import threading
|
||||
|
||||
# eventlet 0.16 with monkey patching does not work yet on Python 3,
|
||||
@ -28,7 +27,6 @@ try:
|
||||
except ImportError:
|
||||
eventlet = None
|
||||
import testscenarios
|
||||
import testtools
|
||||
try:
|
||||
import trollius
|
||||
except ImportError:
|
||||
@ -45,6 +43,7 @@ try:
|
||||
except ImportError:
|
||||
impl_eventlet = None
|
||||
from oslo_messaging._executors import impl_thread
|
||||
from oslo_messaging import _utils as utils
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
from six.moves import mock
|
||||
|
||||
@ -106,7 +105,6 @@ class TestExecutor(test_utils.BaseTestCase):
|
||||
|
||||
@trollius.coroutine
|
||||
def simple_coroutine(value):
|
||||
yield None
|
||||
raise trollius.Return(value)
|
||||
|
||||
endpoint = mock.MagicMock(return_value=simple_coroutine('result'))
|
||||
@ -123,30 +121,29 @@ class TestExecutor(test_utils.BaseTestCase):
|
||||
self.endpoint = endpoint
|
||||
self.result = "not set"
|
||||
|
||||
@contextlib.contextmanager
|
||||
def __call__(self, incoming, executor_callback=None):
|
||||
if executor_callback is not None:
|
||||
def callback():
|
||||
result = executor_callback(self.endpoint,
|
||||
incoming.ctxt,
|
||||
incoming.message)
|
||||
self.result = result
|
||||
return result
|
||||
yield callback
|
||||
event.send()
|
||||
def callback(self, incoming, executor_callback):
|
||||
if executor_callback is None:
|
||||
result = self.endpoint(incoming.ctxt,
|
||||
incoming.message)
|
||||
else:
|
||||
def callback():
|
||||
result = self.endpoint(incoming.ctxt, incoming.message)
|
||||
self.result = result
|
||||
return result
|
||||
yield callback
|
||||
result = executor_callback(self.endpoint,
|
||||
incoming.ctxt,
|
||||
incoming.message)
|
||||
if is_aioeventlet:
|
||||
event.send()
|
||||
self.result = result
|
||||
return result
|
||||
|
||||
listener = mock.Mock(spec=['poll'])
|
||||
def __call__(self, incoming, executor_callback=None):
|
||||
return utils.DispatcherExecutorContext(incoming,
|
||||
self.callback,
|
||||
executor_callback)
|
||||
|
||||
listener = mock.Mock(spec=['poll', 'stop'])
|
||||
dispatcher = Dispatcher(endpoint)
|
||||
executor = self.executor(self.conf, listener, dispatcher)
|
||||
|
||||
incoming_message = mock.MagicMock(ctxt={},
|
||||
message={'payload': 'data'})
|
||||
incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'})
|
||||
|
||||
def fake_poll(timeout=None):
|
||||
if is_aioeventlet:
|
||||
@ -167,60 +164,3 @@ class TestExecutor(test_utils.BaseTestCase):
|
||||
self.assertEqual(dispatcher.result, 'result')
|
||||
|
||||
TestExecutor.generate_scenarios()
|
||||
|
||||
|
||||
class ExceptedException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class EventletContextManagerSpawnTest(test_utils.BaseTestCase):
|
||||
@testtools.skipIf(impl_eventlet is None, "Eventlet not available")
|
||||
def setUp(self):
|
||||
super(EventletContextManagerSpawnTest, self).setUp()
|
||||
self.before = mock.Mock()
|
||||
self.callback = mock.Mock()
|
||||
self.after = mock.Mock()
|
||||
self.exception_call = mock.Mock()
|
||||
|
||||
@contextlib.contextmanager
|
||||
def context_mgr():
|
||||
self.before()
|
||||
try:
|
||||
yield lambda: self.callback()
|
||||
except ExceptedException:
|
||||
self.exception_call()
|
||||
self.after()
|
||||
|
||||
self.mgr = context_mgr()
|
||||
|
||||
def test_normal_run(self):
|
||||
thread = impl_eventlet.spawn_with(self.mgr, pool=eventlet)
|
||||
thread.wait()
|
||||
self.assertEqual(1, self.before.call_count)
|
||||
self.assertEqual(1, self.callback.call_count)
|
||||
self.assertEqual(1, self.after.call_count)
|
||||
self.assertEqual(0, self.exception_call.call_count)
|
||||
|
||||
def test_excepted_exception(self):
|
||||
self.callback.side_effect = ExceptedException
|
||||
thread = impl_eventlet.spawn_with(self.mgr, pool=eventlet)
|
||||
try:
|
||||
thread.wait()
|
||||
except ExceptedException:
|
||||
pass
|
||||
self.assertEqual(1, self.before.call_count)
|
||||
self.assertEqual(1, self.callback.call_count)
|
||||
self.assertEqual(1, self.after.call_count)
|
||||
self.assertEqual(1, self.exception_call.call_count)
|
||||
|
||||
def test_unexcepted_exception(self):
|
||||
self.callback.side_effect = Exception
|
||||
thread = impl_eventlet.spawn_with(self.mgr, pool=eventlet)
|
||||
try:
|
||||
thread.wait()
|
||||
except Exception:
|
||||
pass
|
||||
self.assertEqual(1, self.before.call_count)
|
||||
self.assertEqual(1, self.callback.call_count)
|
||||
self.assertEqual(0, self.after.call_count)
|
||||
self.assertEqual(0, self.exception_call.call_count)
|
||||
|
@ -107,8 +107,9 @@ class TestDispatcher(test_utils.BaseTestCase):
|
||||
sorted(dispatcher._targets_priorities))
|
||||
|
||||
incoming = mock.Mock(ctxt={}, message=msg)
|
||||
with dispatcher(incoming) as callback:
|
||||
callback()
|
||||
callback = dispatcher(incoming)
|
||||
callback.run()
|
||||
callback.done()
|
||||
|
||||
# check endpoint callbacks are called or not
|
||||
for i, endpoint_methods in enumerate(self.endpoints):
|
||||
@ -143,8 +144,9 @@ class TestDispatcher(test_utils.BaseTestCase):
|
||||
msg['priority'] = 'what???'
|
||||
dispatcher = notify_dispatcher.NotificationDispatcher(
|
||||
[mock.Mock()], [mock.Mock()], None, allow_requeue=True, pool=None)
|
||||
with dispatcher(mock.Mock(ctxt={}, message=msg)) as callback:
|
||||
callback()
|
||||
callback = dispatcher(mock.Mock(ctxt={}, message=msg))
|
||||
callback.run()
|
||||
callback.done()
|
||||
mylog.warning.assert_called_once_with('Unknown priority "%s"',
|
||||
'what???')
|
||||
|
||||
@ -244,8 +246,9 @@ class TestDispatcherFilter(test_utils.BaseTestCase):
|
||||
'timestamp': '2014-03-03 18:21:04.369234',
|
||||
'message_id': '99863dda-97f0-443a-a0c1-6ed317b7fd45'}
|
||||
incoming = mock.Mock(ctxt=self.context, message=message)
|
||||
with dispatcher(incoming) as callback:
|
||||
callback()
|
||||
callback = dispatcher(incoming)
|
||||
callback.run()
|
||||
callback.done()
|
||||
|
||||
if self.match:
|
||||
self.assertEqual(1, endpoint.info.call_count)
|
||||
|
@ -133,8 +133,9 @@ class TestDispatcher(test_utils.BaseTestCase):
|
||||
incoming = mock.Mock(ctxt=self.ctxt, message=self.msg)
|
||||
incoming.reply.side_effect = check_reply
|
||||
|
||||
with dispatcher(incoming) as callback:
|
||||
callback()
|
||||
callback = dispatcher(incoming)
|
||||
callback.run()
|
||||
callback.done()
|
||||
|
||||
for n, endpoint in enumerate(endpoints):
|
||||
for method_name in ['foo', 'bar']:
|
||||
|
@ -4,6 +4,7 @@
|
||||
|
||||
pbr<2.0,>=0.11
|
||||
|
||||
futurist>=0.1.1 # Apache-2.0
|
||||
oslo.config>=1.11.0 # Apache-2.0
|
||||
oslo.context>=0.2.0 # Apache-2.0
|
||||
oslo.log>=1.2.0 # Apache-2.0
|
||||
|
@ -107,8 +107,9 @@ class TestDispatcherScenario(test_utils.BaseTestCase):
|
||||
sorted(dispatcher._targets_priorities))
|
||||
|
||||
incoming = mock.Mock(ctxt={}, message=msg)
|
||||
with dispatcher(incoming) as callback:
|
||||
callback()
|
||||
callback = dispatcher(incoming)
|
||||
callback.run()
|
||||
callback.done()
|
||||
|
||||
# check endpoint callbacks are called or not
|
||||
for i, endpoint_methods in enumerate(self.endpoints):
|
||||
@ -146,8 +147,9 @@ class TestDispatcher(test_utils.BaseTestCase):
|
||||
msg['priority'] = 'what???'
|
||||
dispatcher = notify_dispatcher.NotificationDispatcher(
|
||||
[mock.Mock()], [mock.Mock()], None, allow_requeue=True, pool=None)
|
||||
with dispatcher(mock.Mock(ctxt={}, message=msg)) as callback:
|
||||
callback()
|
||||
callback = dispatcher(mock.Mock(ctxt={}, message=msg))
|
||||
callback.run()
|
||||
callback.done()
|
||||
mylog.warning.assert_called_once_with('Unknown priority "%s"',
|
||||
'what???')
|
||||
|
||||
@ -165,7 +167,8 @@ class TestDispatcher(test_utils.BaseTestCase):
|
||||
|
||||
incoming = mock.Mock(ctxt={}, message=msg)
|
||||
executor_callback = mock.Mock()
|
||||
with dispatcher(incoming, executor_callback) as callback:
|
||||
callback()
|
||||
callback = dispatcher(incoming, executor_callback)
|
||||
callback.run()
|
||||
callback.done()
|
||||
self.assertTrue(executor_callback.called)
|
||||
self.assertEqual(executor_callback.call_args[0][0], endpoint_method)
|
||||
|
@ -120,8 +120,9 @@ class TestDispatcher(test_utils.BaseTestCase):
|
||||
incoming = mock.Mock(ctxt=self.ctxt, message=self.msg)
|
||||
incoming.reply.side_effect = check_reply
|
||||
|
||||
with dispatcher(incoming) as callback:
|
||||
callback()
|
||||
callback = dispatcher(incoming)
|
||||
callback.run()
|
||||
callback.done()
|
||||
|
||||
for n, endpoint in enumerate(endpoints):
|
||||
for method_name in ['foo', 'bar']:
|
||||
|
Loading…
x
Reference in New Issue
Block a user