From 27594bd40f2562f29aef1ae0337be5722fe76f8e Mon Sep 17 00:00:00 2001
From: ozamiatin <ozamiatin@mirantis.com>
Date: Wed, 21 Sep 2016 01:27:54 +0300
Subject: [PATCH] [zmq] Make second ROUTER socket optional for proxy

Since proxy supposed to be used in different configurations
second ROUTER socket may stay unused (e.g. when we use proxy
for publishing only fanout messages in mixed direct/pub-sub
configuration see ZmqClientMixDirectPubSub).

This patch introduces two modes of proxy SingleRouterProxy (where
frontend and backend ROUTERS are the same socket) and
DoubleRouterProxy (different sockets for the frontend and for the
backend - the original behavior).

Change-Id: Ia859b92e1f238fcbbcf42e17b06e0f4ad04e79f6
---
 oslo_messaging/_cmd/zmq_proxy.py              |  76 ++-------
 .../_drivers/zmq_driver/client/zmq_request.py |   2 +-
 .../zmq_driver/proxy/central/__init__.py      |   0
 .../zmq_central_proxy.py}                     | 147 ++++++++++--------
 .../{ => central}/zmq_publisher_proxy.py      |  22 +--
 .../_drivers/zmq_driver/proxy/zmq_proxy.py    |  75 ++++++++-
 .../_drivers/zmq_driver/proxy/zmq_sender.py   |  69 ++++++++
 .../_drivers/zmq_driver/zmq_address.py        |   4 -
 .../_drivers/zmq_driver/zmq_names.py          |  19 ++-
 .../tests/drivers/zmq/test_pub_sub.py         |   6 +-
 .../tests/drivers/zmq/test_zmq_ack_manager.py |   4 +-
 11 files changed, 252 insertions(+), 172 deletions(-)
 create mode 100644 oslo_messaging/_drivers/zmq_driver/proxy/central/__init__.py
 rename oslo_messaging/_drivers/zmq_driver/proxy/{zmq_queue_proxy.py => central/zmq_central_proxy.py} (60%)
 rename oslo_messaging/_drivers/zmq_driver/proxy/{ => central}/zmq_publisher_proxy.py (71%)
 create mode 100644 oslo_messaging/_drivers/zmq_driver/proxy/zmq_sender.py

diff --git a/oslo_messaging/_cmd/zmq_proxy.py b/oslo_messaging/_cmd/zmq_proxy.py
index 80f8ad69c..29bf34e03 100644
--- a/oslo_messaging/_cmd/zmq_proxy.py
+++ b/oslo_messaging/_cmd/zmq_proxy.py
@@ -1,4 +1,4 @@
-#    Copyright 2015 Mirantis, Inc.
+#    Copyright 2015-2016 Mirantis, Inc.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
@@ -12,86 +12,36 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import argparse
 import logging
 
 from oslo_config import cfg
 
 from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
-from oslo_messaging._drivers.zmq_driver.proxy import zmq_queue_proxy
 from oslo_messaging._drivers.zmq_driver import zmq_options
+from oslo_messaging._i18n import _LI
 
-CONF = cfg.CONF
-
-zmq_options.register_opts(CONF)
-
-opt_group = cfg.OptGroup(name='zmq_proxy_opts',
-                         title='ZeroMQ proxy options')
-CONF.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group)
-
-
-USAGE = """ Usage: ./zmq-proxy.py [-h] [] ...
-
-Usage example:
- python oslo_messaging/_cmd/zmq-proxy.py"""
+LOG = logging.getLogger(__name__)
 
 
 def main():
-    parser = argparse.ArgumentParser(
-        description='ZeroMQ proxy service',
-        usage=USAGE
-    )
 
-    parser.add_argument('-c', '--config-file', dest='config_file', type=str,
-                        help='Path to configuration file')
-    parser.add_argument('-l', '--log-file', dest='log_file', type=str,
-                        help='Path to log file')
+    conf = cfg.CONF
+    opt_group = cfg.OptGroup(name='zmq_proxy_opts',
+                             title='ZeroMQ proxy options')
+    conf.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group)
+    zmq_options.register_opts(conf)
+    zmq_proxy.parse_command_line_args(conf)
 
-    parser.add_argument('-H', '--host', dest='host', type=str,
-                        help='Host FQDN for current proxy')
-    parser.add_argument('-f', '--frontend-port', dest='frontend_port',
-                        type=int,
-                        help='Front-end ROUTER port number')
-    parser.add_argument('-b', '--backend-port', dest='backend_port', type=int,
-                        help='Back-end ROUTER port number')
-    parser.add_argument('-p', '--publisher-port', dest='publisher_port',
-                        type=int,
-                        help='Front-end PUBLISHER port number')
-
-    parser.add_argument('-d', '--debug', dest='debug', type=bool,
-                        default=False,
-                        help='Turn on DEBUG logging level instead of INFO')
-
-    args = parser.parse_args()
-
-    if args.config_file:
-        cfg.CONF(['--config-file', args.config_file])
-
-    log_kwargs = {'level': logging.DEBUG if args.debug else logging.INFO,
-                  'format': '%(asctime)s %(name)s %(levelname)-8s %(message)s'}
-    if args.log_file:
-        log_kwargs.update({'filename': args.log_file})
-    logging.basicConfig(**log_kwargs)
-
-    if args.host:
-        CONF.zmq_proxy_opts.host = args.host
-    if args.frontend_port:
-        CONF.set_override('frontend_port', args.frontend_port,
-                          group='zmq_proxy_opts')
-    if args.backend_port:
-        CONF.set_override('backend_port', args.backend_port,
-                          group='zmq_proxy_opts')
-    if args.publisher_port:
-        CONF.set_override('publisher_port', args.publisher_port,
-                          group='zmq_proxy_opts')
-
-    reactor = zmq_proxy.ZmqProxy(CONF, zmq_queue_proxy.UniversalQueueProxy)
+    reactor = zmq_proxy.ZmqProxy(conf)
 
     try:
         while True:
             reactor.run()
     except (KeyboardInterrupt, SystemExit):
+        LOG.info(_LI("Exit proxy by interrupt signal."))
+    finally:
         reactor.close()
 
+
 if __name__ == "__main__":
     main()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py
index b3f8aae86..c3d45e185 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py
@@ -55,7 +55,7 @@ class Request(object):
         :type retry: int
         """
 
-        if self.msg_type not in zmq_names.MESSAGE_TYPES:
+        if self.msg_type not in zmq_names.REQUEST_TYPES:
             raise RuntimeError("Unknown message type!")
 
         self.target = target
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/central/__init__.py b/oslo_messaging/_drivers/zmq_driver/proxy/central/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py
similarity index 60%
rename from oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py
rename to oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py
index 1b2ebd433..02bff5a83 100644
--- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py
@@ -1,4 +1,4 @@
-#    Copyright 2015 Mirantis, Inc.
+#    Copyright 2015-2016 Mirantis, Inc.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
@@ -14,114 +14,123 @@
 
 import logging
 
-import six
-
-from oslo_messaging._drivers.zmq_driver.proxy import zmq_publisher_proxy
+from oslo_messaging._drivers.zmq_driver.proxy.central \
+    import zmq_publisher_proxy
+from oslo_messaging._drivers.zmq_driver.proxy import zmq_sender
 from oslo_messaging._drivers.zmq_driver import zmq_async
 from oslo_messaging._drivers.zmq_driver import zmq_names
 from oslo_messaging._drivers.zmq_driver import zmq_socket
 from oslo_messaging._drivers.zmq_driver import zmq_updater
-from oslo_messaging._i18n import _LE, _LI
+from oslo_messaging._i18n import _LI, _LE
 
 zmq = zmq_async.import_zmq()
 LOG = logging.getLogger(__name__)
 
 
-class UniversalQueueProxy(object):
+def check_message_format(func):
+    def _check_message_format(*args, **kwargs):
+        try:
+            return func(*args, **kwargs)
+        except Exception as e:
+            LOG.error(_LE("Received message with wrong format"))
+            LOG.exception(e)
+    return _check_message_format
+
+
+class SingleRouterProxy(object):
 
     def __init__(self, conf, context, matchmaker):
         self.conf = conf
         self.context = context
-        super(UniversalQueueProxy, self).__init__()
+        super(SingleRouterProxy, self).__init__()
         self.matchmaker = matchmaker
+        host = conf.zmq_proxy_opts.host
+
         self.poller = zmq_async.get_poller()
 
         port = conf.zmq_proxy_opts.frontend_port
-        host = conf.zmq_proxy_opts.host
         self.fe_router_socket = zmq_socket.ZmqFixedPortSocket(
             conf, context, zmq.ROUTER, host,
             conf.zmq_proxy_opts.frontend_port) if port != 0 else \
-            zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, host)
+            zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER,
+                                           host)
 
-        port = conf.zmq_proxy_opts.backend_port
-        self.be_router_socket = zmq_socket.ZmqFixedPortSocket(
-            conf, context, zmq.ROUTER, host,
-            conf.zmq_proxy_opts.backend_port) if port != 0 else \
-            zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, host)
+        self.poller.register(self.fe_router_socket, self._receive_message)
 
-        self.poller.register(self.fe_router_socket, self._receive_in_request)
-        self.poller.register(self.be_router_socket, self._receive_in_request)
-
-        self.pub_publisher = zmq_publisher_proxy.PublisherProxy(
+        self.publisher = zmq_publisher_proxy.PublisherProxy(
             conf, matchmaker)
-
-        self._router_updater = RouterUpdater(
-            conf, matchmaker, self.pub_publisher.host,
-            self.fe_router_socket.connect_address,
-            self.be_router_socket.connect_address)
+        self.router_sender = zmq_sender.CentralRouterSender()
+        self._router_updater = self._create_router_updater()
 
     def run(self):
         message, socket = self.poller.poll()
         if message is None:
             return
 
-        msg_type = message[0]
+        msg_type = int(message[zmq_names.MESSAGE_TYPE_IDX])
         if self.conf.oslo_messaging_zmq.use_pub_sub and \
                 msg_type in (zmq_names.CAST_FANOUT_TYPE,
                              zmq_names.NOTIFY_TYPE):
-            self.pub_publisher.send_request(message)
+            self.publisher.send_request(message)
         else:
-            self._redirect_message(self.be_router_socket
-                                   if socket is self.fe_router_socket
-                                   else self.fe_router_socket, message)
+            self.router_sender.send_message(
+                self._get_socket_to_dispatch_on(socket), message)
+
+    def _create_router_updater(self):
+        return RouterUpdater(
+            self.conf, self.matchmaker, self.publisher.host,
+            self.fe_router_socket.connect_address,
+            self.fe_router_socket.connect_address)
+
+    def _get_socket_to_dispatch_on(self, socket):
+        return self.fe_router_socket
 
     @staticmethod
-    def _receive_in_request(socket):
-        try:
-            reply_id = socket.recv()
-            assert reply_id is not None, "Valid id expected"
-            empty = socket.recv()
-            assert empty == b'', "Empty delimiter expected"
-            msg_type = int(socket.recv())
-            routing_key = socket.recv()
-            payload = socket.recv_multipart()
-            payload.insert(0, reply_id)
-            payload.insert(0, routing_key)
-            payload.insert(0, msg_type)
-            return payload
-        except (AssertionError, ValueError):
-            LOG.error(_LE("Received message with wrong format"))
-            if socket.getsockopt(zmq.RCVMORE):
-                # NOTE(ozamiatin): Drop the left parts of broken message
-                socket.recv_multipart()
-        except zmq.ZMQError as e:
-            LOG.exception(e)
-        return None
-
-    @staticmethod
-    def _redirect_message(socket, multipart_message):
-        message_type = multipart_message.pop(0)
-        routing_key = multipart_message.pop(0)
-        reply_id = multipart_message.pop(0)
-        message_id = multipart_message[0]
-        socket.send(routing_key, zmq.SNDMORE)
-        socket.send(b'', zmq.SNDMORE)
-        socket.send(reply_id, zmq.SNDMORE)
-        socket.send(six.b(str(message_type)), zmq.SNDMORE)
-        LOG.debug("Dispatching %(msg_type)s message %(msg_id)s - from %(rid)s "
-                  "to -> %(rkey)s" %
-                  {"msg_type": zmq_names.message_type_str(message_type),
-                   "msg_id": message_id,
-                   "rkey": routing_key,
-                   "rid": reply_id})
-        socket.send_multipart(multipart_message)
+    @check_message_format
+    def _receive_message(socket):
+        message = socket.recv_multipart()
+        assert len(message) > zmq_names.MESSAGE_ID_IDX, "Not enough parts"
+        assert message[zmq_names.REPLY_ID_IDX] != b'', "Valid id expected"
+        message_type = int(message[zmq_names.MESSAGE_TYPE_IDX])
+        assert message_type in zmq_names.MESSAGE_TYPES, "Known type expected!"
+        assert message[zmq_names.EMPTY_IDX] == b'', "Empty delimiter expected"
+        return message
 
     def cleanup(self):
+        self._router_updater.cleanup()
         self.poller.close()
         self.fe_router_socket.close()
+        self.publisher.cleanup()
+
+
+class DoubleRouterProxy(SingleRouterProxy):
+
+    def __init__(self, conf, context, matchmaker):
+        LOG.info(_LI('Running double router proxy'))
+        port = conf.zmq_proxy_opts.backend_port
+        host = conf.zmq_proxy_opts.host
+        self.be_router_socket = zmq_socket.ZmqFixedPortSocket(
+            conf, context, zmq.ROUTER, host,
+            conf.zmq_proxy_opts.backend_port) if port != 0 else \
+            zmq_socket.ZmqRandomPortSocket(
+                conf, context, zmq.ROUTER, host)
+        super(DoubleRouterProxy, self).__init__(conf, context, matchmaker)
+        self.poller.register(self.be_router_socket, self._receive_message)
+
+    def _create_router_updater(self):
+        return RouterUpdater(
+            self.conf, self.matchmaker, self.publisher.host,
+            self.fe_router_socket.connect_address,
+            self.be_router_socket.connect_address)
+
+    def _get_socket_to_dispatch_on(self, socket):
+        return self.be_router_socket \
+            if socket is self.fe_router_socket \
+            else self.fe_router_socket
+
+    def cleanup(self):
+        super(DoubleRouterProxy, self).cleanup()
         self.be_router_socket.close()
-        self.pub_publisher.cleanup()
-        self._router_updater.cleanup()
 
 
 class RouterUpdater(zmq_updater.UpdaterBase):
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_publisher_proxy.py
similarity index 71%
rename from oslo_messaging/_drivers/zmq_driver/proxy/zmq_publisher_proxy.py
rename to oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_publisher_proxy.py
index 727b41903..09f578552 100644
--- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_publisher_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_publisher_proxy.py
@@ -1,4 +1,4 @@
-#    Copyright 2015 Mirantis, Inc.
+#    Copyright 2015-2016 Mirantis, Inc.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
@@ -14,8 +14,8 @@
 
 import logging
 
+from oslo_messaging._drivers.zmq_driver.proxy import zmq_sender
 from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
 from oslo_messaging._drivers.zmq_driver import zmq_socket
 
 LOG = logging.getLogger(__name__)
@@ -44,7 +44,6 @@ class PublisherProxy(object):
         self.matchmaker = matchmaker
 
         port = conf.zmq_proxy_opts.publisher_port
-
         self.socket = zmq_socket.ZmqFixedPortSocket(
             self.conf, self.zmq_context, zmq.PUB, conf.zmq_proxy_opts.host,
             port) if port != 0 else \
@@ -52,23 +51,10 @@ class PublisherProxy(object):
                 self.conf, self.zmq_context, zmq.PUB, conf.zmq_proxy_opts.host)
 
         self.host = self.socket.connect_address
+        self.sender = zmq_sender.CentralPublisherSender()
 
     def send_request(self, multipart_message):
-        message_type = multipart_message.pop(0)
-        assert message_type in (zmq_names.CAST_FANOUT_TYPE,
-                                zmq_names.NOTIFY_TYPE), "Fanout expected!"
-        topic_filter = multipart_message.pop(0)
-        reply_id = multipart_message.pop(0)
-        message_id = multipart_message.pop(0)
-        assert reply_id is not None, "Reply id expected!"
-
-        self.socket.send(topic_filter, zmq.SNDMORE)
-        self.socket.send(message_id, zmq.SNDMORE)
-        self.socket.send_multipart(multipart_message)
-
-        LOG.debug("Publishing message %(message_id)s on [%(topic)s]",
-                  {"topic": topic_filter,
-                   "message_id": message_id})
+        self.sender.send_message(self.socket, multipart_message)
 
     def cleanup(self):
         self.socket.close()
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py
index 15c777489..886da5464 100644
--- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py
@@ -1,4 +1,4 @@
-#    Copyright 2015 Mirantis, Inc.
+#    Copyright 2015-2016 Mirantis, Inc.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
@@ -12,12 +12,14 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import argparse
 import logging
 import socket
 
+from oslo_config import cfg
 from stevedore import driver
 
-from oslo_config import cfg
+from oslo_messaging._drivers.zmq_driver.proxy.central import zmq_central_proxy
 from oslo_messaging._drivers.zmq_driver import zmq_async
 from oslo_messaging._i18n import _LI
 
@@ -25,6 +27,12 @@ zmq = zmq_async.import_zmq()
 LOG = logging.getLogger(__name__)
 
 
+USAGE = """ Usage: ./zmq-proxy.py [-h] [] ...
+
+Usage example:
+ python oslo_messaging/_cmd/zmq-proxy.py"""
+
+
 zmq_proxy_opts = [
     cfg.StrOpt('host', default=socket.gethostname(),
                help='Hostname (FQDN) of current proxy'
@@ -41,6 +49,56 @@ zmq_proxy_opts = [
 ]
 
 
+def parse_command_line_args(conf):
+    parser = argparse.ArgumentParser(
+        description='ZeroMQ proxy service',
+        usage=USAGE
+    )
+
+    parser.add_argument('-c', '--config-file', dest='config_file', type=str,
+                        help='Path to configuration file')
+    parser.add_argument('-l', '--log-file', dest='log_file', type=str,
+                        help='Path to log file')
+
+    parser.add_argument('-H', '--host', dest='host', type=str,
+                        help='Host FQDN for current proxy')
+    parser.add_argument('-f', '--frontend-port', dest='frontend_port',
+                        type=int,
+                        help='Front-end ROUTER port number')
+    parser.add_argument('-b', '--backend-port', dest='backend_port', type=int,
+                        help='Back-end ROUTER port number')
+    parser.add_argument('-p', '--publisher-port', dest='publisher_port',
+                        type=int,
+                        help='Front-end PUBLISHER port number')
+
+    parser.add_argument('-d', '--debug', dest='debug', type=bool,
+                        default=False,
+                        help='Turn on DEBUG logging level instead of INFO')
+
+    args = parser.parse_args()
+
+    if args.config_file:
+        conf(['--config-file', args.config_file])
+
+    log_kwargs = {'level': logging.DEBUG if args.debug else logging.INFO,
+                  'format': '%(asctime)s %(name)s %(levelname)-8s %(message)s'}
+    if args.log_file:
+        log_kwargs.update({'filename': args.log_file})
+    logging.basicConfig(**log_kwargs)
+
+    if args.host:
+        conf.zmq_proxy_opts.host = args.host
+    if args.frontend_port:
+        conf.set_override('frontend_port', args.frontend_port,
+                          group='zmq_proxy_opts')
+    if args.backend_port:
+        conf.set_override('backend_port', args.backend_port,
+                          group='zmq_proxy_opts')
+    if args.publisher_port:
+        conf.set_override('publisher_port', args.publisher_port,
+                          group='zmq_proxy_opts')
+
+
 class ZmqProxy(object):
     """Wrapper class for Publishers and Routers proxies.
        The main reason to have a proxy is high complexity of TCP sockets number
@@ -80,7 +138,7 @@ class ZmqProxy(object):
 
     """
 
-    def __init__(self, conf, proxy_cls):
+    def __init__(self, conf):
         super(ZmqProxy, self).__init__()
         self.conf = conf
         self.matchmaker = driver.DriverManager(
@@ -88,7 +146,16 @@ class ZmqProxy(object):
             self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker,
         ).driver(self.conf)
         self.context = zmq.Context()
-        self.proxy = proxy_cls(conf, self.context, self.matchmaker)
+        self.proxy = self._choose_proxy_implementation()
+
+    def _choose_proxy_implementation(self):
+        if self.conf.zmq_proxy_opts.frontend_port != 0 and \
+                self.conf.zmq_proxy_opts.backend_port == 0:
+            return zmq_central_proxy.SingleRouterProxy(self.conf, self.context,
+                                                       self.matchmaker)
+        else:
+            return zmq_central_proxy.DoubleRouterProxy(self.conf, self.context,
+                                                       self.matchmaker)
 
     def run(self):
         self.proxy.run()
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_sender.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_sender.py
new file mode 100644
index 000000000..3499292ff
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_sender.py
@@ -0,0 +1,69 @@
+#    Copyright 2016 Mirantis, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import abc
+import logging
+
+import six
+
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_names
+
+zmq = zmq_async.import_zmq()
+LOG = logging.getLogger(__name__)
+
+
+@six.add_metaclass(abc.ABCMeta)
+class Sender(object):
+
+    @abc.abstractmethod
+    def send_message(self, socket, multipart_message):
+        """Send message to a socket from multipart list"""
+
+
+class CentralRouterSender(Sender):
+
+    def send_message(self, socket, multipart_message):
+        message_type = int(multipart_message[zmq_names.MESSAGE_TYPE_IDX])
+        routing_key = multipart_message[zmq_names.ROUTING_KEY_IDX]
+        reply_id = multipart_message[zmq_names.REPLY_ID_IDX]
+        message_id = multipart_message[zmq_names.MESSAGE_ID_IDX]
+        socket.send(routing_key, zmq.SNDMORE)
+        socket.send(b'', zmq.SNDMORE)
+        socket.send(reply_id, zmq.SNDMORE)
+        socket.send(multipart_message[zmq_names.MESSAGE_TYPE_IDX], zmq.SNDMORE)
+        LOG.debug("Dispatching %(msg_type)s message %(msg_id)s - from %(rid)s "
+                  "to -> %(rkey)s" %
+                  {"msg_type": zmq_names.message_type_str(message_type),
+                   "msg_id": message_id,
+                   "rkey": routing_key,
+                   "rid": reply_id})
+        socket.send_multipart(multipart_message[zmq_names.MESSAGE_ID_IDX:])
+
+
+class CentralPublisherSender(Sender):
+
+    def send_message(self, socket, multipart_message):
+        message_type = int(multipart_message[zmq_names.MESSAGE_TYPE_IDX])
+        assert message_type in (zmq_names.CAST_FANOUT_TYPE,
+                                zmq_names.NOTIFY_TYPE), "Fanout expected!"
+        topic_filter = multipart_message[zmq_names.ROUTING_KEY_IDX]
+        message_id = multipart_message[zmq_names.MESSAGE_ID_IDX]
+
+        socket.send(topic_filter, zmq.SNDMORE)
+        socket.send_multipart(multipart_message[zmq_names.MESSAGE_ID_IDX:])
+
+        LOG.debug("Publishing message %(message_id)s on [%(topic)s]",
+                  {"topic": topic_filter,
+                   "message_id": message_id})
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_address.py b/oslo_messaging/_drivers/zmq_driver/zmq_address.py
index a1ca86d2f..800a26a3a 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_address.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_address.py
@@ -27,10 +27,6 @@ def get_tcp_random_address(conf):
     return "tcp://%s" % conf.oslo_messaging_zmq.rpc_zmq_bind_address
 
 
-def get_broker_address(conf):
-    return "ipc://%s/zmq-broker" % conf.oslo_messaging_zmq.rpc_zmq_ipc_dir
-
-
 def prefix_str(key, listener_type):
     return listener_type + "/" + key
 
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_names.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py
index 6ec99cb83..83361a2d6 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_names.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py
@@ -23,13 +23,14 @@ FIELD_REPLY_BODY = 'reply_body'
 FIELD_FAILURE = 'failure'
 
 
-IDX_REPLY_TYPE = 1
-IDX_REPLY_BODY = 2
-
-MULTIPART_IDX_ENVELOPE = 0
-MULTIPART_IDX_BODY = 1
+REPLY_ID_IDX = 0
+EMPTY_IDX = 1
+MESSAGE_TYPE_IDX = 2
+ROUTING_KEY_IDX = 3
+MESSAGE_ID_IDX = 4
 
 
+DEFAULT_TYPE = 0
 CALL_TYPE = 1
 CAST_TYPE = 2
 CAST_FANOUT_TYPE = 3
@@ -37,13 +38,17 @@ NOTIFY_TYPE = 4
 REPLY_TYPE = 5
 ACK_TYPE = 6
 
-MESSAGE_TYPES = (CALL_TYPE,
+REQUEST_TYPES = (CALL_TYPE,
                  CAST_TYPE,
                  CAST_FANOUT_TYPE,
                  NOTIFY_TYPE)
 
+RESPONSE_TYPES = (REPLY_TYPE, ACK_TYPE)
+
+MESSAGE_TYPES = REQUEST_TYPES + RESPONSE_TYPES
+
 MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_TYPE)
-DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, REPLY_TYPE)
+DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, REPLY_TYPE, ACK_TYPE)
 CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE)
 NOTIFY_TYPES = (NOTIFY_TYPE,)
 NON_BLOCKING_TYPES = CAST_TYPES + NOTIFY_TYPES
diff --git a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
index 81a708cfd..da21c604b 100644
--- a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
+++ b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
@@ -22,8 +22,9 @@ import testscenarios
 from oslo_config import cfg
 
 import oslo_messaging
+from oslo_messaging._drivers.zmq_driver.proxy.central \
+    import zmq_publisher_proxy
 from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
-from oslo_messaging._drivers.zmq_driver.proxy import zmq_publisher_proxy
 from oslo_messaging._drivers.zmq_driver import zmq_address
 from oslo_messaging._drivers.zmq_driver import zmq_async
 from oslo_messaging._drivers.zmq_driver import zmq_names
@@ -82,9 +83,8 @@ class TestPubSub(zmq_common.ZmqBaseTestCase):
         message = {'method': 'hello-world'}
 
         self.publisher.send_request(
-            [zmq_names.CAST_FANOUT_TYPE,
+            [b'', b'', zmq_names.CAST_FANOUT_TYPE,
              zmq_address.target_to_subscribe_filter(target),
-             b"message",
              b"0000-0000",
              self.dumps([context, message])])
 
diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py
index dea640c19..d1b45acca 100644
--- a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py
+++ b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py
@@ -20,7 +20,6 @@ import oslo_messaging
 from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
 from oslo_messaging._drivers.zmq_driver.client import zmq_senders
 from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
-from oslo_messaging._drivers.zmq_driver.proxy import zmq_queue_proxy
 from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
 from oslo_messaging._drivers.zmq_driver import zmq_async
 from oslo_messaging._drivers.zmq_driver import zmq_options
@@ -70,8 +69,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
         self.driver = transport._driver
 
         # prepare and launch proxy
-        self.proxy = zmq_proxy.ZmqProxy(self.conf,
-                                        zmq_queue_proxy.UniversalQueueProxy)
+        self.proxy = zmq_proxy.ZmqProxy(self.conf)
         vars(self.driver.matchmaker).update(vars(self.proxy.matchmaker))
         self.executor = zmq_async.get_executor(self.proxy.run)
         self.executor.execute()