diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py
index 21118d790..3829fa5e4 100644
--- a/oslo_messaging/_drivers/impl_zmq.py
+++ b/oslo_messaging/_drivers/impl_zmq.py
@@ -72,10 +72,14 @@ zmq_opts = [
                help='The default number of seconds that poll should wait. '
                     'Poll raises timeout exception when timeout expired.'),
 
-    cfg.IntOpt('zmq_target_expire', default=120,
+    cfg.IntOpt('zmq_target_expire', default=300,
                help='Expiration timeout in seconds of a name service record '
                     'about existing target ( < 0 means no timeout).'),
 
+    cfg.IntOpt('zmq_target_update', default=180,
+               help='Update period in seconds of a name service record '
+                    'about existing target.'),
+
     cfg.BoolOpt('use_pub_sub', default=True,
                 help='Use PUB/SUB pattern for fanout methods. '
                      'PUB/SUB always uses proxy.'),
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py
index 9ed07370a..c75ff4e1f 100644
--- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py
@@ -22,6 +22,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_address
 from oslo_messaging._drivers.zmq_driver import zmq_async
 from oslo_messaging._drivers.zmq_driver import zmq_names
 from oslo_messaging._drivers.zmq_driver import zmq_socket
+from oslo_messaging._drivers.zmq_driver import zmq_updater
 from oslo_messaging._i18n import _LI
 
 zmq = zmq_async.import_zmq()
@@ -55,14 +56,9 @@ class UniversalQueueProxy(object):
         self.pub_publisher = zmq_pub_publisher.PubPublisherProxy(
             conf, matchmaker)
 
-        self.matchmaker.register_publisher(
-            (self.pub_publisher.host, self.fe_router_address))
-        LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Run PUB publisher"),
-                 {"pub": self.pub_publisher.host,
-                  "router": self.fe_router_address})
-        self.matchmaker.register_router(self.be_router_address)
-        LOG.info(_LI("[Backend ROUTER:%(router)s] Run ROUTER"),
-                 {"router": self.be_router_address})
+        self._router_updater = RouterUpdater(
+            conf, matchmaker, self.pub_publisher.host, self.fe_router_address,
+            self.be_router_address)
 
     def run(self):
         message, socket = self.poller.poll()
@@ -106,7 +102,7 @@ class UniversalQueueProxy(object):
         socket.send(b'', zmq.SNDMORE)
         socket.send(reply_id, zmq.SNDMORE)
         socket.send(six.b(str(message_type)), zmq.SNDMORE)
-        LOG.debug("Redirecting message %s" % message_id)
+        LOG.debug("Dispatching message %s" % message_id)
         socket.send_multipart(multipart_message)
 
     def cleanup(self):
@@ -116,3 +112,29 @@ class UniversalQueueProxy(object):
         self.matchmaker.unregister_publisher(
             (self.pub_publisher.host, self.fe_router_address))
         self.matchmaker.unregister_router(self.be_router_address)
+
+
+class RouterUpdater(zmq_updater.UpdaterBase):
+    """This entity performs periodic async updates
+    from router proxy to the matchmaker.
+    """
+
+    def __init__(self, conf, matchmaker, publisher_address, fe_router_address,
+                 be_router_address):
+        self.publisher_address = publisher_address
+        self.fe_router_address = fe_router_address
+        self.be_router_address = be_router_address
+        super(RouterUpdater, self).__init__(conf, matchmaker,
+                                            self._update_records)
+
+    def _update_records(self):
+        self.matchmaker.register_publisher(
+            (self.publisher_address, self.fe_router_address),
+            expire=self.conf.zmq_target_expire)
+        LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Update PUB publisher"),
+                 {"pub": self.publisher_address,
+                  "router": self.fe_router_address})
+        self.matchmaker.register_router(self.be_router_address,
+                                        expire=self.conf.zmq_target_expire)
+        LOG.info(_LI("[Backend ROUTER:%(router)s] Update ROUTER"),
+                 {"router": self.be_router_address})
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
index 5cba7820a..e446cde21 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
@@ -25,6 +25,7 @@ from oslo_messaging._drivers.zmq_driver.client.publishers \
 from oslo_messaging._drivers.zmq_driver import zmq_address
 from oslo_messaging._drivers.zmq_driver import zmq_async
 from oslo_messaging._drivers.zmq_driver import zmq_names
+from oslo_messaging._drivers.zmq_driver import zmq_updater
 
 zmq = zmq_async.import_zmq()
 
@@ -40,6 +41,8 @@ class DealerPublisherProxy(object):
             conf, matchmaker, zmq.ROUTER, zmq.DEALER)
         self.socket = socket_to_proxy
         self.routing_table = RoutingTable(conf, matchmaker)
+        self.connection_updater = PublisherConnectionUpdater(
+            conf, matchmaker, self.socket)
 
     def send_request(self, request):
         if request.msg_type == zmq_names.CALL_TYPE:
@@ -92,6 +95,8 @@ class CallSenderProxy(zmq_dealer_call_publisher.CallSender):
         self.socket = self.outbound_sockets.get_socket_to_publishers()
         self.reply_waiter.poll_socket(self.socket)
         self.routing_table = RoutingTable(conf, matchmaker)
+        self.connection_updater = PublisherConnectionUpdater(
+            conf, matchmaker, self.socket)
 
     def _connect_socket(self, target):
         return self.socket
@@ -170,3 +175,11 @@ class RoutingTable(object):
     def _renew_routable_hosts(self, target):
         hosts, _ = self.routing_table[str(target)]
         self.routable_hosts[str(target)] = list(hosts)
+
+
+class PublisherConnectionUpdater(zmq_updater.ConnectionUpdater):
+
+    def _update_connection(self):
+        publishers = self.matchmaker.get_publishers()
+        for pub_address, router_address in publishers:
+            self.socket.connect_to_host(router_address)
diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py
index 150cf8a0c..daa1b1746 100644
--- a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py
+++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py
@@ -138,9 +138,14 @@ class RedisMatchMaker(base.MatchMakerBase):
                     "port": self.conf.matchmaker_redis.port,
                     "password": self.conf.matchmaker_redis.password}
 
-    def register_publisher(self, hostname):
+    def _add_key_with_expire(self, key, value, expire):
+        self._redis.sadd(key, value)
+        if expire > 0:
+            self._redis.expire(key, expire)
+
+    def register_publisher(self, hostname, expire=-1):
         host_str = ",".join(hostname)
-        self._redis.sadd(_PUBLISHERS_KEY, host_str)
+        self._add_key_with_expire(_PUBLISHERS_KEY, host_str, expire)
 
     def unregister_publisher(self, hostname):
         host_str = ",".join(hostname)
@@ -153,8 +158,8 @@ class RedisMatchMaker(base.MatchMakerBase):
                       self._get_hosts_by_key(_PUBLISHERS_KEY)])
         return hosts
 
-    def register_router(self, hostname):
-        self._redis.sadd(_ROUTERS_KEY, hostname)
+    def register_router(self, hostname, expire=-1):
+        self._add_key_with_expire(_ROUTERS_KEY, hostname, expire)
 
     def unregister_router(self, hostname):
         self._redis.srem(_ROUTERS_KEY, hostname)
@@ -167,22 +172,22 @@ class RedisMatchMaker(base.MatchMakerBase):
 
     def register(self, target, hostname, listener_type, expire=-1):
 
-        def register_key(key):
-            self._redis.sadd(key, hostname)
-            if expire > 0:
-                self._redis.expire(key, expire)
-
         if target.topic and target.server:
             key = zmq_address.target_to_key(target, listener_type)
-            register_key(key)
+            self._add_key_with_expire(key, hostname, expire)
 
         if target.topic:
             key = zmq_address.prefix_str(target.topic, listener_type)
-            register_key(key)
+            self._add_key_with_expire(key, hostname, expire)
 
     def unregister(self, target, hostname, listener_type):
-        key = zmq_address.target_to_key(target, listener_type)
-        self._redis.srem(key, hostname)
+        if target.topic and target.server:
+            key = zmq_address.target_to_key(target, listener_type)
+            self._redis.srem(key, hostname)
+
+        if target.topic:
+            key = zmq_address.prefix_str(target.topic, listener_type)
+            self._redis.srem(key, hostname)
 
     def get_hosts(self, target, listener_type):
         LOG.debug("[Redis] get_hosts for target %s", target)
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py
index 86dddee61..69c6958db 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py
@@ -14,7 +14,6 @@
 
 import abc
 import logging
-import time
 
 import six
 
@@ -23,6 +22,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_address
 from oslo_messaging._drivers.zmq_driver import zmq_async
 from oslo_messaging._drivers.zmq_driver import zmq_names
 from oslo_messaging._drivers.zmq_driver import zmq_socket
+from oslo_messaging._drivers.zmq_driver import zmq_updater
 from oslo_messaging._i18n import _LE
 
 LOG = logging.getLogger(__name__)
@@ -60,8 +60,8 @@ class SingleSocketConsumer(ConsumerBase):
         self.socket_type = socket_type
         self.host = None
         self.socket = self.subscribe_socket(socket_type)
-        self.target_updater = TargetUpdater(conf, self.matchmaker, self.target,
-                                            self.host, socket_type)
+        self.target_updater = TargetUpdater(
+            conf, self.matchmaker, self.target, self.host, socket_type)
 
     def subscribe_socket(self, socket_type):
         try:
@@ -96,25 +96,20 @@ class SingleSocketConsumer(ConsumerBase):
         super(SingleSocketConsumer, self).cleanup()
 
 
-class TargetUpdater(object):
+class TargetUpdater(zmq_updater.UpdaterBase):
     """This entity performs periodic async updates
     to the matchmaker.
     """
 
     def __init__(self, conf, matchmaker, target, host, socket_type):
-        self.conf = conf
-        self.matchmaker = matchmaker
         self.target = target
         self.host = host
         self.socket_type = socket_type
-        self.executor = zmq_async.get_executor(method=self._update_target)
-        self.executor.execute()
+        super(TargetUpdater, self).__init__(conf, matchmaker,
+                                            self._update_target)
 
     def _update_target(self):
         self.matchmaker.register(
             self.target, self.host,
-            zmq_names.socket_type_str(self.socket_type))
-        time.sleep(self.conf.zmq_target_expire / 2)
-
-    def cleanup(self):
-        self.executor.stop()
+            zmq_names.socket_type_str(self.socket_type),
+            expire=self.conf.zmq_target_expire)
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
index f1f5d6018..f0fd11177 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
@@ -25,6 +25,7 @@ from oslo_messaging._drivers.zmq_driver.server.consumers\
     import zmq_consumer_base
 from oslo_messaging._drivers.zmq_driver import zmq_async
 from oslo_messaging._drivers.zmq_driver import zmq_names
+from oslo_messaging._drivers.zmq_driver import zmq_updater
 from oslo_messaging._i18n import _LE, _LI
 
 LOG = logging.getLogger(__name__)
@@ -90,6 +91,8 @@ class DealerConsumer(zmq_consumer_base.ConsumerBase):
         self.target_updater = zmq_consumer_base.TargetUpdater(
             conf, self.matchmaker, self.target, self.host,
             zmq.DEALER)
+        self.connection_updater = ConsumerConnectionUpdater(
+            conf, self.matchmaker, self.socket)
         LOG.info(_LI("[%s] Run DEALER consumer"), self.host)
 
     def receive_message(self, socket):
@@ -111,6 +114,19 @@ class DealerConsumer(zmq_consumer_base.ConsumerBase):
             else:
                 LOG.error(_LE("Unknown message type: %s"),
                           zmq_names.message_type_str(message_type))
-
         except (zmq.ZMQError, AssertionError) as e:
             LOG.error(_LE("Receiving message failure: %s"), str(e))
+
+    def cleanup(self):
+        LOG.info(_LI("[%s] Destroy DEALER consumer"), self.host)
+        super(DealerConsumer, self).cleanup()
+        self.matchmaker.unregister(self.target, self.host,
+                                   zmq_names.socket_type_str(zmq.DEALER))
+
+
+class ConsumerConnectionUpdater(zmq_updater.ConnectionUpdater):
+
+    def _update_connection(self):
+        routers = self.matchmaker.get_routers()
+        for router_address in routers:
+            self.socket.connect_to_host(router_address)
diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py
index 254c6e5ed..c963c452f 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py
@@ -48,11 +48,11 @@ class ZmqServer(base.PollStyleListener):
             conf, self.poller, self) if conf.use_pub_sub else None
 
         self.consumers = []
-        if self.router_consumer:
+        if self.router_consumer is not None:
             self.consumers.append(self.router_consumer)
-        if self.dealer_consumer:
+        if self.dealer_consumer is not None:
             self.consumers.append(self.dealer_consumer)
-        if self.sub_consumer:
+        if self.sub_consumer is not None:
             self.consumers.append(self.sub_consumer)
 
     @base.batch_poll_helper
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
index 2ca816b41..a97343e7f 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
@@ -96,6 +96,8 @@ class ZmqSocket(object):
         self.handle.close(*args, **kwargs)
 
     def connect_to_address(self, address):
+        if address in self.connections:
+            return
         stype = zmq_names.socket_type_str(self.socket_type)
         try:
             LOG.info(_LI("Connecting %(stype)s id %(id)s to %(address)s"),
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py
new file mode 100644
index 000000000..0b4594a33
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py
@@ -0,0 +1,55 @@
+#    Copyright 2016 Mirantis, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import abc
+import logging
+import time
+
+import six
+
+from oslo_messaging._drivers.zmq_driver import zmq_async
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+class UpdaterBase(object):
+
+    def __init__(self, conf, matchmaker, update_method):
+        self.conf = conf
+        self.matchmaker = matchmaker
+        self.update_method = update_method
+        self.executor = zmq_async.get_executor(method=self._update_loop)
+        self.executor.execute()
+
+    def _update_loop(self):
+        self.update_method()
+        time.sleep(self.conf.zmq_target_update)
+
+    def cleanup(self):
+        self.executor.stop()
+
+
+@six.add_metaclass(abc.ABCMeta)
+class ConnectionUpdater(UpdaterBase):
+
+    def __init__(self, conf, matchmaker, socket):
+        self.socket = socket
+        super(ConnectionUpdater, self).__init__(
+            conf, matchmaker, self._update_connection)
+
+    @abc.abstractmethod
+    def _update_connection(self):
+        """Update connection info"""