From 6f6a0ae5bcaf3c37b03534e96925ac161b0ef184 Mon Sep 17 00:00:00 2001
From: Alexi Yelistratov <ayelistratov@mirantis.com>
Date: Fri, 25 Dec 2015 18:36:21 +0300
Subject: [PATCH] [zmq] Added redis sentinel HA implementation to zmq driver

List of redis sentinel hosts is now supported in order to
use automatic failover when redis master goes down.

Change-Id: I5fad4c9b6c6aea4f8f382f7469899a7d05c068c1
Closes-Bug: #1518292
---
 .../zmq_driver/matchmaker/matchmaker_redis.py | 65 +++++++++++++++++--
 test-requirements.txt                         |  1 +
 2 files changed, 61 insertions(+), 5 deletions(-)

diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py
index 1c12934d8..36ed3ef6c 100644
--- a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py
+++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py
@@ -11,6 +11,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import inspect
 import logging
 
 from oslo_config import cfg
@@ -18,8 +19,10 @@ from oslo_utils import importutils
 
 from oslo_messaging._drivers.zmq_driver.matchmaker import base
 from oslo_messaging._drivers.zmq_driver import zmq_address
+from retrying import retry
 
 redis = importutils.try_import('redis')
+redis_sentinel = importutils.try_import('redis.sentinel')
 LOG = logging.getLogger(__name__)
 
 
@@ -34,22 +37,74 @@ matchmaker_redis_opts = [
                default='',
                secret=True,
                help='Password for Redis server (optional).'),
+    cfg.ListOpt('sentinel_hosts',
+                default=[],
+                help='List of Redis Sentinel hosts (fault tolerance mode) e.g.\
+                [host:port, host1:port ... ]'),
+    cfg.StrOpt('sentinel_group_name',
+               default='oslo-messaging-zeromq',
+               help='Redis replica set name.'),
+    cfg.IntOpt('wait_timeout',
+               default=500,
+               help='Time in ms to wait between connection attempts.'),
+    cfg.IntOpt('check_timeout',
+               default=20000,
+               help='Time in ms to wait before the transaction is killed.'),
+    cfg.IntOpt('socket_timeout',
+               default=1000,
+               help='Timeout in ms on blocking socket operations'),
 ]
 
 _PUBLISHERS_KEY = "PUBLISHERS"
 
 
+def retry_if_connection_error(ex):
+        return isinstance(ex, redis.ConnectionError)
+
+
+def apply_retrying(obj, cfg):
+    for attr_name, attr in inspect.getmembers(obj):
+        if not (inspect.ismethod(attr) or inspect.isfunction(attr)):
+            continue
+        if attr_name.startswith("_"):
+            continue
+        setattr(
+            obj,
+            attr_name,
+            retry(
+                wait_fixed=cfg.matchmaker_redis.wait_timeout,
+                stop_max_delay=cfg.matchmaker_redis.check_timeout,
+                retry_on_exception=retry_if_connection_error
+            )(attr))
+
+
 class RedisMatchMaker(base.MatchMakerBase):
 
     def __init__(self, conf, *args, **kwargs):
         super(RedisMatchMaker, self).__init__(conf, *args, **kwargs)
         self.conf.register_opts(matchmaker_redis_opts, "matchmaker_redis")
 
-        self._redis = redis.StrictRedis(
-            host=self.conf.matchmaker_redis.host,
-            port=self.conf.matchmaker_redis.port,
-            password=self.conf.matchmaker_redis.password,
-        )
+        if not self.conf.matchmaker_redis.sentinel_hosts:
+            self._redis = redis.StrictRedis(
+                host=self.conf.matchmaker_redis.host,
+                port=self.conf.matchmaker_redis.port,
+                password=self.conf.matchmaker_redis.password,
+            )
+        else:
+            socket_timeout = self.conf.matchmaker_redis.socket_timeout / 1000.
+            s = self.conf.matchmaker_redis.sentinel_hosts
+            sentinel_hosts = [tuple(i.split(":")) for i in s]
+            sentinel = redis.sentinel.Sentinel(
+                sentinels=sentinel_hosts,
+                socket_timeout=socket_timeout
+            )
+
+            self._redis = sentinel.master_for(
+                self.conf.matchmaker_redis.sentinel_group_name,
+                socket_timeout=socket_timeout
+            )
+
+        apply_retrying(self, self.conf)
 
     def register_publisher(self, hostname):
         host_str = ",".join(hostname)
diff --git a/test-requirements.txt b/test-requirements.txt
index 1387e1a6b..309fbe766 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -17,6 +17,7 @@ oslotest>=1.10.0 # Apache-2.0
 
 # for test_matchmaker_redis
 redis>=2.10.0
+retrying>=1.2.3,!=1.3.0 # Apache-2.0
 
 # for test_impl_zmq
 pyzmq>=14.3.1 # LGPL+BSD