diff --git a/oslo_messaging/_drivers/pika_driver/pika_listener.py b/oslo_messaging/_drivers/pika_driver/pika_listener.py
index 1e52969b7..a58e54236 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_listener.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_listener.py
@@ -40,6 +40,7 @@ class RpcReplyPikaListener(object):
         self._reply_consumer_initialized = False
         self._reply_consumer_initialization_lock = threading.Lock()
         self._poller_thread = None
+        self._shutdown = False
 
     def get_reply_qname(self, expiration_time=None):
         """As result return reply queue name, shared for whole process,
@@ -75,7 +76,7 @@ class RpcReplyPikaListener(object):
                     )
                 )
 
-                self._reply_poller.start()
+            self._reply_poller.start()
 
             # start reply poller job thread if needed
             if self._poller_thread is None:
@@ -93,9 +94,11 @@ class RpcReplyPikaListener(object):
         """Reply polling job. Poll replies in infinite loop and notify
         registered features
         """
-        while self._reply_poller:
+        while True:
             try:
                 messages = self._reply_poller.poll()
+                if not messages and self._shutdown:
+                    break
 
                 for message in messages:
                     try:
@@ -132,6 +135,8 @@ class RpcReplyPikaListener(object):
 
     def cleanup(self):
         """Stop replies consuming and cleanup resources"""
+        self._shutdown = True
+
         if self._reply_poller:
             self._reply_poller.stop()
             self._reply_poller.cleanup()
diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py
index 4ee4e9a12..1f687ff68 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_message.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_message.py
@@ -387,7 +387,7 @@ class PikaOutgoingMessage(object):
             )
         except pika_exceptions.UnroutableError as e:
             raise pika_drv_exc.RoutingException(
-                "Can not deliver message:[body:{}, properties: {}] to any"
+                "Can not deliver message:[body:{}, properties: {}] to any "
                 "queue using target: [exchange:{}, "
                 "routing_key:{}]. {}".format(
                     body, properties, exchange, routing_key, str(e)
diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py
index dc3d27912..69f73deb1 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_poller.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py
@@ -94,13 +94,13 @@ class PikaPoller(base.Listener):
             for queue_info in self._queues_to_consume:
                 no_ack = queue_info["no_ack"]
 
-                on_message_no_ack_callback = (
+                on_message_callback = (
                     self._on_message_no_ack_callback if no_ack
                     else self._on_message_with_ack_callback
                 )
 
                 queue_info["consumer_tag"] = self._channel.basic_consume(
-                    on_message_no_ack_callback, queue_info["queue_name"],
+                    on_message_callback, queue_info["queue_name"],
                     no_ack=no_ack
                 )
         except Exception:
@@ -208,8 +208,10 @@ class PikaPoller(base.Listener):
                                 del self._message_queue[:prefetch_size]
                                 return result
                     except pika_drv_exc.EstablishConnectionException as e:
-                        LOG.warn("Problem during establishing connection for"
-                                 "pika poller %s", e, exc_info=True)
+                        LOG.warning(
+                            "Problem during establishing connection for pika "
+                            "poller %s", e, exc_info=True
+                        )
                         time.sleep(
                             self._pika_engine.host_connection_reconnect_delay
                         )
@@ -231,19 +233,25 @@ class PikaPoller(base.Listener):
             try:
                 self._reconnect()
             except pika_drv_exc.EstablishConnectionException as exc:
-                LOG.warn("Can not establishing connection during pika "
-                         "Conecting required during first poll() call. %s",
-                         exc, exc_info=True)
+                LOG.warning(
+                    "Can not establish connection during pika poller's "
+                    "start(). Connecting is required during first poll() "
+                    "call. %s", exc, exc_info=True
+                )
             except pika_drv_exc.ConnectionException as exc:
                 self._cleanup()
-                LOG.warn("Connectivity problem during pika poller's start(). "
-                         "Reconnecting required during first poll() call. %s",
-                         exc, exc_info=True)
+                LOG.warning(
+                    "Connectivity problem during pika poller's start(). "
+                    "Reconnecting required during first poll() call. %s",
+                    exc, exc_info=True
+                )
             except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc:
                 self._cleanup()
-                LOG.warn("Connectivity problem during pika poller's start(). "
-                         "Reconnecting required during first poll() call. %s",
-                         exc, exc_info=True)
+                LOG.warning(
+                    "Connectivity problem during pika poller's start(). "
+                    "Reconnecting required during first poll() call. %s",
+                    exc, exc_info=True
+                )
             self._started = True
 
     def stop(self):
@@ -260,8 +268,10 @@ class PikaPoller(base.Listener):
                     self._stop_consuming()
                 except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc:
                     self._cleanup()
-                    LOG.warn("Connectivity problem detected during consumer "
-                             "cancellation. %s", exc, exc_info=True)
+                    LOG.warning(
+                        "Connectivity problem detected during consumer "
+                        "cancellation. %s", exc, exc_info=True
+                    )
             self._started = False
 
     def cleanup(self):