diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 02c3dbec1..1726fec2d 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -49,7 +49,6 @@ from oslo_messaging import exceptions
 # NOTE(sileht): don't exists in py2 socket module
 TCP_USER_TIMEOUT = 18
 
-
 rabbit_opts = [
     cfg.BoolOpt('ssl',
                 default=False,
@@ -1150,15 +1149,17 @@ class Connection(object):
                     'transport_options': str(transport_options)}
         LOG.trace('Connection._publish: sending message %(msg)s to'
                   ' %(who)s with routing key %(key)s', log_info)
-
         # NOTE(sileht): no need to wait more, caller expects
         # a answer before timeout is reached
         with self._transport_socket_timeout(timeout):
-            self._producer.publish(msg,
-                                   exchange=exchange,
-                                   routing_key=routing_key,
-                                   expiration=timeout,
-                                   compression=self.kombu_compression)
+            self._producer.publish(
+                msg,
+                mandatory=transport_options.at_least_once if
+                transport_options else False,
+                exchange=exchange,
+                routing_key=routing_key,
+                expiration=timeout,
+                compression=self.kombu_compression)
 
     def _publish_and_creates_default_queue(self, exchange, msg,
                                            routing_key=None, timeout=None,
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index a18da1535..5f302db30 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -199,6 +199,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
             'msg', expiration=1,
             exchange=exchange_mock,
             compression=self.conf.oslo_messaging_rabbit.kombu_compression,
+            mandatory=False,
             routing_key='routing_key')
 
     @mock.patch('kombu.messaging.Producer.publish')
@@ -212,6 +213,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
             conn._publish(exchange_mock, 'msg', routing_key='routing_key')
         fake_publish.assert_called_with(
             'msg', expiration=None,
+            mandatory=False,
             compression=self.conf.oslo_messaging_rabbit.kombu_compression,
             exchange=exchange_mock,
             routing_key='routing_key')
diff --git a/oslo_messaging/tests/rpc/test_client.py b/oslo_messaging/tests/rpc/test_client.py
index ec22d705c..6c54625fe 100755
--- a/oslo_messaging/tests/rpc/test_client.py
+++ b/oslo_messaging/tests/rpc/test_client.py
@@ -42,14 +42,15 @@ class TestCastCall(test_utils.BaseTestCase):
 
     def test_cast_call(self):
         self.config(rpc_response_timeout=None)
-
+        transport_options = oslo_messaging.TransportOptions()
         transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
-        client = oslo_messaging.RPCClient(transport, oslo_messaging.Target())
+        client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
+                                          transport_options=transport_options)
 
         transport._send = mock.Mock()
 
         msg = dict(method='foo', args=self.args)
-        kwargs = {'retry': None, 'transport_options': None}
+        kwargs = {'retry': None, 'transport_options': transport_options}
         if self.call:
             kwargs['wait_for_reply'] = True
             kwargs['timeout'] = None
@@ -57,7 +58,7 @@ class TestCastCall(test_utils.BaseTestCase):
 
         method = client.call if self.call else client.cast
         method(self.ctxt, 'foo', **self.args)
-
+        self.assertFalse(transport_options.at_least_once)
         transport._send.assert_called_once_with(oslo_messaging.Target(),
                                                 self.ctxt,
                                                 msg,
@@ -67,13 +68,18 @@ class TestCastCall(test_utils.BaseTestCase):
         self.config(rpc_response_timeout=None)
 
         transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
-        client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
-                                          transport_options={'my_k': 'my_val'})
+
+        transport_options = oslo_messaging.TransportOptions(at_least_once=True)
+        client = oslo_messaging.RPCClient(
+            transport,
+            oslo_messaging.Target(),
+            transport_options=transport_options)
 
         transport._send = mock.Mock()
 
         msg = dict(method='foo', args=self.args)
-        kwargs = {'retry': None, 'transport_options': {'my_k': 'my_val'}}
+        kwargs = {'retry': None,
+                  'transport_options': transport_options}
         if self.call:
             kwargs['wait_for_reply'] = True
             kwargs['timeout'] = None
@@ -82,6 +88,7 @@ class TestCastCall(test_utils.BaseTestCase):
         method = client.call if self.call else client.cast
         method(self.ctxt, 'foo', **self.args)
 
+        self.assertTrue(transport_options.at_least_once)
         transport._send.assert_called_once_with(oslo_messaging.Target(),
                                                 self.ctxt,
                                                 msg,
diff --git a/oslo_messaging/transport.py b/oslo_messaging/transport.py
index c263e3541..48979a64b 100644
--- a/oslo_messaging/transport.py
+++ b/oslo_messaging/transport.py
@@ -33,6 +33,7 @@ __all__ = [
     'Transport',
     'TransportHost',
     'TransportURL',
+    'TransportOptions',
     'get_transport',
     'set_transport_defaults',
 ]
@@ -277,6 +278,16 @@ class TransportHost(object):
         return '<TransportHost ' + values + '>'
 
 
+class TransportOptions(object):
+
+    def __init__(self, at_least_once=False):
+        self._at_least_once = at_least_once
+
+    @property
+    def at_least_once(self):
+        return self._at_least_once
+
+
 class TransportURL(object):
 
     """A parsed transport URL.