diff --git a/oslo_messaging/_cmd/zmq_proxy.py b/oslo_messaging/_cmd/zmq_proxy.py
index 80f8ad69c..29bf34e03 100644
--- a/oslo_messaging/_cmd/zmq_proxy.py
+++ b/oslo_messaging/_cmd/zmq_proxy.py
@@ -1,4 +1,4 @@
-#    Copyright 2015 Mirantis, Inc.
+#    Copyright 2015-2016 Mirantis, Inc.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
@@ -12,86 +12,36 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import argparse
 import logging
 
 from oslo_config import cfg
 
 from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
-from oslo_messaging._drivers.zmq_driver.proxy import zmq_queue_proxy
 from oslo_messaging._drivers.zmq_driver import zmq_options
+from oslo_messaging._i18n import _LI
 
-CONF = cfg.CONF
-
-zmq_options.register_opts(CONF)
-
-opt_group = cfg.OptGroup(name='zmq_proxy_opts',
-                         title='ZeroMQ proxy options')
-CONF.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group)
-
-
-USAGE = """ Usage: ./zmq-proxy.py [-h] [] ...
-
-Usage example:
- python oslo_messaging/_cmd/zmq-proxy.py"""
+LOG = logging.getLogger(__name__)
 
 
 def main():
-    parser = argparse.ArgumentParser(
-        description='ZeroMQ proxy service',
-        usage=USAGE
-    )
 
-    parser.add_argument('-c', '--config-file', dest='config_file', type=str,
-                        help='Path to configuration file')
-    parser.add_argument('-l', '--log-file', dest='log_file', type=str,
-                        help='Path to log file')
+    conf = cfg.CONF
+    opt_group = cfg.OptGroup(name='zmq_proxy_opts',
+                             title='ZeroMQ proxy options')
+    conf.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group)
+    zmq_options.register_opts(conf)
+    zmq_proxy.parse_command_line_args(conf)
 
-    parser.add_argument('-H', '--host', dest='host', type=str,
-                        help='Host FQDN for current proxy')
-    parser.add_argument('-f', '--frontend-port', dest='frontend_port',
-                        type=int,
-                        help='Front-end ROUTER port number')
-    parser.add_argument('-b', '--backend-port', dest='backend_port', type=int,
-                        help='Back-end ROUTER port number')
-    parser.add_argument('-p', '--publisher-port', dest='publisher_port',
-                        type=int,
-                        help='Front-end PUBLISHER port number')
-
-    parser.add_argument('-d', '--debug', dest='debug', type=bool,
-                        default=False,
-                        help='Turn on DEBUG logging level instead of INFO')
-
-    args = parser.parse_args()
-
-    if args.config_file:
-        cfg.CONF(['--config-file', args.config_file])
-
-    log_kwargs = {'level': logging.DEBUG if args.debug else logging.INFO,
-                  'format': '%(asctime)s %(name)s %(levelname)-8s %(message)s'}
-    if args.log_file:
-        log_kwargs.update({'filename': args.log_file})
-    logging.basicConfig(**log_kwargs)
-
-    if args.host:
-        CONF.zmq_proxy_opts.host = args.host
-    if args.frontend_port:
-        CONF.set_override('frontend_port', args.frontend_port,
-                          group='zmq_proxy_opts')
-    if args.backend_port:
-        CONF.set_override('backend_port', args.backend_port,
-                          group='zmq_proxy_opts')
-    if args.publisher_port:
-        CONF.set_override('publisher_port', args.publisher_port,
-                          group='zmq_proxy_opts')
-
-    reactor = zmq_proxy.ZmqProxy(CONF, zmq_queue_proxy.UniversalQueueProxy)
+    reactor = zmq_proxy.ZmqProxy(conf)
 
     try:
         while True:
             reactor.run()
     except (KeyboardInterrupt, SystemExit):
+        LOG.info(_LI("Exit proxy by interrupt signal."))
+    finally:
         reactor.close()
 
+
 if __name__ == "__main__":
     main()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py
index b3f8aae86..c3d45e185 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py
@@ -55,7 +55,7 @@ class Request(object):
         :type retry: int
         """
 
-        if self.msg_type not in zmq_names.MESSAGE_TYPES:
+        if self.msg_type not in zmq_names.REQUEST_TYPES:
             raise RuntimeError("Unknown message type!")
 
         self.target = target
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/central/__init__.py b/oslo_messaging/_drivers/zmq_driver/proxy/central/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py
similarity index 60%
rename from oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py
rename to oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py
index 1b2ebd433..02bff5a83 100644
--- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py
@@ -1,4 +1,4 @@
-#    Copyright 2015 Mirantis, Inc.
+#    Copyright 2015-2016 Mirantis, Inc.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
@@ -14,114 +14,123 @@
 
 import logging
 
-import six
-
-from oslo_messaging._drivers.zmq_driver.proxy import zmq_publisher_proxy
+from oslo_messaging._drivers.zmq_driver.proxy.central \
+    import zmq_publisher_proxy
+from oslo_messaging._drivers.zmq_driver.proxy import zmq_sender
 from oslo_messaging._drivers.zmq_driver import zmq_async
 from oslo_messaging._drivers.zmq_driver import zmq_names
 from oslo_messaging._drivers.zmq_driver import zmq_socket
 from oslo_messaging._drivers.zmq_driver import zmq_updater
-from oslo_messaging._i18n import _LE, _LI
+from oslo_messaging._i18n import _LI, _LE
 
 zmq = zmq_async.import_zmq()
 LOG = logging.getLogger(__name__)
 
 
-class UniversalQueueProxy(object):
+def check_message_format(func):
+    def _check_message_format(*args, **kwargs):
+        try:
+            return func(*args, **kwargs)
+        except Exception as e:
+            LOG.error(_LE("Received message with wrong format"))
+            LOG.exception(e)
+    return _check_message_format
+
+
+class SingleRouterProxy(object):
 
     def __init__(self, conf, context, matchmaker):
         self.conf = conf
         self.context = context
-        super(UniversalQueueProxy, self).__init__()
+        super(SingleRouterProxy, self).__init__()
         self.matchmaker = matchmaker
+        host = conf.zmq_proxy_opts.host
+
         self.poller = zmq_async.get_poller()
 
         port = conf.zmq_proxy_opts.frontend_port
-        host = conf.zmq_proxy_opts.host
         self.fe_router_socket = zmq_socket.ZmqFixedPortSocket(
             conf, context, zmq.ROUTER, host,
             conf.zmq_proxy_opts.frontend_port) if port != 0 else \
-            zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, host)
+            zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER,
+                                           host)
 
-        port = conf.zmq_proxy_opts.backend_port
-        self.be_router_socket = zmq_socket.ZmqFixedPortSocket(
-            conf, context, zmq.ROUTER, host,
-            conf.zmq_proxy_opts.backend_port) if port != 0 else \
-            zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, host)
+        self.poller.register(self.fe_router_socket, self._receive_message)
 
-        self.poller.register(self.fe_router_socket, self._receive_in_request)
-        self.poller.register(self.be_router_socket, self._receive_in_request)
-
-        self.pub_publisher = zmq_publisher_proxy.PublisherProxy(
+        self.publisher = zmq_publisher_proxy.PublisherProxy(
             conf, matchmaker)
-
-        self._router_updater = RouterUpdater(
-            conf, matchmaker, self.pub_publisher.host,
-            self.fe_router_socket.connect_address,
-            self.be_router_socket.connect_address)
+        self.router_sender = zmq_sender.CentralRouterSender()
+        self._router_updater = self._create_router_updater()
 
     def run(self):
         message, socket = self.poller.poll()
         if message is None:
             return
 
-        msg_type = message[0]
+        msg_type = int(message[zmq_names.MESSAGE_TYPE_IDX])
         if self.conf.oslo_messaging_zmq.use_pub_sub and \
                 msg_type in (zmq_names.CAST_FANOUT_TYPE,
                              zmq_names.NOTIFY_TYPE):
-            self.pub_publisher.send_request(message)
+            self.publisher.send_request(message)
         else:
-            self._redirect_message(self.be_router_socket
-                                   if socket is self.fe_router_socket
-                                   else self.fe_router_socket, message)
+            self.router_sender.send_message(
+                self._get_socket_to_dispatch_on(socket), message)
+
+    def _create_router_updater(self):
+        return RouterUpdater(
+            self.conf, self.matchmaker, self.publisher.host,
+            self.fe_router_socket.connect_address,
+            self.fe_router_socket.connect_address)
+
+    def _get_socket_to_dispatch_on(self, socket):
+        return self.fe_router_socket
 
     @staticmethod
-    def _receive_in_request(socket):
-        try:
-            reply_id = socket.recv()
-            assert reply_id is not None, "Valid id expected"
-            empty = socket.recv()
-            assert empty == b'', "Empty delimiter expected"
-            msg_type = int(socket.recv())
-            routing_key = socket.recv()
-            payload = socket.recv_multipart()
-            payload.insert(0, reply_id)
-            payload.insert(0, routing_key)
-            payload.insert(0, msg_type)
-            return payload
-        except (AssertionError, ValueError):
-            LOG.error(_LE("Received message with wrong format"))
-            if socket.getsockopt(zmq.RCVMORE):
-                # NOTE(ozamiatin): Drop the left parts of broken message
-                socket.recv_multipart()
-        except zmq.ZMQError as e:
-            LOG.exception(e)
-        return None
-
-    @staticmethod
-    def _redirect_message(socket, multipart_message):
-        message_type = multipart_message.pop(0)
-        routing_key = multipart_message.pop(0)
-        reply_id = multipart_message.pop(0)
-        message_id = multipart_message[0]
-        socket.send(routing_key, zmq.SNDMORE)
-        socket.send(b'', zmq.SNDMORE)
-        socket.send(reply_id, zmq.SNDMORE)
-        socket.send(six.b(str(message_type)), zmq.SNDMORE)
-        LOG.debug("Dispatching %(msg_type)s message %(msg_id)s - from %(rid)s "
-                  "to -> %(rkey)s" %
-                  {"msg_type": zmq_names.message_type_str(message_type),
-                   "msg_id": message_id,
-                   "rkey": routing_key,
-                   "rid": reply_id})
-        socket.send_multipart(multipart_message)
+    @check_message_format
+    def _receive_message(socket):
+        message = socket.recv_multipart()
+        assert len(message) > zmq_names.MESSAGE_ID_IDX, "Not enough parts"
+        assert message[zmq_names.REPLY_ID_IDX] != b'', "Valid id expected"
+        message_type = int(message[zmq_names.MESSAGE_TYPE_IDX])
+        assert message_type in zmq_names.MESSAGE_TYPES, "Known type expected!"
+        assert message[zmq_names.EMPTY_IDX] == b'', "Empty delimiter expected"
+        return message
 
     def cleanup(self):
+        self._router_updater.cleanup()
         self.poller.close()
         self.fe_router_socket.close()
+        self.publisher.cleanup()
+
+
+class DoubleRouterProxy(SingleRouterProxy):
+
+    def __init__(self, conf, context, matchmaker):
+        LOG.info(_LI('Running double router proxy'))
+        port = conf.zmq_proxy_opts.backend_port
+        host = conf.zmq_proxy_opts.host
+        self.be_router_socket = zmq_socket.ZmqFixedPortSocket(
+            conf, context, zmq.ROUTER, host,
+            conf.zmq_proxy_opts.backend_port) if port != 0 else \
+            zmq_socket.ZmqRandomPortSocket(
+                conf, context, zmq.ROUTER, host)
+        super(DoubleRouterProxy, self).__init__(conf, context, matchmaker)
+        self.poller.register(self.be_router_socket, self._receive_message)
+
+    def _create_router_updater(self):
+        return RouterUpdater(
+            self.conf, self.matchmaker, self.publisher.host,
+            self.fe_router_socket.connect_address,
+            self.be_router_socket.connect_address)
+
+    def _get_socket_to_dispatch_on(self, socket):
+        return self.be_router_socket \
+            if socket is self.fe_router_socket \
+            else self.fe_router_socket
+
+    def cleanup(self):
+        super(DoubleRouterProxy, self).cleanup()
         self.be_router_socket.close()
-        self.pub_publisher.cleanup()
-        self._router_updater.cleanup()
 
 
 class RouterUpdater(zmq_updater.UpdaterBase):
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_publisher_proxy.py
similarity index 71%
rename from oslo_messaging/_drivers/zmq_driver/proxy/zmq_publisher_proxy.py
rename to oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_publisher_proxy.py
index 727b41903..09f578552 100644
--- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_publisher_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_publisher_proxy.py
@@ -1,4 +1,4 @@
-#    Copyright 2015 Mirantis, Inc.
+#    Copyright 2015-2016 Mirantis, Inc.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
@@ -14,8 +14,8 @@
 
 import logging
 
+from oslo_messaging._drivers.zmq_driver.proxy import zmq_sender
 from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
 from oslo_messaging._drivers.zmq_driver import zmq_socket
 
 LOG = logging.getLogger(__name__)
@@ -44,7 +44,6 @@ class PublisherProxy(object):
         self.matchmaker = matchmaker
 
         port = conf.zmq_proxy_opts.publisher_port
-
         self.socket = zmq_socket.ZmqFixedPortSocket(
             self.conf, self.zmq_context, zmq.PUB, conf.zmq_proxy_opts.host,
             port) if port != 0 else \
@@ -52,23 +51,10 @@ class PublisherProxy(object):
                 self.conf, self.zmq_context, zmq.PUB, conf.zmq_proxy_opts.host)
 
         self.host = self.socket.connect_address
+        self.sender = zmq_sender.CentralPublisherSender()
 
     def send_request(self, multipart_message):
-        message_type = multipart_message.pop(0)
-        assert message_type in (zmq_names.CAST_FANOUT_TYPE,
-                                zmq_names.NOTIFY_TYPE), "Fanout expected!"
-        topic_filter = multipart_message.pop(0)
-        reply_id = multipart_message.pop(0)
-        message_id = multipart_message.pop(0)
-        assert reply_id is not None, "Reply id expected!"
-
-        self.socket.send(topic_filter, zmq.SNDMORE)
-        self.socket.send(message_id, zmq.SNDMORE)
-        self.socket.send_multipart(multipart_message)
-
-        LOG.debug("Publishing message %(message_id)s on [%(topic)s]",
-                  {"topic": topic_filter,
-                   "message_id": message_id})
+        self.sender.send_message(self.socket, multipart_message)
 
     def cleanup(self):
         self.socket.close()
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py
index 15c777489..886da5464 100644
--- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py
@@ -1,4 +1,4 @@
-#    Copyright 2015 Mirantis, Inc.
+#    Copyright 2015-2016 Mirantis, Inc.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
@@ -12,12 +12,14 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import argparse
 import logging
 import socket
 
+from oslo_config import cfg
 from stevedore import driver
 
-from oslo_config import cfg
+from oslo_messaging._drivers.zmq_driver.proxy.central import zmq_central_proxy
 from oslo_messaging._drivers.zmq_driver import zmq_async
 from oslo_messaging._i18n import _LI
 
@@ -25,6 +27,12 @@ zmq = zmq_async.import_zmq()
 LOG = logging.getLogger(__name__)
 
 
+USAGE = """ Usage: ./zmq-proxy.py [-h] [] ...
+
+Usage example:
+ python oslo_messaging/_cmd/zmq-proxy.py"""
+
+
 zmq_proxy_opts = [
     cfg.StrOpt('host', default=socket.gethostname(),
                help='Hostname (FQDN) of current proxy'
@@ -41,6 +49,56 @@ zmq_proxy_opts = [
 ]
 
 
+def parse_command_line_args(conf):
+    parser = argparse.ArgumentParser(
+        description='ZeroMQ proxy service',
+        usage=USAGE
+    )
+
+    parser.add_argument('-c', '--config-file', dest='config_file', type=str,
+                        help='Path to configuration file')
+    parser.add_argument('-l', '--log-file', dest='log_file', type=str,
+                        help='Path to log file')
+
+    parser.add_argument('-H', '--host', dest='host', type=str,
+                        help='Host FQDN for current proxy')
+    parser.add_argument('-f', '--frontend-port', dest='frontend_port',
+                        type=int,
+                        help='Front-end ROUTER port number')
+    parser.add_argument('-b', '--backend-port', dest='backend_port', type=int,
+                        help='Back-end ROUTER port number')
+    parser.add_argument('-p', '--publisher-port', dest='publisher_port',
+                        type=int,
+                        help='Front-end PUBLISHER port number')
+
+    parser.add_argument('-d', '--debug', dest='debug', type=bool,
+                        default=False,
+                        help='Turn on DEBUG logging level instead of INFO')
+
+    args = parser.parse_args()
+
+    if args.config_file:
+        conf(['--config-file', args.config_file])
+
+    log_kwargs = {'level': logging.DEBUG if args.debug else logging.INFO,
+                  'format': '%(asctime)s %(name)s %(levelname)-8s %(message)s'}
+    if args.log_file:
+        log_kwargs.update({'filename': args.log_file})
+    logging.basicConfig(**log_kwargs)
+
+    if args.host:
+        conf.zmq_proxy_opts.host = args.host
+    if args.frontend_port:
+        conf.set_override('frontend_port', args.frontend_port,
+                          group='zmq_proxy_opts')
+    if args.backend_port:
+        conf.set_override('backend_port', args.backend_port,
+                          group='zmq_proxy_opts')
+    if args.publisher_port:
+        conf.set_override('publisher_port', args.publisher_port,
+                          group='zmq_proxy_opts')
+
+
 class ZmqProxy(object):
     """Wrapper class for Publishers and Routers proxies.
        The main reason to have a proxy is high complexity of TCP sockets number
@@ -80,7 +138,7 @@ class ZmqProxy(object):
 
     """
 
-    def __init__(self, conf, proxy_cls):
+    def __init__(self, conf):
         super(ZmqProxy, self).__init__()
         self.conf = conf
         self.matchmaker = driver.DriverManager(
@@ -88,7 +146,16 @@ class ZmqProxy(object):
             self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker,
         ).driver(self.conf)
         self.context = zmq.Context()
-        self.proxy = proxy_cls(conf, self.context, self.matchmaker)
+        self.proxy = self._choose_proxy_implementation()
+
+    def _choose_proxy_implementation(self):
+        if self.conf.zmq_proxy_opts.frontend_port != 0 and \
+                self.conf.zmq_proxy_opts.backend_port == 0:
+            return zmq_central_proxy.SingleRouterProxy(self.conf, self.context,
+                                                       self.matchmaker)
+        else:
+            return zmq_central_proxy.DoubleRouterProxy(self.conf, self.context,
+                                                       self.matchmaker)
 
     def run(self):
         self.proxy.run()
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_sender.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_sender.py
new file mode 100644
index 000000000..3499292ff
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_sender.py
@@ -0,0 +1,69 @@
+#    Copyright 2016 Mirantis, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import abc
+import logging
+
+import six
+
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_names
+
+zmq = zmq_async.import_zmq()
+LOG = logging.getLogger(__name__)
+
+
+@six.add_metaclass(abc.ABCMeta)
+class Sender(object):
+
+    @abc.abstractmethod
+    def send_message(self, socket, multipart_message):
+        """Send message to a socket from multipart list"""
+
+
+class CentralRouterSender(Sender):
+
+    def send_message(self, socket, multipart_message):
+        message_type = int(multipart_message[zmq_names.MESSAGE_TYPE_IDX])
+        routing_key = multipart_message[zmq_names.ROUTING_KEY_IDX]
+        reply_id = multipart_message[zmq_names.REPLY_ID_IDX]
+        message_id = multipart_message[zmq_names.MESSAGE_ID_IDX]
+        socket.send(routing_key, zmq.SNDMORE)
+        socket.send(b'', zmq.SNDMORE)
+        socket.send(reply_id, zmq.SNDMORE)
+        socket.send(multipart_message[zmq_names.MESSAGE_TYPE_IDX], zmq.SNDMORE)
+        LOG.debug("Dispatching %(msg_type)s message %(msg_id)s - from %(rid)s "
+                  "to -> %(rkey)s" %
+                  {"msg_type": zmq_names.message_type_str(message_type),
+                   "msg_id": message_id,
+                   "rkey": routing_key,
+                   "rid": reply_id})
+        socket.send_multipart(multipart_message[zmq_names.MESSAGE_ID_IDX:])
+
+
+class CentralPublisherSender(Sender):
+
+    def send_message(self, socket, multipart_message):
+        message_type = int(multipart_message[zmq_names.MESSAGE_TYPE_IDX])
+        assert message_type in (zmq_names.CAST_FANOUT_TYPE,
+                                zmq_names.NOTIFY_TYPE), "Fanout expected!"
+        topic_filter = multipart_message[zmq_names.ROUTING_KEY_IDX]
+        message_id = multipart_message[zmq_names.MESSAGE_ID_IDX]
+
+        socket.send(topic_filter, zmq.SNDMORE)
+        socket.send_multipart(multipart_message[zmq_names.MESSAGE_ID_IDX:])
+
+        LOG.debug("Publishing message %(message_id)s on [%(topic)s]",
+                  {"topic": topic_filter,
+                   "message_id": message_id})
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_address.py b/oslo_messaging/_drivers/zmq_driver/zmq_address.py
index a1ca86d2f..800a26a3a 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_address.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_address.py
@@ -27,10 +27,6 @@ def get_tcp_random_address(conf):
     return "tcp://%s" % conf.oslo_messaging_zmq.rpc_zmq_bind_address
 
 
-def get_broker_address(conf):
-    return "ipc://%s/zmq-broker" % conf.oslo_messaging_zmq.rpc_zmq_ipc_dir
-
-
 def prefix_str(key, listener_type):
     return listener_type + "/" + key
 
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_names.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py
index 6ec99cb83..83361a2d6 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_names.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py
@@ -23,13 +23,14 @@ FIELD_REPLY_BODY = 'reply_body'
 FIELD_FAILURE = 'failure'
 
 
-IDX_REPLY_TYPE = 1
-IDX_REPLY_BODY = 2
-
-MULTIPART_IDX_ENVELOPE = 0
-MULTIPART_IDX_BODY = 1
+REPLY_ID_IDX = 0
+EMPTY_IDX = 1
+MESSAGE_TYPE_IDX = 2
+ROUTING_KEY_IDX = 3
+MESSAGE_ID_IDX = 4
 
 
+DEFAULT_TYPE = 0
 CALL_TYPE = 1
 CAST_TYPE = 2
 CAST_FANOUT_TYPE = 3
@@ -37,13 +38,17 @@ NOTIFY_TYPE = 4
 REPLY_TYPE = 5
 ACK_TYPE = 6
 
-MESSAGE_TYPES = (CALL_TYPE,
+REQUEST_TYPES = (CALL_TYPE,
                  CAST_TYPE,
                  CAST_FANOUT_TYPE,
                  NOTIFY_TYPE)
 
+RESPONSE_TYPES = (REPLY_TYPE, ACK_TYPE)
+
+MESSAGE_TYPES = REQUEST_TYPES + RESPONSE_TYPES
+
 MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_TYPE)
-DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, REPLY_TYPE)
+DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, REPLY_TYPE, ACK_TYPE)
 CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE)
 NOTIFY_TYPES = (NOTIFY_TYPE,)
 NON_BLOCKING_TYPES = CAST_TYPES + NOTIFY_TYPES
diff --git a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
index 81a708cfd..da21c604b 100644
--- a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
+++ b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
@@ -22,8 +22,9 @@ import testscenarios
 from oslo_config import cfg
 
 import oslo_messaging
+from oslo_messaging._drivers.zmq_driver.proxy.central \
+    import zmq_publisher_proxy
 from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
-from oslo_messaging._drivers.zmq_driver.proxy import zmq_publisher_proxy
 from oslo_messaging._drivers.zmq_driver import zmq_address
 from oslo_messaging._drivers.zmq_driver import zmq_async
 from oslo_messaging._drivers.zmq_driver import zmq_names
@@ -82,9 +83,8 @@ class TestPubSub(zmq_common.ZmqBaseTestCase):
         message = {'method': 'hello-world'}
 
         self.publisher.send_request(
-            [zmq_names.CAST_FANOUT_TYPE,
+            [b'', b'', zmq_names.CAST_FANOUT_TYPE,
              zmq_address.target_to_subscribe_filter(target),
-             b"message",
              b"0000-0000",
              self.dumps([context, message])])
 
diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py
index dea640c19..d1b45acca 100644
--- a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py
+++ b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py
@@ -20,7 +20,6 @@ import oslo_messaging
 from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
 from oslo_messaging._drivers.zmq_driver.client import zmq_senders
 from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
-from oslo_messaging._drivers.zmq_driver.proxy import zmq_queue_proxy
 from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
 from oslo_messaging._drivers.zmq_driver import zmq_async
 from oslo_messaging._drivers.zmq_driver import zmq_options
@@ -70,8 +69,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
         self.driver = transport._driver
 
         # prepare and launch proxy
-        self.proxy = zmq_proxy.ZmqProxy(self.conf,
-                                        zmq_queue_proxy.UniversalQueueProxy)
+        self.proxy = zmq_proxy.ZmqProxy(self.conf)
         vars(self.driver.matchmaker).update(vars(self.proxy.matchmaker))
         self.executor = zmq_async.get_executor(self.proxy.run)
         self.executor.execute()