deprecated blocking executor
Using blocking executor is not recommended for application. But it looks like some use it just because it's the default and are not aware their should change it despite of documentation and logging message. Choosing the application threading model is an important step of an application. This change deprecates it, in the future we will just make executor mandatory. This will ensure that application make a choice. Also this will reduce headache of oslo.messaging developers to make the driver code working in a sync and async. And to finish test coverage of blocking executor is 0%... This rework some tests to remove logging.captureWarnings() that can catch unwanted warning of other tests. Tests mocks warning instead. Related-bug: #694728 Change-Id: Ic67164d12e7a9bed76d6e64ca2ced12e3984ff5f
This commit is contained in:
parent
8b44bfd538
commit
2566be199a
oslo_messaging
@ -63,10 +63,7 @@ The message is acknowledged only if all endpoints either return
|
|||||||
oslo_messaging.NotificationResult.HANDLED or None.
|
oslo_messaging.NotificationResult.HANDLED or None.
|
||||||
|
|
||||||
Each notification listener is associated with an executor which controls how
|
Each notification listener is associated with an executor which controls how
|
||||||
incoming notification messages will be received and dispatched. By default, the
|
incoming notification messages will be received and dispatched. Refer to the
|
||||||
most simple executor is used - the blocking executor. This executor processes
|
|
||||||
inbound notifications on the server's thread, blocking it from processing
|
|
||||||
additional notifications until it finishes with the current one. Refer to the
|
|
||||||
Executor documentation for descriptions of the other types of executors.
|
Executor documentation for descriptions of the other types of executors.
|
||||||
|
|
||||||
*Note:* If the "eventlet" executor is used, the threading and time library need
|
*Note:* If the "eventlet" executor is used, the threading and time library need
|
||||||
@ -213,8 +210,7 @@ def get_notification_listener(transport, targets, endpoints,
|
|||||||
"""Construct a notification listener
|
"""Construct a notification listener
|
||||||
|
|
||||||
The executor parameter controls how incoming messages will be received and
|
The executor parameter controls how incoming messages will be received and
|
||||||
dispatched. By default, the most simple executor is used - the blocking
|
dispatched.
|
||||||
executor.
|
|
||||||
|
|
||||||
If the eventlet executor is used, the threading and time library need to be
|
If the eventlet executor is used, the threading and time library need to be
|
||||||
monkeypatched.
|
monkeypatched.
|
||||||
@ -226,7 +222,7 @@ def get_notification_listener(transport, targets, endpoints,
|
|||||||
:param endpoints: a list of endpoint objects
|
:param endpoints: a list of endpoint objects
|
||||||
:type endpoints: list
|
:type endpoints: list
|
||||||
:param executor: name of message executor - available values are
|
:param executor: name of message executor - available values are
|
||||||
'eventlet', 'blocking' and 'threading'
|
'eventlet' and 'threading'
|
||||||
:type executor: str
|
:type executor: str
|
||||||
:param serializer: an optional entity serializer
|
:param serializer: an optional entity serializer
|
||||||
:type serializer: Serializer
|
:type serializer: Serializer
|
||||||
@ -249,8 +245,7 @@ def get_batch_notification_listener(transport, targets, endpoints,
|
|||||||
"""Construct a batch notification listener
|
"""Construct a batch notification listener
|
||||||
|
|
||||||
The executor parameter controls how incoming messages will be received and
|
The executor parameter controls how incoming messages will be received and
|
||||||
dispatched. By default, the most simple executor is used - the blocking
|
dispatched.
|
||||||
executor.
|
|
||||||
|
|
||||||
If the eventlet executor is used, the threading and time library need to be
|
If the eventlet executor is used, the threading and time library need to be
|
||||||
monkeypatched.
|
monkeypatched.
|
||||||
@ -262,7 +257,7 @@ def get_batch_notification_listener(transport, targets, endpoints,
|
|||||||
:param endpoints: a list of endpoint objects
|
:param endpoints: a list of endpoint objects
|
||||||
:type endpoints: list
|
:type endpoints: list
|
||||||
:param executor: name of message executor - available values are
|
:param executor: name of message executor - available values are
|
||||||
'eventlet', 'blocking' and 'threading'
|
'eventlet' and 'threading'
|
||||||
:type executor: str
|
:type executor: str
|
||||||
:param serializer: an optional entity serializer
|
:param serializer: an optional entity serializer
|
||||||
:type serializer: Serializer
|
:type serializer: Serializer
|
||||||
|
@ -47,12 +47,8 @@ Server will send the returned value back to the requesting client via the
|
|||||||
transport.
|
transport.
|
||||||
|
|
||||||
The executor parameter controls how incoming messages will be received and
|
The executor parameter controls how incoming messages will be received and
|
||||||
dispatched. By default, the most simple executor is used - the blocking
|
dispatched. Refer to the Executor documentation for descriptions of the types
|
||||||
executor. This executor processes inbound RPC requests on the server's thread,
|
of executors.
|
||||||
blocking it from processing additional requests until it finishes with the
|
|
||||||
current request. This includes time spent sending the reply message to the
|
|
||||||
transport if the method returns a result. Refer to the Executor documentation
|
|
||||||
for descriptions of the other types of executors.
|
|
||||||
|
|
||||||
*Note:* If the "eventlet" executor is used, the threading and time library need
|
*Note:* If the "eventlet" executor is used, the threading and time library need
|
||||||
to be monkeypatched.
|
to be monkeypatched.
|
||||||
@ -105,7 +101,7 @@ A simple example of an RPC server with multiple endpoints might be::
|
|||||||
TestEndpoint(),
|
TestEndpoint(),
|
||||||
]
|
]
|
||||||
server = oslo_messaging.get_rpc_server(transport, target, endpoints,
|
server = oslo_messaging.get_rpc_server(transport, target, endpoints,
|
||||||
executor='blocking')
|
executor='eventlet')
|
||||||
try:
|
try:
|
||||||
server.start()
|
server.start()
|
||||||
while True:
|
while True:
|
||||||
@ -199,7 +195,7 @@ def get_rpc_server(transport, target, endpoints,
|
|||||||
:param endpoints: a list of endpoint objects
|
:param endpoints: a list of endpoint objects
|
||||||
:type endpoints: list
|
:type endpoints: list
|
||||||
:param executor: name of message executor - available values are
|
:param executor: name of message executor - available values are
|
||||||
'eventlet', 'blocking' and 'threading'
|
'eventlet' and 'threading'
|
||||||
:type executor: str
|
:type executor: str
|
||||||
:param serializer: an optional entity serializer
|
:param serializer: an optional entity serializer
|
||||||
:type serializer: Serializer
|
:type serializer: Serializer
|
||||||
|
@ -30,6 +30,7 @@ import logging
|
|||||||
import threading
|
import threading
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
|
import debtcollector
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_service import service
|
from oslo_service import service
|
||||||
from oslo_utils import eventletutils
|
from oslo_utils import eventletutils
|
||||||
@ -38,7 +39,7 @@ import six
|
|||||||
from stevedore import driver
|
from stevedore import driver
|
||||||
|
|
||||||
from oslo_messaging._drivers import base as driver_base
|
from oslo_messaging._drivers import base as driver_base
|
||||||
from oslo_messaging._i18n import _LW, _LI
|
from oslo_messaging._i18n import _LW
|
||||||
from oslo_messaging import exceptions
|
from oslo_messaging import exceptions
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -323,7 +324,7 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
|
|||||||
incoming request
|
incoming request
|
||||||
:type dispatcher: DispatcherBase
|
:type dispatcher: DispatcherBase
|
||||||
:param executor: name of message executor - available values are
|
:param executor: name of message executor - available values are
|
||||||
'eventlet', 'blocking' and 'threading'
|
'eventlet' and 'threading'
|
||||||
:type executor: str
|
:type executor: str
|
||||||
"""
|
"""
|
||||||
self.conf = transport.conf
|
self.conf = transport.conf
|
||||||
@ -333,13 +334,11 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
|
|||||||
self.dispatcher = dispatcher
|
self.dispatcher = dispatcher
|
||||||
self.executor_type = executor
|
self.executor_type = executor
|
||||||
if self.executor_type == 'blocking':
|
if self.executor_type == 'blocking':
|
||||||
# NOTE(sileht): We keep blocking as default to not enforce the
|
debtcollector.deprecate(
|
||||||
# application to use threading or eventlet. Because application
|
'blocking executor is deprecated. Executor default will be '
|
||||||
# have to be preprepared accordingly for each one (monkeypatching,
|
'removed. Use explicitly threading or eventlet instead',
|
||||||
# threadsafe, ...)
|
version="pike", removal_version="rocky",
|
||||||
LOG.info(_LI("blocking executor handles only one message at "
|
category=FutureWarning)
|
||||||
"once. threading or eventlet executor is "
|
|
||||||
"recommended."))
|
|
||||||
elif self.executor_type == "eventlet":
|
elif self.executor_type == "eventlet":
|
||||||
eventletutils.warn_eventlet_not_patched(
|
eventletutils.warn_eventlet_not_patched(
|
||||||
expected_patched_modules=['thread'],
|
expected_patched_modules=['thread'],
|
||||||
|
@ -132,7 +132,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
|||||||
super(TestNotifyListener, self).setUp(conf=cfg.ConfigOpts())
|
super(TestNotifyListener, self).setUp(conf=cfg.ConfigOpts())
|
||||||
ListenerSetupMixin.setUp(self)
|
ListenerSetupMixin.setUp(self)
|
||||||
|
|
||||||
def test_constructor(self):
|
@mock.patch('debtcollector.deprecate')
|
||||||
|
def test_constructor(self, deprecate):
|
||||||
transport = msg_notifier.get_notification_transport(
|
transport = msg_notifier.get_notification_transport(
|
||||||
self.conf, url='fake:')
|
self.conf, url='fake:')
|
||||||
target = oslo_messaging.Target(topic='foo')
|
target = oslo_messaging.Target(topic='foo')
|
||||||
@ -147,6 +148,11 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
|||||||
dispatcher.NotificationDispatcher)
|
dispatcher.NotificationDispatcher)
|
||||||
self.assertIs(listener.dispatcher.endpoints, endpoints)
|
self.assertIs(listener.dispatcher.endpoints, endpoints)
|
||||||
self.assertEqual('blocking', listener.executor_type)
|
self.assertEqual('blocking', listener.executor_type)
|
||||||
|
deprecate.assert_called_once_with(
|
||||||
|
'blocking executor is deprecated. Executor default will be '
|
||||||
|
'removed. Use explicitly threading or eventlet instead',
|
||||||
|
removal_version='rocky', version='pike',
|
||||||
|
category=FutureWarning)
|
||||||
|
|
||||||
def test_no_target_topic(self):
|
def test_no_target_topic(self):
|
||||||
transport = msg_notifier.get_notification_transport(
|
transport = msg_notifier.get_notification_transport(
|
||||||
|
@ -112,40 +112,52 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
|
|||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts())
|
super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts())
|
||||||
|
|
||||||
def test_constructor(self):
|
@mock.patch('warnings.warn')
|
||||||
|
def test_constructor(self, warn):
|
||||||
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
||||||
target = oslo_messaging.Target(topic='foo', server='bar')
|
target = oslo_messaging.Target(topic='foo', server='bar')
|
||||||
endpoints = [object()]
|
endpoints = [object()]
|
||||||
serializer = object()
|
serializer = object()
|
||||||
access_policy = dispatcher.DefaultRPCAccessPolicy
|
access_policy = dispatcher.DefaultRPCAccessPolicy
|
||||||
|
|
||||||
with warnings.catch_warnings(record=True) as capture:
|
warnings.simplefilter("always", FutureWarning)
|
||||||
warnings.simplefilter("always", FutureWarning)
|
server = oslo_messaging.get_rpc_server(transport,
|
||||||
server = oslo_messaging.get_rpc_server(transport,
|
target,
|
||||||
target,
|
endpoints,
|
||||||
endpoints,
|
serializer=serializer,
|
||||||
serializer=serializer,
|
access_policy=access_policy)
|
||||||
access_policy=access_policy)
|
|
||||||
self.assertEqual(0, len(capture))
|
|
||||||
self.assertIs(server.conf, self.conf)
|
self.assertIs(server.conf, self.conf)
|
||||||
self.assertIs(server.transport, transport)
|
self.assertIs(server.transport, transport)
|
||||||
self.assertIsInstance(server.dispatcher, oslo_messaging.RPCDispatcher)
|
self.assertIsInstance(server.dispatcher, oslo_messaging.RPCDispatcher)
|
||||||
self.assertIs(server.dispatcher.endpoints, endpoints)
|
self.assertIs(server.dispatcher.endpoints, endpoints)
|
||||||
self.assertIs(server.dispatcher.serializer, serializer)
|
self.assertIs(server.dispatcher.serializer, serializer)
|
||||||
self.assertEqual('blocking', server.executor_type)
|
self.assertEqual('blocking', server.executor_type)
|
||||||
|
self.assertEqual([
|
||||||
|
mock.call("blocking executor is deprecated. Executor default will "
|
||||||
|
"be removed. Use explicitly threading or eventlet "
|
||||||
|
"instead in version 'pike' and will be removed in "
|
||||||
|
"version 'rocky'",
|
||||||
|
category=FutureWarning, stacklevel=3)
|
||||||
|
], warn.mock_calls)
|
||||||
|
|
||||||
def test_constructor_without_explicit_RPCAccessPolicy(self):
|
@mock.patch('warnings.warn')
|
||||||
|
def test_constructor_without_explicit_RPCAccessPolicy(self, warn):
|
||||||
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
||||||
target = oslo_messaging.Target(topic='foo', server='bar')
|
target = oslo_messaging.Target(topic='foo', server='bar')
|
||||||
endpoints = [object()]
|
endpoints = [object()]
|
||||||
serializer = object()
|
serializer = object()
|
||||||
with warnings.catch_warnings(record=True) as capture:
|
|
||||||
warnings.simplefilter("always", FutureWarning)
|
warnings.simplefilter("always", FutureWarning)
|
||||||
oslo_messaging.get_rpc_server(transport, target,
|
oslo_messaging.get_rpc_server(transport, target,
|
||||||
endpoints, serializer=serializer)
|
endpoints, serializer=serializer)
|
||||||
self.assertEqual(1, len(capture))
|
self.assertEqual([
|
||||||
w = capture[0]
|
mock.call(mock.ANY, category=FutureWarning, stacklevel=3),
|
||||||
self.assertEqual(FutureWarning, w.category)
|
mock.call("blocking executor is deprecated. Executor default will "
|
||||||
|
"be removed. Use explicitly threading or eventlet "
|
||||||
|
"instead in version 'pike' and will be removed in "
|
||||||
|
"version 'rocky'",
|
||||||
|
category=FutureWarning, stacklevel=3)
|
||||||
|
], warn.mock_calls)
|
||||||
|
|
||||||
def test_server_wait_method(self):
|
def test_server_wait_method(self):
|
||||||
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
||||||
|
Loading…
x
Reference in New Issue
Block a user