diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py index b4b6d4288..843753648 100644 --- a/oslo_messaging/_drivers/impl_pika.py +++ b/oslo_messaging/_drivers/impl_pika.py @@ -47,7 +47,7 @@ pika_opts = [ cfg.FloatOpt('tcp_user_timeout', default=0.25, help="Set TCP_USER_TIMEOUT in seconds for connection's " "socket"), - cfg.FloatOpt('host_connection_reconnect_delay', default=5, + cfg.FloatOpt('host_connection_reconnect_delay', default=0.25, help="Set delay for reconnection to some host which has " "connection error") ] diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py index 03ccccdf0..9d203bfce 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_engine.py +++ b/oslo_messaging/_drivers/pika_driver/pika_engine.py @@ -121,7 +121,7 @@ class PikaEngine(object): "integer") self._tcp_user_timeout = self.conf.oslo_messaging_pika.tcp_user_timeout - self._host_connection_reconnect_delay = ( + self.host_connection_reconnect_delay = ( self.conf.oslo_messaging_pika.host_connection_reconnect_delay ) @@ -249,7 +249,7 @@ class PikaEngine(object): ] if (last_time != last_success_time and cur_time - last_time < - self._host_connection_reconnect_delay): + self.host_connection_reconnect_delay): raise pika_drv_exc.HostConnectionNotAllowedException( "Connection to host #{} is not allowed now because of " "previous failure".format(host_index) diff --git a/oslo_messaging/_drivers/pika_driver/pika_listener.py b/oslo_messaging/_drivers/pika_driver/pika_listener.py index 493adf1ea..88fc2586c 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_listener.py +++ b/oslo_messaging/_drivers/pika_driver/pika_listener.py @@ -14,13 +14,14 @@ from oslo_log import log as logging +from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller - import threading import time import uuid + LOG = logging.getLogger(__name__) @@ -32,7 +33,7 @@ class RpcReplyPikaListener(object): self._reply_queue = None self._reply_poller = None - self._reply_waiting_future_list = [] + self._reply_waiting_futures = {} self._reply_consumer_enabled = False self._reply_consumer_thread_run_flag = True @@ -83,27 +84,21 @@ class RpcReplyPikaListener(object): if message is None: continue message.acknowledge() - i = 0 - curtime = time.time() - while (i < len(self._reply_waiting_future_list) and - self._reply_consumer_thread_run_flag): - msg_id, future, expiration = ( - self._reply_waiting_future_list[i] - ) - if expiration and expiration < curtime: - del self._reply_waiting_future_list[i] - elif msg_id == message.msg_id: - del self._reply_waiting_future_list[i] - future.set_result(message) - else: - i += 1 + future = self._reply_waiting_futures.pop(message.msg_id, None) + if future is not None: + future.set_result(message) + except pika_drv_exc.EstablishConnectionException: + LOG.exception("Problem during establishing connection for " + "reply polling") + time.sleep(self._pika_engine.host_connection_reconnect_delay) except BaseException: - LOG.exception("Exception during reply polling") + LOG.exception("Unexpected exception during reply polling") - def register_reply_waiter(self, msg_id, future, expiration_time): - self._reply_waiting_future_list.append( - (msg_id, future, expiration_time) - ) + def register_reply_waiter(self, msg_id, future): + self._reply_waiting_futures[msg_id] = future + + def unregister_reply_waiter(self, msg_id): + self._reply_waiting_futures.pop(msg_id, None) def cleanup(self): with self._reply_consumer_lock: diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py index f771b7cab..da87c7d01 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_message.py +++ b/oslo_messaging/_drivers/pika_driver/pika_message.py @@ -310,8 +310,7 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage): future = futures.Future() reply_listener.register_reply_waiter( - msg_id=msg_id, future=future, - expiration_time=expiration_time + msg_id=msg_id, future=future ) self._do_send( @@ -324,6 +323,8 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage): return future.result(expiration_time - time.time()) except futures.TimeoutError: raise exceptions.MessagingTimeout() + finally: + reply_listener.unregister_reply_waiter(self.msg_id) else: self._do_send( exchange=exchange, routing_key=queue, msg_dict=msg_dict, diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py index c7c152609..d34a9c11a 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_poller.py +++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py @@ -101,7 +101,8 @@ class PikaPoller(object): self._connection = None def poll(self, timeout=None): - start = time.time() + expiration_time = time.time() + timeout if timeout else None + while not self._message_queue: with self._lock: if not self._started: @@ -110,12 +111,16 @@ class PikaPoller(object): try: if self._channel is None: self._reconnect() - self._connection.process_data_events() + self._connection.process_data_events( + time_limit=timeout + ) except Exception: self._cleanup() raise - if timeout and time.time() - start > timeout: - return None + if timeout is not None: + timeout = expiration_time - time.time() + if timeout <= 0: + return None return self._message_queue.popleft()