Fix delay before host reconnecting

Change-Id: Ifd66b3fe44f5451ef4b5e03d06f214199474ca0b
This commit is contained in:
Dmitriy Ukhlov 2015-11-19 14:51:02 +02:00 committed by dukhlov
parent cc3db22c6a
commit e24f4faf96
5 changed files with 31 additions and 30 deletions

@ -47,7 +47,7 @@ pika_opts = [
cfg.FloatOpt('tcp_user_timeout', default=0.25,
help="Set TCP_USER_TIMEOUT in seconds for connection's "
"socket"),
cfg.FloatOpt('host_connection_reconnect_delay', default=5,
cfg.FloatOpt('host_connection_reconnect_delay', default=0.25,
help="Set delay for reconnection to some host which has "
"connection error")
]

@ -121,7 +121,7 @@ class PikaEngine(object):
"integer")
self._tcp_user_timeout = self.conf.oslo_messaging_pika.tcp_user_timeout
self._host_connection_reconnect_delay = (
self.host_connection_reconnect_delay = (
self.conf.oslo_messaging_pika.host_connection_reconnect_delay
)
@ -249,7 +249,7 @@ class PikaEngine(object):
]
if (last_time != last_success_time and
cur_time - last_time <
self._host_connection_reconnect_delay):
self.host_connection_reconnect_delay):
raise pika_drv_exc.HostConnectionNotAllowedException(
"Connection to host #{} is not allowed now because of "
"previous failure".format(host_index)

@ -14,13 +14,14 @@
from oslo_log import log as logging
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller
import threading
import time
import uuid
LOG = logging.getLogger(__name__)
@ -32,7 +33,7 @@ class RpcReplyPikaListener(object):
self._reply_queue = None
self._reply_poller = None
self._reply_waiting_future_list = []
self._reply_waiting_futures = {}
self._reply_consumer_enabled = False
self._reply_consumer_thread_run_flag = True
@ -83,27 +84,21 @@ class RpcReplyPikaListener(object):
if message is None:
continue
message.acknowledge()
i = 0
curtime = time.time()
while (i < len(self._reply_waiting_future_list) and
self._reply_consumer_thread_run_flag):
msg_id, future, expiration = (
self._reply_waiting_future_list[i]
)
if expiration and expiration < curtime:
del self._reply_waiting_future_list[i]
elif msg_id == message.msg_id:
del self._reply_waiting_future_list[i]
future.set_result(message)
else:
i += 1
future = self._reply_waiting_futures.pop(message.msg_id, None)
if future is not None:
future.set_result(message)
except pika_drv_exc.EstablishConnectionException:
LOG.exception("Problem during establishing connection for "
"reply polling")
time.sleep(self._pika_engine.host_connection_reconnect_delay)
except BaseException:
LOG.exception("Exception during reply polling")
LOG.exception("Unexpected exception during reply polling")
def register_reply_waiter(self, msg_id, future, expiration_time):
self._reply_waiting_future_list.append(
(msg_id, future, expiration_time)
)
def register_reply_waiter(self, msg_id, future):
self._reply_waiting_futures[msg_id] = future
def unregister_reply_waiter(self, msg_id):
self._reply_waiting_futures.pop(msg_id, None)
def cleanup(self):
with self._reply_consumer_lock:

@ -310,8 +310,7 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
future = futures.Future()
reply_listener.register_reply_waiter(
msg_id=msg_id, future=future,
expiration_time=expiration_time
msg_id=msg_id, future=future
)
self._do_send(
@ -324,6 +323,8 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
return future.result(expiration_time - time.time())
except futures.TimeoutError:
raise exceptions.MessagingTimeout()
finally:
reply_listener.unregister_reply_waiter(self.msg_id)
else:
self._do_send(
exchange=exchange, routing_key=queue, msg_dict=msg_dict,

@ -101,7 +101,8 @@ class PikaPoller(object):
self._connection = None
def poll(self, timeout=None):
start = time.time()
expiration_time = time.time() + timeout if timeout else None
while not self._message_queue:
with self._lock:
if not self._started:
@ -110,12 +111,16 @@ class PikaPoller(object):
try:
if self._channel is None:
self._reconnect()
self._connection.process_data_events()
self._connection.process_data_events(
time_limit=timeout
)
except Exception:
self._cleanup()
raise
if timeout and time.time() - start > timeout:
return None
if timeout is not None:
timeout = expiration_time - time.time()
if timeout <= 0:
return None
return self._message_queue.popleft()