Remove the deprecated blocking executor

The blocking executor has been deprecated in Pike and marked for removal
in Rocky, but some user like Mistral asked us to wait before. We decided
to remove this executor for Train or next cycle, now we are in the
Ussuri and after some researchs on usage I think we can go ahead.

This patch drop the deprecation warnings, related unit tests and
set the server with the threading executor is the default executor.

Change-Id: If07bab61ee2b148658b88be98b12f8539f274efe
Closes-Bug: #1715141
This commit is contained in:
Hervé Beraud 2020-01-17 16:24:29 +01:00
parent 3359c520d3
commit fed48aea44
8 changed files with 83 additions and 57 deletions

View File

@ -13,6 +13,12 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import logging
from oslo_utils import eventletutils
LOG = logging.getLogger(__name__)
def version_is_compatible(imp_version, version): def version_is_compatible(imp_version, version):
"""Determine whether versions are compatible. """Determine whether versions are compatible.
@ -59,3 +65,11 @@ class DummyLock(object):
def __exit__(self, type, value, traceback): def __exit__(self, type, value, traceback):
self.release() self.release()
def get_executor_with_context():
if eventletutils.is_monkey_patched('thread'):
LOG.debug("Threading is patched, using an eventlet executor")
return 'eventlet'
LOG.debug("Using a threading executor")
return 'threading'

View File

@ -143,7 +143,7 @@ LOG = logging.getLogger(__name__)
class NotificationServerBase(msg_server.MessageHandlingServer): class NotificationServerBase(msg_server.MessageHandlingServer):
def __init__(self, transport, targets, dispatcher, executor='blocking', def __init__(self, transport, targets, dispatcher, executor=None,
allow_requeue=True, pool=None, batch_size=1, allow_requeue=True, pool=None, batch_size=1,
batch_timeout=None): batch_timeout=None):
super(NotificationServerBase, self).__init__(transport, dispatcher, super(NotificationServerBase, self).__init__(transport, dispatcher,
@ -167,7 +167,7 @@ class NotificationServerBase(msg_server.MessageHandlingServer):
class NotificationServer(NotificationServerBase): class NotificationServer(NotificationServerBase):
def __init__(self, transport, targets, dispatcher, executor='blocking', def __init__(self, transport, targets, dispatcher, executor=None,
allow_requeue=True, pool=None): allow_requeue=True, pool=None):
if not isinstance(transport, msg_transport.NotificationTransport): if not isinstance(transport, msg_transport.NotificationTransport):
LOG.warning("Using RPC transport for notifications. Please use " LOG.warning("Using RPC transport for notifications. Please use "
@ -216,7 +216,7 @@ class BatchNotificationServer(NotificationServerBase):
def get_notification_listener(transport, targets, endpoints, def get_notification_listener(transport, targets, endpoints,
executor='blocking', serializer=None, executor=None, serializer=None,
allow_requeue=False, pool=None): allow_requeue=False, pool=None):
"""Construct a notification listener """Construct a notification listener
@ -250,7 +250,7 @@ def get_notification_listener(transport, targets, endpoints,
def get_batch_notification_listener(transport, targets, endpoints, def get_batch_notification_listener(transport, targets, endpoints,
executor='blocking', serializer=None, executor=None, serializer=None,
allow_requeue=False, pool=None, allow_requeue=False, pool=None,
batch_size=None, batch_timeout=None): batch_size=None, batch_timeout=None):
"""Construct a batch notification listener """Construct a batch notification listener

View File

@ -138,7 +138,7 @@ LOG = logging.getLogger(__name__)
class RPCServer(msg_server.MessageHandlingServer): class RPCServer(msg_server.MessageHandlingServer):
def __init__(self, transport, target, dispatcher, executor='blocking'): def __init__(self, transport, target, dispatcher, executor=None):
super(RPCServer, self).__init__(transport, dispatcher, executor) super(RPCServer, self).__init__(transport, dispatcher, executor)
if not isinstance(transport, msg_transport.RPCTransport): if not isinstance(transport, msg_transport.RPCTransport):
LOG.warning("Using notification transport for RPC. Please use " LOG.warning("Using notification transport for RPC. Please use "
@ -200,7 +200,7 @@ class RPCServer(msg_server.MessageHandlingServer):
def get_rpc_server(transport, target, endpoints, def get_rpc_server(transport, target, endpoints,
executor='blocking', serializer=None, access_policy=None): executor=None, serializer=None, access_policy=None):
"""Construct an RPC server. """Construct an RPC server.
:param transport: the messaging transport :param transport: the messaging transport

View File

@ -23,7 +23,6 @@ import logging
import threading import threading
import traceback import traceback
import debtcollector
from oslo_config import cfg from oslo_config import cfg
from oslo_service import service from oslo_service import service
from oslo_utils import eventletutils from oslo_utils import eventletutils
@ -32,6 +31,7 @@ import six
from stevedore import driver from stevedore import driver
from oslo_messaging._drivers import base as driver_base from oslo_messaging._drivers import base as driver_base
from oslo_messaging import _utils as utils
from oslo_messaging import exceptions from oslo_messaging import exceptions
__all__ = [ __all__ = [
@ -306,16 +306,17 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
new tasks. new tasks.
""" """
def __init__(self, transport, dispatcher, executor='blocking'): def __init__(self, transport, dispatcher, executor=None):
"""Construct a message handling server. """Construct a message handling server.
The dispatcher parameter is a DispatcherBase instance which is used The dispatcher parameter is a DispatcherBase instance which is used
for routing request to endpoint for processing. for routing request to endpoint for processing.
The executor parameter controls how incoming messages will be received The executor parameter controls how incoming messages will be received
and dispatched. By default, the most simple executor is used - the and dispatched. Executor is automatically detected from
blocking executor. It handles only one message at once. It's execution environment.
recommended to use threading or eventlet. It handles many message in parallel. If your application need
asynchronism then you need to consider to use the eventlet executor.
:param transport: the messaging transport :param transport: the messaging transport
:type transport: Transport :type transport: Transport
@ -326,19 +327,20 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
'eventlet' and 'threading' 'eventlet' and 'threading'
:type executor: str :type executor: str
""" """
if executor and executor not in ("threading", "eventlet"):
raise ExecutorLoadFailure(
executor,
"Executor should be None or 'eventlet' and 'threading'")
if not executor:
executor = utils.get_executor_with_context()
self.conf = transport.conf self.conf = transport.conf
self.conf.register_opts(_pool_opts) self.conf.register_opts(_pool_opts)
self.transport = transport self.transport = transport
self.dispatcher = dispatcher self.dispatcher = dispatcher
self.executor_type = executor self.executor_type = executor
if self.executor_type == 'blocking': if self.executor_type == "eventlet":
debtcollector.deprecate(
'blocking executor is deprecated. Executor default will be '
'removed. Use explicitly threading or eventlet instead',
version="pike", removal_version="rocky",
category=FutureWarning)
elif self.executor_type == "eventlet":
eventletutils.warn_eventlet_not_patched( eventletutils.warn_eventlet_not_patched(
expected_patched_modules=['thread'], expected_patched_modules=['thread'],
what="the 'oslo.messaging eventlet executor'") what="the 'oslo.messaging eventlet executor'")
@ -403,7 +405,6 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
executor_opts = {} executor_opts = {}
if self.executor_type in ("threading", "eventlet"):
executor_opts["max_workers"] = ( executor_opts["max_workers"] = (
override_pool_size or self.conf.executor_thread_pool_size override_pool_size or self.conf.executor_thread_pool_size
) )

View File

@ -135,27 +135,21 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges', 'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges',
new_value={})) new_value={}))
@mock.patch('debtcollector.deprecate') def test_constructor(self):
def test_constructor(self, deprecate):
transport = msg_notifier.get_notification_transport( transport = msg_notifier.get_notification_transport(
self.conf, url='fake:') self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo') target = oslo_messaging.Target(topic='foo')
endpoints = [object()] endpoints = [object()]
listener = oslo_messaging.get_notification_listener( listener = oslo_messaging.get_notification_listener(
transport, [target], endpoints) transport, [target], endpoints, executor='threading')
self.assertIs(listener.conf, self.conf) self.assertIs(listener.conf, self.conf)
self.assertIs(listener.transport, transport) self.assertIs(listener.transport, transport)
self.assertIsInstance(listener.dispatcher, self.assertIsInstance(listener.dispatcher,
dispatcher.NotificationDispatcher) dispatcher.NotificationDispatcher)
self.assertIs(listener.dispatcher.endpoints, endpoints) self.assertIs(listener.dispatcher.endpoints, endpoints)
self.assertEqual('blocking', listener.executor_type) self.assertEqual('threading', listener.executor_type)
deprecate.assert_called_once_with(
'blocking executor is deprecated. Executor default will be '
'removed. Use explicitly threading or eventlet instead',
removal_version='rocky', version='pike',
category=FutureWarning)
def test_no_target_topic(self): def test_no_target_topic(self):
transport = msg_notifier.get_notification_transport( transport = msg_notifier.get_notification_transport(

View File

@ -14,7 +14,6 @@
# under the License. # under the License.
import threading import threading
import warnings
import eventlet import eventlet
import fixtures import fixtures
@ -120,51 +119,62 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges', 'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges',
new_value={})) new_value={}))
@mock.patch('warnings.warn') def test_constructor(self):
def test_constructor(self, warn):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo', server='bar') target = oslo_messaging.Target(topic='foo', server='bar')
endpoints = [object()] endpoints = [object()]
serializer = object() serializer = object()
access_policy = dispatcher.DefaultRPCAccessPolicy access_policy = dispatcher.DefaultRPCAccessPolicy
warnings.simplefilter("always", FutureWarning)
server = oslo_messaging.get_rpc_server(transport, server = oslo_messaging.get_rpc_server(transport,
target, target,
endpoints, endpoints,
serializer=serializer, serializer=serializer,
access_policy=access_policy) access_policy=access_policy,
executor='threading')
self.assertIs(server.conf, self.conf) self.assertIs(server.conf, self.conf)
self.assertIs(server.transport, transport) self.assertIs(server.transport, transport)
self.assertIsInstance(server.dispatcher, oslo_messaging.RPCDispatcher) self.assertIsInstance(server.dispatcher, oslo_messaging.RPCDispatcher)
self.assertIs(server.dispatcher.endpoints, endpoints) self.assertIs(server.dispatcher.endpoints, endpoints)
self.assertIs(server.dispatcher.serializer, serializer) self.assertIs(server.dispatcher.serializer, serializer)
self.assertEqual('blocking', server.executor_type) self.assertEqual('threading', server.executor_type)
self.assertEqual([
mock.call("blocking executor is deprecated. Executor default will "
"be removed. Use explicitly threading or eventlet "
"instead in version 'pike' and will be removed in "
"version 'rocky'",
category=FutureWarning, stacklevel=3)
], warn.mock_calls)
@mock.patch('warnings.warn') def test_constructor_with_eventlet_executor(self):
def test_constructor_without_explicit_RPCAccessPolicy(self, warn):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo', server='bar') target = oslo_messaging.Target(topic='foo', server='bar')
endpoints = [object()] endpoints = [object()]
serializer = object() serializer = object()
access_policy = dispatcher.DefaultRPCAccessPolicy
warnings.simplefilter("always", FutureWarning) server = oslo_messaging.get_rpc_server(transport,
oslo_messaging.get_rpc_server(transport, target, target,
endpoints, serializer=serializer) endpoints,
self.assertEqual([ serializer=serializer,
mock.call("blocking executor is deprecated. Executor default will " access_policy=access_policy,
"be removed. Use explicitly threading or eventlet " executor='eventlet')
"instead in version 'pike' and will be removed in " self.assertIs(server.conf, self.conf)
"version 'rocky'", self.assertIs(server.transport, transport)
category=FutureWarning, stacklevel=3) self.assertIsInstance(server.dispatcher, oslo_messaging.RPCDispatcher)
], warn.mock_calls) self.assertIs(server.dispatcher.endpoints, endpoints)
self.assertIs(server.dispatcher.serializer, serializer)
self.assertEqual('eventlet', server.executor_type)
def test_constructor_with_unrecognized_executor(self):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo', server='bar')
endpoints = [object()]
serializer = object()
access_policy = dispatcher.DefaultRPCAccessPolicy
self.assertRaises(
server_module.ExecutorLoadFailure,
oslo_messaging.get_rpc_server,
transport=transport,
target=target,
endpoints=endpoints,
serializer=serializer,
access_policy=access_policy,
executor='boom')
def test_server_wait_method(self): def test_server_wait_method(self):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')

View File

@ -0,0 +1,8 @@
---
upgrade:
- |
The blocking executor has been deprecated for removal in Rocky and support
is now dropped in Ussuri. Its usage was never recommended for applications,
and it has no test coverage.
Applications should choose the appropriate threading model that maps to
their usage instead.

View File

@ -51,7 +51,6 @@ oslo.messaging.drivers =
fake = oslo_messaging._drivers.impl_fake:FakeDriver fake = oslo_messaging._drivers.impl_fake:FakeDriver
oslo.messaging.executors = oslo.messaging.executors =
blocking = futurist:SynchronousExecutor
eventlet = futurist:GreenThreadPoolExecutor eventlet = futurist:GreenThreadPoolExecutor
threading = futurist:ThreadPoolExecutor threading = futurist:ThreadPoolExecutor