diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py
similarity index 58%
rename from oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py
rename to oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py
index 89031ecc4..4a5eba4e7 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py
@@ -1,4 +1,4 @@
-#    Copyright 2015 Mirantis, Inc.
+#    Copyright 2016 Mirantis, Inc.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
@@ -12,6 +12,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import abc
 from concurrent import futures
 import logging
 
@@ -21,6 +22,7 @@ import oslo_messaging
 from oslo_messaging._drivers import common as rpc_common
 from oslo_messaging._drivers.zmq_driver.client.publishers \
     import zmq_publisher_base
+from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager
 from oslo_messaging._drivers.zmq_driver import zmq_async
 from oslo_messaging._drivers.zmq_driver import zmq_names
 from oslo_messaging._i18n import _LE
@@ -30,34 +32,23 @@ LOG = logging.getLogger(__name__)
 zmq = zmq_async.import_zmq()
 
 
-class DealerPublisher(zmq_publisher_base.PublisherBase):
-    """Non-CALL publisher using direct connections."""
+class DealerPublisherBase(zmq_publisher_base.PublisherBase):
+    """Abstract DEALER-publisher."""
 
-    def send_request(self, request):
-        if request.msg_type == zmq_names.CALL_TYPE:
+    def __init__(self, conf, matchmaker, sender, receiver):
+        sockets_manager = zmq_sockets_manager.SocketsManager(
+            conf, matchmaker, zmq.ROUTER, zmq.DEALER
+        )
+        super(DealerPublisherBase, self).__init__(sockets_manager, sender,
+                                                  receiver)
+
+    @staticmethod
+    def _check_pattern(request, supported_pattern):
+        if request.msg_type != supported_pattern:
             raise zmq_publisher_base.UnsupportedSendPattern(
                 zmq_names.message_type_str(request.msg_type)
             )
 
-        try:
-            socket = self.sockets_manager.get_socket(request.target)
-        except retrying.RetryError:
-            return
-
-        if request.msg_type in zmq_names.MULTISEND_TYPES:
-            for _ in range(socket.connections_count()):
-                self.sender.send(socket, request)
-        else:
-            self.sender.send(socket, request)
-
-
-class DealerCallPublisher(zmq_publisher_base.PublisherBase):
-    """CALL publisher using direct connections."""
-
-    def __init__(self, sockets_manager, sender, reply_receiver):
-        super(DealerCallPublisher, self).__init__(sockets_manager, sender)
-        self.reply_receiver = reply_receiver
-
     @staticmethod
     def _raise_timeout(request):
         raise oslo_messaging.MessagingTimeout(
@@ -65,26 +56,12 @@ class DealerCallPublisher(zmq_publisher_base.PublisherBase):
             {"tout": request.timeout, "msg_id": request.message_id}
         )
 
-    def send_request(self, request):
-        if request.msg_type != zmq_names.CALL_TYPE:
-            raise zmq_publisher_base.UnsupportedSendPattern(
-                zmq_names.message_type_str(request.msg_type)
-            )
-
-        try:
-            socket = self._connect_socket(request.target)
-        except retrying.RetryError:
-            self._raise_timeout(request)
-
-        self.sender.send(socket, request)
-        self.reply_receiver.register_socket(socket)
-        return self._recv_reply(request)
-
-    def _connect_socket(self, target):
-        return self.sockets_manager.get_socket(target)
+    @abc.abstractmethod
+    def _connect_socket(self, request):
+        pass
 
     def _recv_reply(self, request):
-        reply_future, = self.reply_receiver.track_request(request)
+        reply_future, = self.receiver.track_request(request)
 
         try:
             _, reply = reply_future.result(timeout=request.timeout)
@@ -95,7 +72,7 @@ class DealerCallPublisher(zmq_publisher_base.PublisherBase):
         except futures.TimeoutError:
             self._raise_timeout(request)
         finally:
-            self.reply_receiver.untrack_request(request)
+            self.receiver.untrack_request(request)
 
         if reply.failure:
             raise rpc_common.deserialize_remote_exception(
@@ -104,6 +81,30 @@ class DealerCallPublisher(zmq_publisher_base.PublisherBase):
         else:
             return reply.reply_body
 
-    def cleanup(self):
-        self.reply_receiver.stop()
-        super(DealerCallPublisher, self).cleanup()
+    def send_call(self, request):
+        self._check_pattern(request, zmq_names.CALL_TYPE)
+
+        try:
+            socket = self._connect_socket(request)
+        except retrying.RetryError:
+            self._raise_timeout(request)
+
+        self.sender.send(socket, request)
+        self.receiver.register_socket(socket)
+        return self._recv_reply(request)
+
+    @abc.abstractmethod
+    def _send_non_blocking(self, request):
+        pass
+
+    def send_cast(self, request):
+        self._check_pattern(request, zmq_names.CAST_TYPE)
+        self._send_non_blocking(request)
+
+    def send_fanout(self, request):
+        self._check_pattern(request, zmq_names.CAST_FANOUT_TYPE)
+        self._send_non_blocking(request)
+
+    def send_notify(self, request):
+        self._check_pattern(request, zmq_names.NOTIFY_TYPE)
+        self._send_non_blocking(request)
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py
new file mode 100644
index 000000000..56d8b4923
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py
@@ -0,0 +1,53 @@
+#    Copyright 2015 Mirantis, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import logging
+
+import retrying
+
+from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
+    import zmq_dealer_publisher_base
+from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
+from oslo_messaging._drivers.zmq_driver.client import zmq_senders
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_names
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
+    """DEALER-publisher using direct connections."""
+
+    def __init__(self, conf, matchmaker):
+        sender = zmq_senders.RequestSenderDirect(conf)
+        receiver = zmq_receivers.ReplyReceiverDirect(conf)
+        super(DealerPublisherDirect, self).__init__(conf, matchmaker, sender,
+                                                    receiver)
+
+    def _connect_socket(self, request):
+        return self.sockets_manager.get_socket(request.target)
+
+    def _send_non_blocking(self, request):
+        try:
+            socket = self._connect_socket(request)
+        except retrying.RetryError:
+            return
+
+        if request.msg_type in zmq_names.MULTISEND_TYPES:
+            for _ in range(socket.connections_count()):
+                self.sender.send(socket, request)
+        else:
+            self.sender.send(socket, request)
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
index 9f53bed15..29dd3fcd3 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
@@ -17,10 +17,10 @@ import logging
 import retrying
 
 from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
-    import zmq_dealer_publisher
-from oslo_messaging._drivers.zmq_driver.client.publishers \
-    import zmq_publisher_base
+    import zmq_dealer_publisher_base
+from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
 from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table
+from oslo_messaging._drivers.zmq_driver.client import zmq_senders
 from oslo_messaging._drivers.zmq_driver import zmq_address
 from oslo_messaging._drivers.zmq_driver import zmq_async
 from oslo_messaging._drivers.zmq_driver import zmq_names
@@ -31,17 +31,31 @@ LOG = logging.getLogger(__name__)
 zmq = zmq_async.import_zmq()
 
 
-class DealerPublisherProxy(zmq_publisher_base.PublisherBase):
-    """Non-CALL publisher via proxy."""
+class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase):
+    """DEALER-publisher via proxy."""
 
-    def __init__(self, sockets_manager, sender):
-        super(DealerPublisherProxy, self).__init__(sockets_manager, sender)
-        self.socket = sockets_manager.get_socket_to_publishers()
+    def __init__(self, conf, matchmaker):
+        sender = zmq_senders.RequestSenderProxy(conf)
+        receiver = zmq_receivers.ReplyReceiverProxy(conf)
+        super(DealerPublisherProxy, self).__init__(conf, matchmaker, sender,
+                                                   receiver)
+        self.socket = self.sockets_manager.get_socket_to_publishers()
         self.routing_table = zmq_routing_table.RoutingTable(self.conf,
                                                             self.matchmaker)
         self.connection_updater = \
             PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket)
 
+    def _connect_socket(self, request):
+        return self.socket
+
+    def send_call(self, request):
+        try:
+            request.routing_key = \
+                self.routing_table.get_routable_host(request.target)
+        except retrying.RetryError:
+            self._raise_timeout(request)
+        return super(DealerPublisherProxy, self).send_call(request)
+
     def _get_routing_keys(self, request):
         try:
             if request.msg_type in zmq_names.DIRECT_TYPES:
@@ -54,48 +68,14 @@ class DealerPublisherProxy(zmq_publisher_base.PublisherBase):
         except retrying.RetryError:
             return []
 
-    def send_request(self, request):
-        if request.msg_type == zmq_names.CALL_TYPE:
-            raise zmq_publisher_base.UnsupportedSendPattern(
-                zmq_names.message_type_str(request.msg_type)
-            )
+    def _send_non_blocking(self, request):
         for routing_key in self._get_routing_keys(request):
             request.routing_key = routing_key
             self.sender.send(self.socket, request)
 
     def cleanup(self):
-        self.connection_updater.stop()
-        self.socket.close()
         super(DealerPublisherProxy, self).cleanup()
-
-
-class DealerCallPublisherProxy(zmq_dealer_publisher.DealerCallPublisher):
-    """CALL publisher via proxy."""
-
-    def __init__(self, sockets_manager, sender, reply_waiter):
-        super(DealerCallPublisherProxy, self).__init__(
-            sockets_manager, sender, reply_waiter
-        )
-        self.socket = self.sockets_manager.get_socket_to_publishers()
-        self.routing_table = zmq_routing_table.RoutingTable(self.conf,
-                                                            self.matchmaker)
-        self.connection_updater = \
-            PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket)
-
-    def send_request(self, request):
-        try:
-            request.routing_key = \
-                self.routing_table.get_routable_host(request.target)
-        except retrying.RetryError:
-            self._raise_timeout(request)
-        return super(DealerCallPublisherProxy, self).send_request(request)
-
-    def _connect_socket(self, target):
-        return self.socket
-
-    def cleanup(self):
         self.connection_updater.stop()
-        super(DealerCallPublisherProxy, self).cleanup()
         self.socket.close()
 
 
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py
index bb5f29484..9da0c056e 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py
@@ -53,29 +53,42 @@ class PublisherBase(object):
     Publisher can send request objects from zmq_request.
     """
 
-    def __init__(self, sockets_manager, sender):
+    def __init__(self, sockets_manager, sender, receiver):
 
         """Construct publisher
 
-        Accept configuration object and Name Service interface object.
-        Create zmq.Context and connected sockets dictionary.
+        Accept sockets manager, sender and receiver objects.
 
-        :param conf: configuration object
-        :type conf: oslo_config.CONF
+        :param sockets_manager: sockets manager object
+        :type sockets_manager: zmq_sockets_manager.SocketsManager
+        :param senders: request sender object
+        :type senders: zmq_senders.RequestSender
+        :param receiver: reply receiver object
+        :type receiver: zmq_receivers.ReplyReceiver
         """
         self.sockets_manager = sockets_manager
         self.conf = sockets_manager.conf
         self.matchmaker = sockets_manager.matchmaker
         self.sender = sender
+        self.receiver = receiver
 
     @abc.abstractmethod
-    def send_request(self, request):
-        """Send request to consumer
+    def send_call(self, request):
+        pass
 
-        :param request: Message data and destination container object
-        :type request: zmq_request.Request
-        """
+    @abc.abstractmethod
+    def send_cast(self, request):
+        pass
+
+    @abc.abstractmethod
+    def send_fanout(self, request):
+        pass
+
+    @abc.abstractmethod
+    def send_notify(self, request):
+        pass
 
     def cleanup(self):
         """Cleanup publisher. Close allocated connections."""
+        self.receiver.stop()
         self.sockets_manager.cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
index a8cfe934a..e7362e2f6 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
@@ -15,13 +15,10 @@
 
 from oslo_messaging._drivers import common
 from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
-    import zmq_dealer_publisher
+    import zmq_dealer_publisher_direct
 from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
     import zmq_dealer_publisher_proxy
 from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
-from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
-from oslo_messaging._drivers.zmq_driver.client import zmq_senders
-from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager
 from oslo_messaging._drivers.zmq_driver import zmq_async
 from oslo_messaging._drivers.zmq_driver import zmq_names
 
@@ -45,34 +42,18 @@ class ZmqClientMixDirectPubSub(zmq_client_base.ZmqClientBase):
         if conf.use_router_proxy or not conf.use_pub_sub:
             raise WrongClientException()
 
-        self.sockets_manager = zmq_sockets_manager.SocketsManager(
-            conf, matchmaker, zmq.ROUTER, zmq.DEALER
-        )
+        publisher_direct = \
+            zmq_dealer_publisher_direct.DealerPublisherDirect(conf, matchmaker)
 
-        sender_proxy = zmq_senders.RequestSenderProxy(conf)
-        sender_direct = zmq_senders.RequestSenderDirect(conf)
-
-        receiver_direct = zmq_receivers.ReplyReceiverDirect(conf)
-
-        fanout_publisher = zmq_dealer_publisher_proxy.DealerPublisherProxy(
-            self.sockets_manager, sender_proxy
-        )
+        publisher_proxy = \
+            zmq_dealer_publisher_proxy.DealerPublisherProxy(conf, matchmaker)
 
         super(ZmqClientMixDirectPubSub, self).__init__(
             conf, matchmaker, allowed_remote_exmods,
             publishers={
-                zmq_names.CALL_TYPE:
-                    zmq_dealer_publisher.DealerCallPublisher(
-                        self.sockets_manager, sender_direct, receiver_direct
-                    ),
-
-                zmq_names.CAST_FANOUT_TYPE: fanout_publisher,
-
-                zmq_names.NOTIFY_TYPE: fanout_publisher,
-
-                "default":
-                    zmq_dealer_publisher.DealerPublisher(self.sockets_manager,
-                                                         sender_direct)
+                zmq_names.CAST_FANOUT_TYPE: publisher_proxy,
+                zmq_names.NOTIFY_TYPE: publisher_proxy,
+                "default": publisher_direct
             }
         )
 
@@ -90,26 +71,12 @@ class ZmqClientDirect(zmq_client_base.ZmqClientBase):
         if conf.use_pub_sub or conf.use_router_proxy:
             raise WrongClientException()
 
-        self.sockets_manager = zmq_sockets_manager.SocketsManager(
-            conf, matchmaker, zmq.ROUTER, zmq.DEALER
-        )
-
-        sender = zmq_senders.RequestSenderDirect(conf)
-
-        receiver = zmq_receivers.ReplyReceiverDirect(conf)
+        publisher = \
+            zmq_dealer_publisher_direct.DealerPublisherDirect(conf, matchmaker)
 
         super(ZmqClientDirect, self).__init__(
             conf, matchmaker, allowed_remote_exmods,
-            publishers={
-                zmq_names.CALL_TYPE:
-                    zmq_dealer_publisher.DealerCallPublisher(
-                        self.sockets_manager, sender, receiver
-                    ),
-
-                "default":
-                    zmq_dealer_publisher.DealerPublisher(self.sockets_manager,
-                                                         sender)
-            }
+            publishers={"default": publisher}
         )
 
 
@@ -128,25 +95,10 @@ class ZmqClientProxy(zmq_client_base.ZmqClientBase):
         if not conf.use_router_proxy:
             raise WrongClientException()
 
-        self.sockets_manager = zmq_sockets_manager.SocketsManager(
-            conf, matchmaker, zmq.ROUTER, zmq.DEALER
-        )
-
-        sender = zmq_senders.RequestSenderProxy(conf)
-
-        receiver = zmq_receivers.ReplyReceiverProxy(conf)
+        publisher = \
+            zmq_dealer_publisher_proxy.DealerPublisherProxy(conf, matchmaker)
 
         super(ZmqClientProxy, self).__init__(
             conf, matchmaker, allowed_remote_exmods,
-            publishers={
-                zmq_names.CALL_TYPE:
-                    zmq_dealer_publisher_proxy.DealerCallPublisherProxy(
-                        self.sockets_manager, sender, receiver
-                    ),
-
-                "default":
-                    zmq_dealer_publisher_proxy.DealerPublisherProxy(
-                        self.sockets_manager, sender
-                    )
-            }
+            publishers={"default": publisher}
         )
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py
index 7630cc7f0..4643ff359 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py
@@ -24,45 +24,44 @@ class ZmqClientBase(object):
     def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None,
                  publishers=None):
         self.conf = conf
-        self.context = zmq.Context()
         self.matchmaker = matchmaker
         self.allowed_remote_exmods = allowed_remote_exmods or []
 
         self.publishers = publishers
-        self.call_publisher = publishers.get(zmq_names.CALL_TYPE) \
-            or publishers["default"]
-        self.cast_publisher = publishers.get(zmq_names.CAST_TYPE) \
-            or publishers["default"]
-        self.fanout_publisher = publishers.get(zmq_names.CAST_FANOUT_TYPE) \
-            or publishers["default"]
-        self.notify_publisher = publishers.get(zmq_names.NOTIFY_TYPE) \
-            or publishers["default"]
+        self.call_publisher = publishers.get(zmq_names.CALL_TYPE,
+                                             publishers["default"])
+        self.cast_publisher = publishers.get(zmq_names.CAST_TYPE,
+                                             publishers["default"])
+        self.fanout_publisher = publishers.get(zmq_names.CAST_FANOUT_TYPE,
+                                               publishers["default"])
+        self.notify_publisher = publishers.get(zmq_names.NOTIFY_TYPE,
+                                               publishers["default"])
 
     def send_call(self, target, context, message, timeout=None, retry=None):
         request = zmq_request.CallRequest(
             target, context=context, message=message, retry=retry,
             timeout=timeout, allowed_remote_exmods=self.allowed_remote_exmods
         )
-        return self.call_publisher.send_request(request)
+        return self.call_publisher.send_call(request)
 
     def send_cast(self, target, context, message, retry=None):
         request = zmq_request.CastRequest(
             target, context=context, message=message, retry=retry
         )
-        self.cast_publisher.send_request(request)
+        self.cast_publisher.send_cast(request)
 
     def send_fanout(self, target, context, message, retry=None):
         request = zmq_request.FanoutRequest(
             target, context=context, message=message, retry=retry
         )
-        self.fanout_publisher.send_request(request)
+        self.fanout_publisher.send_fanout(request)
 
     def send_notify(self, target, context, message, version, retry=None):
         request = zmq_request.NotificationRequest(
             target, context=context, message=message, retry=retry,
             version=version
         )
-        self.notify_publisher.send_request(request)
+        self.notify_publisher.send_notify(request)
 
     def cleanup(self):
         cleaned = set()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py
index 2fb819135..3b83d9a50 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py
@@ -37,7 +37,15 @@ class SenderBase(object):
         pass
 
 
-class RequestSenderProxy(SenderBase):
+class RequestSender(SenderBase):
+    pass
+
+
+class ReplySender(SenderBase):
+    pass
+
+
+class RequestSenderProxy(RequestSender):
 
     def send(self, socket, request):
         socket.send(b'', zmq.SNDMORE)
@@ -55,7 +63,7 @@ class RequestSenderProxy(SenderBase):
                    "target": request.target})
 
 
-class ReplySenderProxy(SenderBase):
+class ReplySenderProxy(ReplySender):
 
     def send(self, socket, reply):
         LOG.debug("Replying to %s", reply.message_id)
@@ -69,7 +77,7 @@ class ReplySenderProxy(SenderBase):
         socket.send_dumped(reply.to_dict())
 
 
-class RequestSenderDirect(SenderBase):
+class RequestSenderDirect(RequestSender):
 
     def send(self, socket, request):
         socket.send(b'', zmq.SNDMORE)
@@ -85,7 +93,7 @@ class RequestSenderDirect(SenderBase):
                    "target": request.target})
 
 
-class ReplySenderDirect(SenderBase):
+class ReplySenderDirect(ReplySender):
 
     def send(self, socket, reply):
         LOG.debug("Replying to %s", reply.message_id)