From e39be47b0621342d595fec4ac019b0a11b65cb7d Mon Sep 17 00:00:00 2001
From: ozamiatin <ozamiatin@mirantis.com>
Date: Sat, 31 Dec 2016 03:24:44 +0200
Subject: [PATCH] [zmq] Redis TTL for values

Redis as a key-value storage has limitations on values
expiration by ttl, only keys can expire. Regarding this
we need to introduce some redundancy and register hostnames
as keys along with general target keys to make them expire
on ttl timeout.

Change-Id: Ic0b06d8ec27b63ca49afe941a32020b5e532598c
---
 .../matchmaker/zmq_matchmaker_redis.py        | 36 ++++++++++++++++---
 1 file changed, 32 insertions(+), 4 deletions(-)

diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py
index a7191f22e..5b066af0c 100644
--- a/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py
+++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py
@@ -160,13 +160,21 @@ class MatchmakerRedisBase(zmq_matchmaker_base.MatchmakerBase):
     def _smembers(self, key):
         pass
 
+    @abc.abstractmethod
+    def _ttl(self, key):
+        pass
+
     @no_reraise
     def register_publisher(self, hostname, expire=-1):
-        self._sadd(_PUBLISHERS_KEY, ','.join(hostname), expire)
+        hostname = ','.join(hostname)
+        self._sadd(_PUBLISHERS_KEY, hostname, expire)
+        self._sadd(hostname, ' ', expire)
 
     @no_reraise
     def unregister_publisher(self, hostname):
-        self._srem(_PUBLISHERS_KEY, ','.join(hostname))
+        hostname = ','.join(hostname)
+        self._srem(_PUBLISHERS_KEY, hostname)
+        self._srem(hostname, ' ')
 
     @empty_list_on_error
     def get_publishers(self):
@@ -176,10 +184,12 @@ class MatchmakerRedisBase(zmq_matchmaker_base.MatchmakerBase):
     @no_reraise
     def register_router(self, hostname, expire=-1):
         self._sadd(_ROUTERS_KEY, hostname, expire)
+        self._sadd(hostname, ' ', expire)
 
     @no_reraise
     def unregister_router(self, hostname):
         self._srem(_ROUTERS_KEY, hostname)
+        self._srem(hostname, ' ')
 
     @empty_list_on_error
     def get_routers(self):
@@ -192,18 +202,22 @@ class MatchmakerRedisBase(zmq_matchmaker_base.MatchmakerBase):
         if target.server:
             key = zmq_address.target_to_key(target, listener_type)
             self._sadd(key, hostname, expire)
+            self._sadd(hostname, ' ', expire)
 
         key = zmq_address.prefix_str(target.topic, listener_type)
         self._sadd(key, hostname, expire)
+        self._sadd(hostname, ' ', expire)
 
     @no_reraise
     def unregister(self, target, hostname, listener_type):
         if target.server:
             key = zmq_address.target_to_key(target, listener_type)
             self._srem(key, hostname)
+            self._srem(hostname, ' ')
 
         key = zmq_address.prefix_str(target.topic, listener_type)
         self._srem(key, hostname)
+        self._srem(hostname, ' ')
 
     def get_hosts(self, target, listener_type):
         hosts = []
@@ -315,9 +329,19 @@ class MatchmakerRedis(MatchmakerRedisBase):
     def _srem(self, redis_instance, key, value):
         redis_instance.srem(key, value)
 
+    @read_from_redis_connection_warn
+    def _ttl(self, redis_instance, key):
+        # NOTE(ozamiatin): If the specialized key doesn't exist,
+        # TTL fuction would return -2. If key exists,
+        # but doesn't have expiration associated,
+        # TTL func would return -1. For more information,
+        # please visit http://redis.io/commands/ttl
+        return redis_instance.ttl(key)
+
     @read_from_redis_connection_warn
     def _smembers(self, redis_instance, key):
-        return redis_instance.smembers(key)
+        hosts = redis_instance.smembers(key)
+        return [host for host in hosts if redis_instance.ttl(host) >= -1]
 
 
 class MatchmakerRedisAvailabilityUpdater(zmq_updater.UpdaterBase):
@@ -421,4 +445,8 @@ class MatchmakerSentinel(MatchmakerRedisBase):
         self._redis_master.srem(key, value)
 
     def _smembers(self, key):
-        return self._redis_slave.smembers(key)
+        hosts = self._redis_slave.smembers(key)
+        return [host for host in hosts if self._ttl(host) >= -1]
+
+    def _ttl(self, key):
+        return self._redis_slave.ttl(key)