From 0aca222d960f8e9b9acaaa1806a0dae462628cfc Mon Sep 17 00:00:00 2001
From: dukhlov <dukhlov@mirantis.com>
Date: Thu, 3 Mar 2016 11:00:50 -0500
Subject: [PATCH] Fix problems during unstable network

With this patch, the poller's start and stop methods no longer raise
exceptions when a connectivity problem is detected. Exceptions raised
there are not expected by MessageHandlerServer, and they hang the
server on start.

Closes-Bug: #1553168

Change-Id: I891abab2a1184fa65b496ea2f7fc54894bc0b421
---
 oslo_messaging/_drivers/impl_pika.py          | 12 +--
 .../_drivers/pika_driver/pika_commons.py      | 44 +++++++++++
 .../_drivers/pika_driver/pika_engine.py       | 40 ++++------
 .../_drivers/pika_driver/pika_listener.py     | 12 +--
 .../_drivers/pika_driver/pika_poller.py       | 75 ++++++++++---------
 .../tests/drivers/pika/test_message.py        |  4 +-
 .../tests/drivers/pika/test_poller.py         | 13 ++++
 7 files changed, 120 insertions(+), 80 deletions(-)
 create mode 100644 oslo_messaging/_drivers/pika_driver/pika_commons.py

diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py
index 25047e107..a9434818b 100644
--- a/oslo_messaging/_drivers/impl_pika.py
+++ b/oslo_messaging/_drivers/impl_pika.py
@@ -195,7 +195,7 @@ class PikaDriver(base.BaseDriver):
                     self._declare_rpc_exchange(exchange,
                                                expiration_time - time.time())
                 except pika_drv_exc.ConnectionException as e:
-                    LOG.warning("Problem during declaring exchange. %", e)
+                    LOG.warning("Problem during declaring exchange. %s", e)
                 return True
             elif isinstance(ex, (pika_drv_exc.ConnectionException,
                                  exceptions.MessageDeliveryFailure)):
@@ -240,7 +240,7 @@ class PikaDriver(base.BaseDriver):
                 self._declare_rpc_exchange(exchange,
                                            expiration_time - time.time())
             except pika_drv_exc.ConnectionException as e:
-                LOG.warning("Problem during declaring exchange. %", e)
+                LOG.warning("Problem during declaring exchange. %s", e)
             raise ex
 
         if reply is not None:
@@ -269,7 +269,7 @@ class PikaDriver(base.BaseDriver):
                     exchange, expiration_time - time.time()
                 )
             except pika_drv_exc.ConnectionException as e:
-                LOG.warning("Problem during declaring exchange. %", e)
+                LOG.warning("Problem during declaring exchange. %s", e)
 
     def _declare_notification_queue_binding(self, target, timeout=None):
         if timeout is not None and timeout < 0:
@@ -303,16 +303,16 @@ class PikaDriver(base.BaseDriver):
         def on_exception(ex):
             if isinstance(ex, (pika_drv_exc.ExchangeNotFoundException,
                                pika_drv_exc.RoutingException)):
-                LOG.warning("Problem during sending notification. %", ex)
+                LOG.warning("Problem during sending notification. %s", ex)
                 try:
                     self._declare_notification_queue_binding(target)
                 except pika_drv_exc.ConnectionException as e:
                     LOG.warning("Problem during declaring notification queue "
-                                "binding. %", e)
+                                "binding. %s", e)
                 return True
             elif isinstance(ex, (pika_drv_exc.ConnectionException,
                                  pika_drv_exc.MessageRejectedException)):
-                LOG.warning("Problem during sending notification. %", ex)
+                LOG.warning("Problem during sending notification. %s", ex)
                 return True
             else:
                 return False
diff --git a/oslo_messaging/_drivers/pika_driver/pika_commons.py b/oslo_messaging/_drivers/pika_driver/pika_commons.py
new file mode 100644
index 000000000..a076f40ad
--- /dev/null
+++ b/oslo_messaging/_drivers/pika_driver/pika_commons.py
@@ -0,0 +1,44 @@
+#    Copyright 2015 Mirantis, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import select
+import socket
+import sys
+
+from pika import exceptions as pika_exceptions
+import six
+
+
+PIKA_CONNECTIVITY_ERRORS = (  # treated as connectivity failures
+    pika_exceptions.AMQPConnectionError,
+    pika_exceptions.ConnectionClosed,
+    pika_exceptions.ChannelClosed,
+    socket.timeout,
+    select.error
+)
+
+EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'  # py2 vs py3 name
+
+
+def is_eventlet_monkey_patched(module):
+    """Safely determine whether eventlet monkey patching is enabled for module
+
+    :param module: String, module name
+    :return: Bool, True if module is patched, False otherwise
+    """
+
+    if 'eventlet.patcher' not in sys.modules:
+        return False
+    import eventlet.patcher
+    return eventlet.patcher.is_monkey_patched(module)
diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py
index c09936576..3b762c327 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_engine.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_engine.py
@@ -14,7 +14,6 @@
 
 import random
 import socket
-import sys
 import threading
 import time
 
@@ -22,30 +21,16 @@ from oslo_log import log as logging
 import pika
 from pika.adapters import select_connection
 from pika import credentials as pika_credentials
+
 import pika_pool
-import six
 
 import uuid
 
+from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
 from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
 
 LOG = logging.getLogger(__name__)
 
-_EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
-
-
-def _is_eventlet_monkey_patched(module):
-    """Determines safely is eventlet patching for module enabled or not
-
-    :param module: String, module name
-    :return Bool, True if module is pathed, False otherwise
-    """
-
-    if 'eventlet.patcher' not in sys.modules:
-        return False
-    import eventlet.patcher
-    return eventlet.patcher.is_monkey_patched(module)
-
 
 def _create_select_poller_connection_impl(
         parameters, on_open_callback, on_open_error_callback,
@@ -99,7 +84,9 @@ class PikaEngine(object):
                  allowed_remote_exmods=None):
         self.conf = conf
 
-        self._force_select_poller_use = _is_eventlet_monkey_patched('select')
+        self._force_select_poller_use = (
+            pika_drv_cmns.is_eventlet_monkey_patched('select')
+        )
 
         # processing rpc options
         self.default_rpc_exchange = (
@@ -109,7 +96,7 @@ class PikaEngine(object):
             conf.oslo_messaging_pika.rpc_reply_exchange
         )
 
-        self.allowed_remote_exmods = [_EXCEPTIONS_MODULE]
+        self.allowed_remote_exmods = [pika_drv_cmns.EXCEPTIONS_MODULE]
         if allowed_remote_exmods:
             self.allowed_remote_exmods.extend(allowed_remote_exmods)
 
@@ -359,8 +346,8 @@ class PikaEngine(object):
                     self.HOST_CONNECTION_LAST_TRY_TIME
                 ] = cur_time
 
-    @staticmethod
-    def declare_exchange_by_channel(channel, exchange, exchange_type, durable):
+    def declare_exchange_by_channel(self, channel, exchange, exchange_type,
+                                    durable):
         """Declare exchange using already created channel, if they don't exist
 
         :param channel: Channel for communication with RabbitMQ
@@ -373,7 +360,7 @@ class PikaEngine(object):
             channel.exchange_declare(
                 exchange, exchange_type, auto_delete=True, durable=durable
             )
-        except pika_pool.Connection.connectivity_errors as e:
+        except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e:
             raise pika_drv_exc.ConnectionException(
                 "Connectivity problem detected during declaring exchange: "
                 "exchange:{}, exchange_type: {}, durable: {}. {}".format(
@@ -381,10 +368,9 @@ class PikaEngine(object):
                 )
             )
 
-    @staticmethod
-    def declare_queue_binding_by_channel(channel, exchange, queue, routing_key,
-                                         exchange_type, queue_expiration,
-                                         durable):
+    def declare_queue_binding_by_channel(self, channel, exchange, queue,
+                                         routing_key, exchange_type,
+                                         queue_expiration, durable):
         """Declare exchange, queue and bind them using already created
         channel, if they don't exist
 
@@ -410,7 +396,7 @@ class PikaEngine(object):
             channel.queue_declare(queue, durable=durable, arguments=arguments)
 
             channel.queue_bind(queue, exchange, routing_key)
-        except pika_pool.Connection.connectivity_errors as e:
+        except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e:
             raise pika_drv_exc.ConnectionException(
                 "Connectivity problem detected during declaring queue "
                 "binding: exchange:{}, queue: {}, routing_key: {}, "
diff --git a/oslo_messaging/_drivers/pika_driver/pika_listener.py b/oslo_messaging/_drivers/pika_driver/pika_listener.py
index 54ede1230..1e52969b7 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_listener.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_listener.py
@@ -13,13 +13,11 @@
 #    under the License.
 
 import threading
-import time
 import uuid
 
 from concurrent import futures
 from oslo_log import log as logging
 
-from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
 from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller
 
 LOG = logging.getLogger(__name__)
@@ -97,15 +95,7 @@ class RpcReplyPikaListener(object):
         """
         while self._reply_poller:
             try:
-                try:
-                    messages = self._reply_poller.poll()
-                except pika_drv_exc.EstablishConnectionException:
-                    LOG.exception("Problem during establishing connection for "
-                                  "reply polling")
-                    time.sleep(
-                        self._pika_engine.host_connection_reconnect_delay
-                    )
-                    continue
+                messages = self._reply_poller.poll()
 
                 for message in messages:
                     try:
diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py
index a5279fa8e..dc3d27912 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_poller.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py
@@ -13,13 +13,14 @@
 #    under the License.
 
 import threading
+import time
 
 from oslo_log import log as logging
 from oslo_utils import timeutils
-import pika_pool
 import six
 
 from oslo_messaging._drivers import base
+from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
 from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
 from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
 
@@ -143,21 +144,17 @@ class PikaPoller(base.Listener):
         """Cleanup allocated resources (channel, connection, etc). It is unsafe
         method for internal use only
         """
-        if self._channel:
-            try:
-                self._channel.close()
-            except Exception as ex:
-                if not pika_pool.Connection.is_connection_invalidated(ex):
-                    LOG.exception("Unexpected error during closing channel")
-            self._channel = None
-
         if self._connection:
             try:
                 self._connection.close()
-            except Exception as ex:
-                if not pika_pool.Connection.is_connection_invalidated(ex):
-                    LOG.exception("Unexpected error during closing connection")
-            self._connection = None
+            except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS:
+                # expected errors
+                pass
+            except Exception:
+                LOG.exception("Unexpected error during closing connection")
+            finally:
+                self._channel = None
+                self._connection = None
 
         for i in six.moves.range(len(self._message_queue) - 1, -1, -1):
             message = self._message_queue[i]
@@ -201,15 +198,25 @@ class PikaPoller(base.Listener):
                         else:
                             # consumer is stopped so we don't expect new
                             # messages, just process already sent events
-                            self._connection.process_data_events(
-                                time_limit=0
-                            )
+                            if self._channel is not None:
+                                self._connection.process_data_events(
+                                    time_limit=0
+                                )
                             # and return result if we don't see new messages
                             if last_queue_size == len(self._message_queue):
                                 result = self._message_queue[:prefetch_size]
                                 del self._message_queue[:prefetch_size]
                                 return result
-                    except pika_pool.Connection.connectivity_errors:
+                    except pika_drv_exc.EstablishConnectionException as e:
+                        LOG.warn("Problem during establishing connection for"
+                                 "pika poller %s", e, exc_info=True)
+                        time.sleep(
+                            self._pika_engine.host_connection_reconnect_delay
+                        )
+                    except pika_drv_exc.ConnectionException:
+                        self._cleanup()
+                        raise
+                    except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS:
                         self._cleanup()
                         raise
 
@@ -220,19 +227,24 @@ class PikaPoller(base.Listener):
         with self._lock:
             if self._started:
                 return
-            self._started = True
 
-            self._cleanup()
             try:
                 self._reconnect()
-            except Exception as exc:
+            except pika_drv_exc.EstablishConnectionException as exc:
+                LOG.warn("Can not establishing connection during pika "
+                         "Conecting required during first poll() call. %s",
+                         exc, exc_info=True)
+            except pika_drv_exc.ConnectionException as exc:
                 self._cleanup()
-                if isinstance(exc, pika_pool.Connection.connectivity_errors):
-                    raise pika_drv_exc.ConnectionException(
-                        "Connectivity problem detected during establishing "
-                        "poller's connection. " + str(exc))
-                else:
-                    raise exc
+                LOG.warn("Connectivity problem during pika poller's start(). "
+                         "Reconnecting required during first poll() call. %s",
+                         exc, exc_info=True)
+            except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc:
+                self._cleanup()
+                LOG.warn("Connectivity problem during pika poller's start(). "
+                         "Reconnecting required during first poll() call. %s",
+                         exc, exc_info=True)
+            self._started = True
 
     def stop(self):
         """Stops poller. Should be called when polling is not needed anymore to
@@ -246,15 +258,10 @@ class PikaPoller(base.Listener):
             if self._queues_to_consume and self._channel:
                 try:
                     self._stop_consuming()
-                except Exception as exc:
+                except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc:
                     self._cleanup()
-                    if isinstance(exc,
-                                  pika_pool.Connection.connectivity_errors):
-                        raise pika_drv_exc.ConnectionException(
-                            "Connectivity problem detected during "
-                            "consumer canceling. " + str(exc))
-                    else:
-                        raise exc
+                    LOG.warn("Connectivity problem detected during consumer "
+                             "cancellation. %s", exc, exc_info=True)
             self._started = False
 
     def cleanup(self):
diff --git a/oslo_messaging/tests/drivers/pika/test_message.py b/oslo_messaging/tests/drivers/pika/test_message.py
index 0cc1b869d..7cddb082c 100644
--- a/oslo_messaging/tests/drivers/pika/test_message.py
+++ b/oslo_messaging/tests/drivers/pika/test_message.py
@@ -22,7 +22,7 @@ from oslo_serialization import jsonutils
 import pika
 
 import oslo_messaging
-from oslo_messaging._drivers.pika_driver import pika_engine
+from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
 from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
 
 
@@ -252,7 +252,7 @@ class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase):
     def setUp(self):
         self._pika_engine = mock.Mock()
         self._pika_engine.allowed_remote_exmods = [
-            pika_engine._EXCEPTIONS_MODULE, "oslo_messaging.exceptions"
+            pika_drv_cmns.EXCEPTIONS_MODULE, "oslo_messaging.exceptions"
         ]
 
         self._channel = mock.Mock()
diff --git a/oslo_messaging/tests/drivers/pika/test_poller.py b/oslo_messaging/tests/drivers/pika/test_poller.py
index 1209d131a..23667ab7e 100644
--- a/oslo_messaging/tests/drivers/pika/test_poller.py
+++ b/oslo_messaging/tests/drivers/pika/test_poller.py
@@ -12,6 +12,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import socket
 import time
 import unittest
 
@@ -33,6 +34,18 @@ class PikaPollerTestCase(unittest.TestCase):
         )
         self._prefetch_count = 123
 
+    def test_start_when_connection_unavailable(self):
+        incoming_message_class_mock = mock.Mock()
+        poller = pika_poller.PikaPoller(
+            self._pika_engine, self._prefetch_count,
+            incoming_message_class=incoming_message_class_mock
+        )
+        # make every connection attempt fail with a socket timeout
+        self._pika_engine.create_connection.side_effect = socket.timeout()
+
+        # start() should not raise socket.timeout exception
+        poller.start()
+
     @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
                 "_declare_queue_binding")
     def test_poll(self, declare_queue_binding_mock):