Merge "deprecated blocking executor"
This commit is contained in:
commit
aaaf58269c
oslo_messaging
@ -63,10 +63,7 @@ The message is acknowledged only if all endpoints either return
|
||||
oslo_messaging.NotificationResult.HANDLED or None.
|
||||
|
||||
Each notification listener is associated with an executor which controls how
|
||||
incoming notification messages will be received and dispatched. By default, the
|
||||
most simple executor is used - the blocking executor. This executor processes
|
||||
inbound notifications on the server's thread, blocking it from processing
|
||||
additional notifications until it finishes with the current one. Refer to the
|
||||
incoming notification messages will be received and dispatched. Refer to the
|
||||
Executor documentation for descriptions of the other types of executors.
|
||||
|
||||
*Note:* If the "eventlet" executor is used, the threading and time library need
|
||||
@ -213,8 +210,7 @@ def get_notification_listener(transport, targets, endpoints,
|
||||
"""Construct a notification listener
|
||||
|
||||
The executor parameter controls how incoming messages will be received and
|
||||
dispatched. By default, the most simple executor is used - the blocking
|
||||
executor.
|
||||
dispatched.
|
||||
|
||||
If the eventlet executor is used, the threading and time library need to be
|
||||
monkeypatched.
|
||||
@ -226,7 +222,7 @@ def get_notification_listener(transport, targets, endpoints,
|
||||
:param endpoints: a list of endpoint objects
|
||||
:type endpoints: list
|
||||
:param executor: name of message executor - available values are
|
||||
'eventlet', 'blocking' and 'threading'
|
||||
'eventlet' and 'threading'
|
||||
:type executor: str
|
||||
:param serializer: an optional entity serializer
|
||||
:type serializer: Serializer
|
||||
@ -249,8 +245,7 @@ def get_batch_notification_listener(transport, targets, endpoints,
|
||||
"""Construct a batch notification listener
|
||||
|
||||
The executor parameter controls how incoming messages will be received and
|
||||
dispatched. By default, the most simple executor is used - the blocking
|
||||
executor.
|
||||
dispatched.
|
||||
|
||||
If the eventlet executor is used, the threading and time library need to be
|
||||
monkeypatched.
|
||||
@ -262,7 +257,7 @@ def get_batch_notification_listener(transport, targets, endpoints,
|
||||
:param endpoints: a list of endpoint objects
|
||||
:type endpoints: list
|
||||
:param executor: name of message executor - available values are
|
||||
'eventlet', 'blocking' and 'threading'
|
||||
'eventlet' and 'threading'
|
||||
:type executor: str
|
||||
:param serializer: an optional entity serializer
|
||||
:type serializer: Serializer
|
||||
|
@ -47,12 +47,8 @@ Server will send the returned value back to the requesting client via the
|
||||
transport.
|
||||
|
||||
The executor parameter controls how incoming messages will be received and
|
||||
dispatched. By default, the most simple executor is used - the blocking
|
||||
executor. This executor processes inbound RPC requests on the server's thread,
|
||||
blocking it from processing additional requests until it finishes with the
|
||||
current request. This includes time spent sending the reply message to the
|
||||
transport if the method returns a result. Refer to the Executor documentation
|
||||
for descriptions of the other types of executors.
|
||||
dispatched. Refer to the Executor documentation for descriptions of the types
|
||||
of executors.
|
||||
|
||||
*Note:* If the "eventlet" executor is used, the threading and time library need
|
||||
to be monkeypatched.
|
||||
@ -105,7 +101,7 @@ A simple example of an RPC server with multiple endpoints might be::
|
||||
TestEndpoint(),
|
||||
]
|
||||
server = oslo_messaging.get_rpc_server(transport, target, endpoints,
|
||||
executor='blocking')
|
||||
executor='eventlet')
|
||||
try:
|
||||
server.start()
|
||||
while True:
|
||||
@ -199,7 +195,7 @@ def get_rpc_server(transport, target, endpoints,
|
||||
:param endpoints: a list of endpoint objects
|
||||
:type endpoints: list
|
||||
:param executor: name of message executor - available values are
|
||||
'eventlet', 'blocking' and 'threading'
|
||||
'eventlet' and 'threading'
|
||||
:type executor: str
|
||||
:param serializer: an optional entity serializer
|
||||
:type serializer: Serializer
|
||||
|
@ -30,6 +30,7 @@ import logging
|
||||
import threading
|
||||
import traceback
|
||||
|
||||
import debtcollector
|
||||
from oslo_config import cfg
|
||||
from oslo_service import service
|
||||
from oslo_utils import eventletutils
|
||||
@ -38,7 +39,7 @@ import six
|
||||
from stevedore import driver
|
||||
|
||||
from oslo_messaging._drivers import base as driver_base
|
||||
from oslo_messaging._i18n import _LW, _LI
|
||||
from oslo_messaging._i18n import _LW
|
||||
from oslo_messaging import exceptions
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -323,7 +324,7 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
|
||||
incoming request
|
||||
:type dispatcher: DispatcherBase
|
||||
:param executor: name of message executor - available values are
|
||||
'eventlet', 'blocking' and 'threading'
|
||||
'eventlet' and 'threading'
|
||||
:type executor: str
|
||||
"""
|
||||
self.conf = transport.conf
|
||||
@ -333,13 +334,11 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
|
||||
self.dispatcher = dispatcher
|
||||
self.executor_type = executor
|
||||
if self.executor_type == 'blocking':
|
||||
# NOTE(sileht): We keep blocking as default to not enforce the
|
||||
# application to use threading or eventlet. Because application
|
||||
# have to be preprepared accordingly for each one (monkeypatching,
|
||||
# threadsafe, ...)
|
||||
LOG.info(_LI("blocking executor handles only one message at "
|
||||
"once. threading or eventlet executor is "
|
||||
"recommended."))
|
||||
debtcollector.deprecate(
|
||||
'blocking executor is deprecated. Executor default will be '
|
||||
'removed. Use explicitly threading or eventlet instead',
|
||||
version="pike", removal_version="rocky",
|
||||
category=FutureWarning)
|
||||
elif self.executor_type == "eventlet":
|
||||
eventletutils.warn_eventlet_not_patched(
|
||||
expected_patched_modules=['thread'],
|
||||
|
@ -132,7 +132,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
||||
super(TestNotifyListener, self).setUp(conf=cfg.ConfigOpts())
|
||||
ListenerSetupMixin.setUp(self)
|
||||
|
||||
def test_constructor(self):
|
||||
@mock.patch('debtcollector.deprecate')
|
||||
def test_constructor(self, deprecate):
|
||||
transport = msg_notifier.get_notification_transport(
|
||||
self.conf, url='fake:')
|
||||
target = oslo_messaging.Target(topic='foo')
|
||||
@ -147,6 +148,11 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
||||
dispatcher.NotificationDispatcher)
|
||||
self.assertIs(listener.dispatcher.endpoints, endpoints)
|
||||
self.assertEqual('blocking', listener.executor_type)
|
||||
deprecate.assert_called_once_with(
|
||||
'blocking executor is deprecated. Executor default will be '
|
||||
'removed. Use explicitly threading or eventlet instead',
|
||||
removal_version='rocky', version='pike',
|
||||
category=FutureWarning)
|
||||
|
||||
def test_no_target_topic(self):
|
||||
transport = msg_notifier.get_notification_transport(
|
||||
|
@ -112,40 +112,52 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
|
||||
def setUp(self):
|
||||
super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts())
|
||||
|
||||
def test_constructor(self):
|
||||
@mock.patch('warnings.warn')
|
||||
def test_constructor(self, warn):
|
||||
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
||||
target = oslo_messaging.Target(topic='foo', server='bar')
|
||||
endpoints = [object()]
|
||||
serializer = object()
|
||||
access_policy = dispatcher.DefaultRPCAccessPolicy
|
||||
|
||||
with warnings.catch_warnings(record=True) as capture:
|
||||
warnings.simplefilter("always", FutureWarning)
|
||||
server = oslo_messaging.get_rpc_server(transport,
|
||||
target,
|
||||
endpoints,
|
||||
serializer=serializer,
|
||||
access_policy=access_policy)
|
||||
self.assertEqual(0, len(capture))
|
||||
warnings.simplefilter("always", FutureWarning)
|
||||
server = oslo_messaging.get_rpc_server(transport,
|
||||
target,
|
||||
endpoints,
|
||||
serializer=serializer,
|
||||
access_policy=access_policy)
|
||||
self.assertIs(server.conf, self.conf)
|
||||
self.assertIs(server.transport, transport)
|
||||
self.assertIsInstance(server.dispatcher, oslo_messaging.RPCDispatcher)
|
||||
self.assertIs(server.dispatcher.endpoints, endpoints)
|
||||
self.assertIs(server.dispatcher.serializer, serializer)
|
||||
self.assertEqual('blocking', server.executor_type)
|
||||
self.assertEqual([
|
||||
mock.call("blocking executor is deprecated. Executor default will "
|
||||
"be removed. Use explicitly threading or eventlet "
|
||||
"instead in version 'pike' and will be removed in "
|
||||
"version 'rocky'",
|
||||
category=FutureWarning, stacklevel=3)
|
||||
], warn.mock_calls)
|
||||
|
||||
def test_constructor_without_explicit_RPCAccessPolicy(self):
|
||||
@mock.patch('warnings.warn')
|
||||
def test_constructor_without_explicit_RPCAccessPolicy(self, warn):
|
||||
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
||||
target = oslo_messaging.Target(topic='foo', server='bar')
|
||||
endpoints = [object()]
|
||||
serializer = object()
|
||||
with warnings.catch_warnings(record=True) as capture:
|
||||
warnings.simplefilter("always", FutureWarning)
|
||||
oslo_messaging.get_rpc_server(transport, target,
|
||||
endpoints, serializer=serializer)
|
||||
self.assertEqual(1, len(capture))
|
||||
w = capture[0]
|
||||
self.assertEqual(FutureWarning, w.category)
|
||||
|
||||
warnings.simplefilter("always", FutureWarning)
|
||||
oslo_messaging.get_rpc_server(transport, target,
|
||||
endpoints, serializer=serializer)
|
||||
self.assertEqual([
|
||||
mock.call(mock.ANY, category=FutureWarning, stacklevel=3),
|
||||
mock.call("blocking executor is deprecated. Executor default will "
|
||||
"be removed. Use explicitly threading or eventlet "
|
||||
"instead in version 'pike' and will be removed in "
|
||||
"version 'rocky'",
|
||||
category=FutureWarning, stacklevel=3)
|
||||
], warn.mock_calls)
|
||||
|
||||
def test_server_wait_method(self):
|
||||
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
||||
|
Loading…
x
Reference in New Issue
Block a user