diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py index 215f0a347..bbf17fe80 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py @@ -65,7 +65,7 @@ class UniversalQueueProxy(object): {"router": self.be_router_address}) def run(self): - message, socket = self.poller.poll(self.conf.rpc_poll_timeout) + message, socket = self.poller.poll() if message is None: return diff --git a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py index 85f142d8d..1fcfaba73 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py @@ -46,9 +46,12 @@ class ThreadingPoller(zmq_poller.ZmqPoller): self.poller.register(socket, zmq.POLLIN) def poll(self, timeout=None): + if timeout is not None and timeout > 0: + timeout *= 1000 # convert seconds to milliseconds + sockets = {} try: - sockets = dict(self.poller.poll()) + sockets = dict(self.poller.poll(timeout=timeout)) except zmq.ZMQError as e: LOG.debug("Polling terminated with error: %s", e)