diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py
index 9e74bc498..89b630ed7 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py
@@ -13,6 +13,7 @@
 #    under the License.
 
 import logging
+import uuid
 
 import six
 
@@ -23,7 +24,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_address
 from oslo_messaging._drivers.zmq_driver import zmq_async
 from oslo_messaging._drivers.zmq_driver import zmq_socket
 from oslo_messaging._drivers.zmq_driver import zmq_updater
-from oslo_messaging._i18n import _LE
+from oslo_messaging._i18n import _LE, _LI
 
 LOG = logging.getLogger(__name__)
 
@@ -37,32 +38,34 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
         self.matchmaker = server.matchmaker
         self.target = server.target
         self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB,
-                                           immediate=False)
+                                           immediate=False,
+                                           identity=self._generate_identity())
         self.sockets.append(self.socket)
-        self._subscribe_on_target(self.target)
+        self.host = self.socket.handle.identity
+        self._subscribe_to_topic()
         self.connection_updater = SubscriberConnectionUpdater(
             conf, self.matchmaker, self.socket)
         self.poller.register(self.socket, self.receive_message)
+        LOG.info(_LI("[%s] Run SUB consumer"), self.host)
 
-    def _subscribe_on_target(self, target):
-        topic_filter = zmq_address.target_to_subscribe_filter(target)
-        if target.topic:
-            self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.topic))
-        if target.server:
-            self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.server))
-        if target.topic and target.server:
-            self.socket.setsockopt(zmq.SUBSCRIBE, topic_filter)
+    def _generate_identity(self):
+        return six.b(self.conf.oslo_messaging_zmq.rpc_zmq_host + '/') + \
+            zmq_address.target_to_subscribe_filter(self.target) + \
+            six.b('/' + str(uuid.uuid4()))
+
+    def _subscribe_to_topic(self):
+        topic_filter = zmq_address.target_to_subscribe_filter(self.target)
+        self.socket.setsockopt(zmq.SUBSCRIBE, topic_filter)
         LOG.debug("[%(host)s] Subscribing to topic %(filter)s",
-                  {"host": self.socket.handle.identity,
-                   "filter": topic_filter})
+                  {"host": self.host, "filter": topic_filter})
 
-    @staticmethod
-    def _receive_request(socket):
+    def _receive_request(self, socket):
         topic_filter = socket.recv()
         message_id = socket.recv()
         context, message = socket.recv_loaded()
-        LOG.debug("Received %(topic_filter)s topic message %(id)s",
-                  {'id': message_id, 'topic_filter': topic_filter})
+        LOG.debug("[%(host)s] Received on topic %(filter)s message %(msg_id)s",
+                  {'host': self.host, 'filter': topic_filter,
+                   'msg_id': message_id})
         return context, message
 
     def receive_message(self, socket):
@@ -75,6 +78,7 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
             LOG.error(_LE("Receiving message failed: %s"), str(e))
 
     def cleanup(self):
+        LOG.info(_LI("[%s] Destroy SUB consumer"), self.host)
         self.connection_updater.cleanup()
         super(SubConsumer, self).cleanup()
 
@@ -83,7 +87,7 @@ class SubscriberConnectionUpdater(zmq_updater.ConnectionUpdater):
 
     def _update_connection(self):
         publishers = self.matchmaker.get_publishers()
-        for host, sync in publishers:
-            self.socket.connect(zmq_address.get_tcp_direct_address(host))
+        for publisher_address, router_address in publishers:
+            self.socket.connect_to_host(publisher_address)
         LOG.debug("[%s] SUB consumer connected to publishers %s",
                   self.socket.handle.identity, publishers)
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_address.py b/oslo_messaging/_drivers/zmq_driver/zmq_address.py
index 5f4d2a031..4d45dea54 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_address.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_address.py
@@ -50,11 +50,4 @@ def target_to_key(target, listener_type=None):
 
 
 def target_to_subscribe_filter(target):
-    if target.topic and target.server:
-        attributes = ['topic', 'server']
-        key = "/".join(getattr(target, attr) for attr in attributes)
-        return six.b(key)
-    if target.topic:
-        return six.b(target.topic)
-    if target.server:
-        return six.b(target.server)
+    return six.b(target.topic)
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
index 56a9d35a7..9210f637c 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
@@ -177,18 +177,19 @@ class ZmqSocket(object):
         if address in self.connections:
             return
         stype = zmq_names.socket_type_str(self.socket_type)
+        sid = self.handle.identity
         try:
-            LOG.info(_LI("Connecting %(stype)s id %(id)s to %(address)s"),
-                     {"stype": stype,
-                      "id": self.handle.identity,
-                      "address": address})
+            LOG.info(_LI("Connecting %(stype)s socket %(sid)s to %(address)s"),
+                     {"stype": stype, "sid": sid, "address": address})
             self.connect(address)
         except zmq.ZMQError as e:
-            errmsg = _LE("Failed connecting %(stype)s to %(address)s: %(e)s") \
-                % {"stype": stype, "address": address, "e": e}
-            LOG.error(_LE("Failed connecting %(stype)s to %(address)s: %(e)s"),
-                      {"stype": stype, "address": address, "e": e})
-            raise rpc_common.RPCException(errmsg)
+            LOG.error(_LE("Failed connecting %(stype)s-%(sid)s to "
+                          "%(address)s: %(e)s"),
+                      {"stype": stype, "sid": sid, "address": address, "e": e})
+            raise rpc_common.RPCException(
+                "Failed connecting %(stype)s-%(sid)s to %(address)s: %(e)s" %
+                {"stype": stype, "sid": sid, "address": address, "e": e}
+            )
 
     def connect_to_host(self, host):
         address = zmq_address.get_tcp_direct_address(