diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 529063def..21118d790 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -32,8 +32,6 @@ from oslo_messaging import server RPCException = rpc_common.RPCException _MATCHMAKER_BACKENDS = ('redis', 'dummy') _MATCHMAKER_DEFAULT = 'redis' -_CONCURRENCY_CHOICES = ('eventlet', 'native') -_CONCURRENCY_DEFAULT = 'eventlet' LOG = logging.getLogger(__name__) @@ -48,10 +46,6 @@ zmq_opts = [ choices=_MATCHMAKER_BACKENDS, help='MatchMaker driver.'), - cfg.StrOpt('rpc_zmq_concurrency', default=_CONCURRENCY_DEFAULT, - choices=_CONCURRENCY_CHOICES, - help='Type of concurrency used. Either "native" or "eventlet"'), - cfg.IntOpt('rpc_zmq_contexts', default=1, help='Number of ZeroMQ contexts, defaults to 1.'), diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_proxy.py index 12b6e969e..4ee368813 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_proxy.py @@ -19,7 +19,7 @@ from stevedore import driver from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._i18n import _LI -zmq = zmq_async.import_zmq(zmq_concurrency='native') +zmq = zmq_async.import_zmq() LOG = logging.getLogger(__name__) diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py index 215f0a347..9d1c411ca 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py @@ -24,7 +24,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_socket from oslo_messaging._i18n import _LI -zmq = zmq_async.import_zmq(zmq_concurrency='native') +zmq = zmq_async.import_zmq() LOG = logging.getLogger(__name__) @@ -35,7 +35,7 @@ class UniversalQueueProxy(object): self.context = context super(UniversalQueueProxy, self).__init__() self.matchmaker = matchmaker - self.poller = zmq_async.get_poller(zmq_concurrency='native') + self.poller = zmq_async.get_poller() self.fe_router_socket = zmq_socket.ZmqRandomPortSocket( conf, context, zmq.ROUTER) diff --git a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py index 85f142d8d..41677f655 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py @@ -15,21 +15,13 @@ import logging import threading -from oslo_utils import eventletutils - from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_poller -zmq = zmq_async.import_zmq(zmq_concurrency='native') +zmq = zmq_async.import_zmq() LOG = logging.getLogger(__name__) -_threading = threading - -if eventletutils.EVENTLET_AVAILABLE: - import eventlet - _threading = eventlet.patcher.original('threading') - class ThreadingPoller(zmq_poller.ZmqPoller): @@ -69,8 +61,8 @@ class ThreadingExecutor(zmq_poller.Executor): def __init__(self, method): self._method = method super(ThreadingExecutor, self).__init__( - _threading.Thread(target=self._loop)) - self._stop = _threading.Event() + threading.Thread(target=self._loop)) + self._stop = threading.Event() def _loop(self): while not self._stop.is_set(): diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_async.py b/oslo_messaging/_drivers/zmq_driver/zmq_async.py index 259f51248..a24805974 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_async.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_async.py @@ -12,30 +12,20 @@ # License for the specific language governing permissions and limitations # under the License. -from oslo_messaging._i18n import _ +from oslo_utils import eventletutils from oslo_utils import importutils -# Map zmq_concurrency config option names to the actual module name. -ZMQ_MODULES = { - 'native': 'zmq', - 'eventlet': 'eventlet.green.zmq', -} - - -def import_zmq(zmq_concurrency='eventlet'): - _raise_error_if_invalid_config_value(zmq_concurrency) - - imported_zmq = importutils.try_import(ZMQ_MODULES[zmq_concurrency], - default=None) - +def import_zmq(): + imported_zmq = importutils.try_import( + 'eventlet.green.zmq' if eventletutils.is_monkey_patched('thread') else + 'zmq', default=None + ) return imported_zmq -def get_poller(zmq_concurrency='eventlet'): - _raise_error_if_invalid_config_value(zmq_concurrency) - - if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available(): +def get_poller(): + if eventletutils.is_monkey_patched('thread'): from oslo_messaging._drivers.zmq_driver.poller import green_poller return green_poller.GreenPoller() @@ -43,10 +33,8 @@ def get_poller(zmq_concurrency='eventlet'): return threading_poller.ThreadingPoller() -def get_executor(method, zmq_concurrency='eventlet'): - _raise_error_if_invalid_config_value(zmq_concurrency) - - if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available(): +def get_executor(method): + if eventletutils.is_monkey_patched('thread'): from oslo_messaging._drivers.zmq_driver.poller import green_poller return green_poller.GreenExecutor(method) @@ -54,26 +42,10 @@ def get_executor(method, zmq_concurrency='eventlet'): return threading_poller.ThreadingExecutor(method) -def is_eventlet_concurrency(conf): - return _is_eventlet_zmq_available() and conf.rpc_zmq_concurrency == \ - 'eventlet' - - -def _is_eventlet_zmq_available(): - return importutils.try_import('eventlet.green.zmq') - - -def _raise_error_if_invalid_config_value(zmq_concurrency): - if zmq_concurrency not in ZMQ_MODULES: - errmsg = _('Invalid zmq_concurrency value: %s') - raise ValueError(errmsg % zmq_concurrency) - - -def get_queue(zmq_concurrency='eventlet'): - _raise_error_if_invalid_config_value(zmq_concurrency) - if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available(): +def get_queue(): + if eventletutils.is_monkey_patched('thread'): import eventlet return eventlet.queue.Queue(), eventlet.queue.Empty - else: - import six - return six.moves.queue.Queue(), six.moves.queue.Empty + + import six + return six.moves.queue.Queue(), six.moves.queue.Empty diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_async.py b/oslo_messaging/tests/drivers/zmq/test_zmq_async.py index f929ab3dc..a2caf1261 100644 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_async.py +++ b/oslo_messaging/tests/drivers/zmq/test_zmq_async.py @@ -27,20 +27,9 @@ class TestImportZmq(test_utils.BaseTestCase): def setUp(self): super(TestImportZmq, self).setUp() - def test_config_short_names_are_converted_to_correct_module_names(self): - mock_try_import = mock.Mock() - zmq_async.importutils.try_import = mock_try_import + def test_when_eventlet_is_available_then_load_eventlet_green_zmq(self): + zmq_async.eventletutils.is_monkey_patched = lambda _: True - zmq_async.importutils.try_import.return_value = 'mock zmq module' - self.assertEqual('mock zmq module', zmq_async.import_zmq('native')) - mock_try_import.assert_called_with('zmq', default=None) - - zmq_async.importutils.try_import.return_value = 'mock eventlet module' - self.assertEqual('mock eventlet module', - zmq_async.import_zmq('eventlet')) - mock_try_import.assert_called_with('eventlet.green.zmq', default=None) - - def test_when_no_args_then_default_zmq_module_is_loaded(self): mock_try_import = mock.Mock() zmq_async.importutils.try_import = mock_try_import @@ -48,12 +37,15 @@ class TestImportZmq(test_utils.BaseTestCase): mock_try_import.assert_called_with('eventlet.green.zmq', default=None) - def test_invalid_config_value_raise_ValueError(self): - invalid_opt = 'x' + def test_when_evetlet_is_unavailable_then_load_zmq(self): + zmq_async.eventletutils.is_monkey_patched = lambda _: False - errmsg = 'Invalid zmq_concurrency value: x' - with self.assertRaisesRegexp(ValueError, errmsg): - zmq_async.import_zmq(invalid_opt) + mock_try_import = mock.Mock() + zmq_async.importutils.try_import = mock_try_import + + zmq_async.import_zmq() + + mock_try_import.assert_called_with('zmq', default=None) class TestGetPoller(test_utils.BaseTestCase): @@ -62,39 +54,20 @@ class TestGetPoller(test_utils.BaseTestCase): def setUp(self): super(TestGetPoller, self).setUp() - def test_when_no_arg_to_get_poller_then_return_default_poller(self): - zmq_async._is_eventlet_zmq_available = lambda: True + def test_when_eventlet_is_available_then_return_GreenPoller(self): + zmq_async.eventletutils.is_monkey_patched = lambda _: True actual = zmq_async.get_poller() self.assertTrue(isinstance(actual, green_poller.GreenPoller)) - def test_when_native_poller_requested_then_return_ThreadingPoller(self): - actual = zmq_async.get_poller('native') - - self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller)) - def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self): - zmq_async._is_eventlet_zmq_available = lambda: False + zmq_async.eventletutils.is_monkey_patched = lambda _: False - actual = zmq_async.get_poller('eventlet') + actual = zmq_async.get_poller() self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller)) - def test_when_eventlet_is_available_then_return_GreenPoller(self): - zmq_async._is_eventlet_zmq_available = lambda: True - - actual = zmq_async.get_poller('eventlet') - - self.assertTrue(isinstance(actual, green_poller.GreenPoller)) - - def test_invalid_config_value_raise_ValueError(self): - invalid_opt = 'x' - - errmsg = 'Invalid zmq_concurrency value: x' - with self.assertRaisesRegexp(ValueError, errmsg): - zmq_async.get_poller(invalid_opt) - class TestGetReplyPoller(test_utils.BaseTestCase): @@ -102,34 +75,20 @@ class TestGetReplyPoller(test_utils.BaseTestCase): def setUp(self): super(TestGetReplyPoller, self).setUp() - def test_default_reply_poller_is_HoldReplyPoller(self): - zmq_async._is_eventlet_zmq_available = lambda: True + def test_when_eventlet_is_available_then_return_HoldReplyPoller(self): + zmq_async.eventletutils.is_monkey_patched = lambda _: True actual = zmq_async.get_poller() self.assertTrue(isinstance(actual, green_poller.GreenPoller)) - def test_when_eventlet_is_available_then_return_HoldReplyPoller(self): - zmq_async._is_eventlet_zmq_available = lambda: True - - actual = zmq_async.get_poller('eventlet') - - self.assertTrue(isinstance(actual, green_poller.GreenPoller)) - def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self): - zmq_async._is_eventlet_zmq_available = lambda: False + zmq_async.eventletutils.is_monkey_patched = lambda _: False - actual = zmq_async.get_poller('eventlet') + actual = zmq_async.get_poller() self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller)) - def test_invalid_config_value_raise_ValueError(self): - invalid_opt = 'x' - - errmsg = 'Invalid zmq_concurrency value: x' - with self.assertRaisesRegexp(ValueError, errmsg): - zmq_async.get_poller(invalid_opt) - class TestGetExecutor(test_utils.BaseTestCase): @@ -137,34 +96,19 @@ class TestGetExecutor(test_utils.BaseTestCase): def setUp(self): super(TestGetExecutor, self).setUp() - def test_default_executor_is_GreenExecutor(self): - zmq_async._is_eventlet_zmq_available = lambda: True + def test_when_eventlet_module_is_available_then_return_GreenExecutor(self): + zmq_async.eventletutils.is_monkey_patched = lambda _: True executor = zmq_async.get_executor('any method') self.assertTrue(isinstance(executor, green_poller.GreenExecutor)) self.assertEqual('any method', executor._method) - def test_when_eventlet_module_is_available_then_return_GreenExecutor(self): - zmq_async._is_eventlet_zmq_available = lambda: True - - executor = zmq_async.get_executor('any method', 'eventlet') - - self.assertTrue(isinstance(executor, green_poller.GreenExecutor)) - self.assertEqual('any method', executor._method) - def test_when_eventlet_is_unavailable_then_return_ThreadingExecutor(self): - zmq_async._is_eventlet_zmq_available = lambda: False + zmq_async.eventletutils.is_monkey_patched = lambda _: False - executor = zmq_async.get_executor('any method', 'eventlet') + executor = zmq_async.get_executor('any method') self.assertTrue(isinstance(executor, threading_poller.ThreadingExecutor)) self.assertEqual('any method', executor._method) - - def test_invalid_config_value_raise_ValueError(self): - invalid_opt = 'x' - - errmsg = 'Invalid zmq_concurrency value: x' - with self.assertRaisesRegexp(ValueError, errmsg): - zmq_async.get_executor('any method', invalid_opt)