Removes additional select module patching
Change-Id: I9ad8a3ffec2c41ef7b88a522e0d643d29be86af0
This commit is contained in:
parent
e24f4faf96
commit
8caa4bef84
oslo_messaging/_drivers
@ -12,10 +12,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import retrying
|
||||
import sys
|
||||
import time
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
@ -29,6 +25,9 @@ from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller
|
||||
|
||||
from oslo_messaging import exceptions
|
||||
|
||||
import retrying
|
||||
import time
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
pika_opts = [
|
||||
@ -133,31 +132,9 @@ rpc_opts = [
|
||||
]
|
||||
|
||||
|
||||
def _is_eventlet_monkey_patched():
|
||||
if 'eventlet.patcher' not in sys.modules:
|
||||
return False
|
||||
import eventlet.patcher
|
||||
return eventlet.patcher.is_monkey_patched('thread')
|
||||
|
||||
|
||||
class PikaDriver(object):
|
||||
def __init__(self, conf, url, default_exchange=None,
|
||||
allowed_remote_exmods=None):
|
||||
if 'eventlet.patcher' in sys.modules:
|
||||
import eventlet.patcher
|
||||
if eventlet.patcher.is_monkey_patched('select'):
|
||||
import select
|
||||
|
||||
try:
|
||||
del select.poll
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
try:
|
||||
del select.epoll
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
opt_group = cfg.OptGroup(name='oslo_messaging_pika',
|
||||
title='Pika driver options')
|
||||
conf.register_group(opt_group)
|
||||
|
@ -11,8 +11,6 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import socket
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from oslo_messaging import exceptions
|
||||
@ -20,18 +18,41 @@ from oslo_messaging import exceptions
|
||||
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
|
||||
|
||||
import pika
|
||||
from pika import adapters as pika_adapters
|
||||
from pika.adapters import select_connection
|
||||
from pika import credentials as pika_credentials
|
||||
|
||||
import pika_pool
|
||||
|
||||
import socket
|
||||
import sys
|
||||
|
||||
import threading
|
||||
import time
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PooledConnectionWithConfirmations(pika_pool.Connection):
|
||||
def _is_eventlet_monkey_patched(module):
|
||||
if 'eventlet.patcher' not in sys.modules:
|
||||
return False
|
||||
import eventlet.patcher
|
||||
return eventlet.patcher.is_monkey_patched(module)
|
||||
|
||||
|
||||
def _create__select_poller_connection_impl(
|
||||
parameters, on_open_callback, on_open_error_callback,
|
||||
on_close_callback, stop_ioloop_on_close):
|
||||
return select_connection.SelectConnection(
|
||||
parameters=parameters,
|
||||
on_open_callback=on_open_callback,
|
||||
on_open_error_callback=on_open_error_callback,
|
||||
on_close_callback=on_close_callback,
|
||||
stop_ioloop_on_close=stop_ioloop_on_close,
|
||||
custom_ioloop=select_connection.SelectPoller()
|
||||
)
|
||||
|
||||
|
||||
class _PooledConnectionWithConfirmations(pika_pool.Connection):
|
||||
@property
|
||||
def channel(self):
|
||||
if self.fairy.channel is None:
|
||||
@ -49,6 +70,8 @@ class PikaEngine(object):
|
||||
def __init__(self, conf, url, default_exchange=None):
|
||||
self.conf = conf
|
||||
|
||||
self._force_select_poller_use = _is_eventlet_monkey_patched('select')
|
||||
|
||||
# processing rpc options
|
||||
|
||||
self.default_rpc_exchange = (
|
||||
@ -177,7 +200,7 @@ class PikaEngine(object):
|
||||
)
|
||||
|
||||
self.connection_with_confirmation_pool.Connection = (
|
||||
PooledConnectionWithConfirmations
|
||||
_PooledConnectionWithConfirmations
|
||||
)
|
||||
|
||||
def _next_connection_num(self):
|
||||
@ -258,14 +281,16 @@ class PikaEngine(object):
|
||||
try:
|
||||
base_host_params = self._connection_host_param_list[host_index]
|
||||
|
||||
connection = pika_adapters.BlockingConnection(
|
||||
pika.ConnectionParameters(
|
||||
connection = pika.BlockingConnection(
|
||||
parameters=pika.ConnectionParameters(
|
||||
heartbeat_interval=(
|
||||
self.conf.oslo_messaging_pika.heartbeat_interval
|
||||
if for_listening else None
|
||||
),
|
||||
**base_host_params
|
||||
)
|
||||
),
|
||||
_impl_class=(_create__select_poller_connection_impl
|
||||
if self._force_select_poller_use else None)
|
||||
)
|
||||
|
||||
self._set_tcp_user_timeout(connection._impl.socket)
|
||||
|
Loading…
x
Reference in New Issue
Block a user