diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py
index 44d5b6ae3..42575fcd9 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py
@@ -14,6 +14,8 @@
 
 import logging
 
+import tenacity
+
 from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
     import zmq_dealer_publisher_base
 from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
@@ -55,7 +57,7 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
     """
 
     def __init__(self, conf, matchmaker):
-        sender = zmq_senders.RequestSenderDirect(conf)
+        sender = zmq_senders.RequestSenderDirect(conf, async=True)
         receiver = zmq_receivers.ReceiverDirect(conf)
         super(DealerPublisherDirect, self).__init__(conf, matchmaker,
                                                     sender, receiver)
@@ -90,11 +92,16 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
         self.receiver.unregister_socket(socket)
 
     def send_request(self, socket, request):
-        if request.msg_type in zmq_names.MULTISEND_TYPES:
-            for _ in range(socket.connections_count()):
+        @tenacity.retry(retry=tenacity.retry_if_exception_type(zmq.Again),
+                        stop=tenacity.stop_after_delay(
+                            self.conf.rpc_response_timeout))
+        def send_retrying():
+            if request.msg_type in zmq_names.MULTISEND_TYPES:
+                for _ in range(socket.connections_count()):
+                    self.sender.send(socket, request)
+            else:
                 self.sender.send(socket, request)
-        else:
-            self.sender.send(socket, request)
+        return send_retrying()
 
     def cleanup(self):
         self.routing_table.cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py
index f63e1d716..9f3f6d72b 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py
@@ -31,8 +31,9 @@ zmq = zmq_async.import_zmq()
 class SenderBase(object):
     """Base request/response sending interface."""
 
-    def __init__(self, conf):
+    def __init__(self, conf, async=False):
         self.conf = conf
+        self.async = async
         self._lock = threading.Lock()
         self._send_versions = zmq_version.get_method_versions(self, 'send')
 
@@ -155,11 +156,12 @@ class RequestSenderDirect(RequestSenderBase):
                    "msg_version": request.message_version})
 
     def _send_v_1_0(self, socket, request):
-        socket.send(b'', zmq.SNDMORE)
-        socket.send_string('1.0', zmq.SNDMORE)
-        socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
-        socket.send_string(request.message_id, zmq.SNDMORE)
-        socket.send_dumped([request.context, request.message])
+        flags = zmq.NOBLOCK if self.async else 0
+        socket.send(b'', zmq.SNDMORE | flags)
+        socket.send_string('1.0', zmq.SNDMORE | flags)
+        socket.send(six.b(str(request.msg_type)), zmq.SNDMORE | flags)
+        socket.send_string(request.message_id, zmq.SNDMORE | flags)
+        socket.send_dumped([request.context, request.message], flags)
 
 
 class AckSenderDirect(AckSenderBase):
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
index e352d8fa0..c9c7cead8 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
@@ -56,6 +56,11 @@ class ZmqSocket(object):
         # Put messages to only connected queues
         self.handle.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0)
 
+        # Setup timeout on socket sending
+        if hasattr(self.conf, 'rpc_response_timeout'):
+            self.handle.setsockopt(zmq.SNDTIMEO,
+                                   self.conf.rpc_response_timeout * 1000)
+
         # Configure TCP keep alive
         keepalive = self.conf.oslo_messaging_zmq.zmq_tcp_keepalive
         if keepalive < 0: