diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py index 843753648..bdab583e2 100644 --- a/oslo_messaging/_drivers/impl_pika.py +++ b/oslo_messaging/_drivers/impl_pika.py @@ -12,10 +12,6 @@ # License for the specific language governing permissions and limitations # under the License. -import retrying -import sys -import time - from oslo_config import cfg from oslo_log import log as logging @@ -29,6 +25,9 @@ from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller from oslo_messaging import exceptions +import retrying +import time + LOG = logging.getLogger(__name__) pika_opts = [ @@ -133,31 +132,9 @@ rpc_opts = [ ] -def _is_eventlet_monkey_patched(): - if 'eventlet.patcher' not in sys.modules: - return False - import eventlet.patcher - return eventlet.patcher.is_monkey_patched('thread') - - class PikaDriver(object): def __init__(self, conf, url, default_exchange=None, allowed_remote_exmods=None): - if 'eventlet.patcher' in sys.modules: - import eventlet.patcher - if eventlet.patcher.is_monkey_patched('select'): - import select - - try: - del select.poll - except AttributeError: - pass - - try: - del select.epoll - except AttributeError: - pass - opt_group = cfg.OptGroup(name='oslo_messaging_pika', title='Pika driver options') conf.register_group(opt_group) diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py index 9d203bfce..745c2da1c 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_engine.py +++ b/oslo_messaging/_drivers/pika_driver/pika_engine.py @@ -11,8 +11,6 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -import socket - from oslo_log import log as logging from oslo_messaging import exceptions @@ -20,18 +18,41 @@ from oslo_messaging import exceptions from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc import pika -from pika import adapters as pika_adapters +from pika.adapters import select_connection from pika import credentials as pika_credentials import pika_pool +import socket +import sys + import threading import time LOG = logging.getLogger(__name__) -class PooledConnectionWithConfirmations(pika_pool.Connection): +def _is_eventlet_monkey_patched(module): + if 'eventlet.patcher' not in sys.modules: + return False + import eventlet.patcher + return eventlet.patcher.is_monkey_patched(module) + + +def _create__select_poller_connection_impl( + parameters, on_open_callback, on_open_error_callback, + on_close_callback, stop_ioloop_on_close): + return select_connection.SelectConnection( + parameters=parameters, + on_open_callback=on_open_callback, + on_open_error_callback=on_open_error_callback, + on_close_callback=on_close_callback, + stop_ioloop_on_close=stop_ioloop_on_close, + custom_ioloop=select_connection.SelectPoller() + ) + + +class _PooledConnectionWithConfirmations(pika_pool.Connection): @property def channel(self): if self.fairy.channel is None: @@ -49,6 +70,8 @@ class PikaEngine(object): def __init__(self, conf, url, default_exchange=None): self.conf = conf + self._force_select_poller_use = _is_eventlet_monkey_patched('select') + # processing rpc options self.default_rpc_exchange = ( @@ -177,7 +200,7 @@ class PikaEngine(object): ) self.connection_with_confirmation_pool.Connection = ( - PooledConnectionWithConfirmations + _PooledConnectionWithConfirmations ) def _next_connection_num(self): @@ -258,14 +281,16 @@ class PikaEngine(object): try: base_host_params = self._connection_host_param_list[host_index] - connection = pika_adapters.BlockingConnection( - pika.ConnectionParameters( + connection = pika.BlockingConnection( + parameters=pika.ConnectionParameters( heartbeat_interval=( self.conf.oslo_messaging_pika.heartbeat_interval if for_listening else None ), **base_host_params - ) + ), + _impl_class=(_create__select_poller_connection_impl + if self._force_select_poller_use else None) ) self._set_tcp_user_timeout(connection._impl.socket)