diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py
index c5b2378e7..f9f1b06e8 100644
--- a/oslo/messaging/_drivers/amqpdriver.py
+++ b/oslo/messaging/_drivers/amqpdriver.py
@@ -64,6 +64,10 @@ class AMQPIncomingMessage(base.IncomingMessage):
             conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg))
 
     def reply(self, reply=None, failure=None, log_failure=True):
+        if not self.msg_id:
+            # NOTE(Alexei_987) not sending reply, if msg_id is empty
+            #    because reply should not be expected by caller side
+            return
         with self.listener.driver._get_connection() as conn:
             self._send_reply(conn, reply, failure, log_failure=log_failure)
             self._send_reply(conn, ending=True)
diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py
index 05b2ad937..198252cb2 100644
--- a/tests/drivers/test_impl_rabbit.py
+++ b/tests/drivers/test_impl_rabbit.py
@@ -317,19 +317,25 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
 
         self.stubs.Set(amqpdriver.ReplyWaiter, 'wait', reply_waiter)
 
-        def send_and_wait_for_reply(i):
+        def send_and_wait_for_reply(i, wait_for_reply):
             replies.append(driver.send(target,
                                        {},
                                        {'tx_id': i},
-                                       wait_for_reply=True,
+                                       wait_for_reply=wait_for_reply,
                                        timeout=None))
 
         while len(senders) < 2:
             t = threading.Thread(target=send_and_wait_for_reply,
-                                 args=(len(senders), ))
+                                 args=(len(senders), True))
             t.daemon = True
             senders.append(t)
 
+        # test the case then msg_id is not set
+        t = threading.Thread(target=send_and_wait_for_reply,
+                             args=(len(senders), False))
+        t.daemon = True
+        senders.append(t)
+
         # Start the first guy, receive his message, but delay his polling
         notify_condition = threading.Condition()
         wait_conditions.append(notify_condition)
@@ -354,6 +360,20 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
         # Wait for the second thread to finish
         senders[1].join()
 
+        # Start the 3rd guy, receive his message
+        senders[2].start()
+
+        msgs.append(listener.poll())
+        self.assertEqual({'tx_id': 2}, msgs[-1].message)
+
+        # Verify the _send_reply was not invoked by driver:
+        with mock.patch.object(msgs[2], '_send_reply') as method:
+            msgs[2].reply({'rx_id': 2})
+            self.assertEqual(method.call_count, 0)
+
+        # Wait for the 3rd thread to finish
+        senders[2].join()
+
         # Let the first thread continue
         with notify_condition:
             notify_condition.notify()
@@ -364,7 +384,8 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
         # Verify replies were received out of order
         self.assertEqual(len(senders), len(replies))
         self.assertEqual({'rx_id': 1}, replies[0])
-        self.assertEqual({'rx_id': 0}, replies[1])
+        self.assertIsNone(replies[1])
+        self.assertEqual({'rx_id': 0}, replies[2])
 
 
 def _declare_queue(target):