diff --git a/lower-constraints.txt b/lower-constraints.txt
index 3df8f1a83..338c12a42 100644
--- a/lower-constraints.txt
+++ b/lower-constraints.txt
@@ -28,7 +28,7 @@ imagesize==0.7.1
 iso8601==0.1.11
 Jinja2==2.10
 keystoneauth1==3.4.0
-kombu==4.0.0
+kombu==4.6.1
 linecache2==1.0.0
 MarkupSafe==1.0
 mccabe==0.2.1
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 1726fec2d..124e7ef6d 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -756,6 +756,10 @@ class Connection(object):
             # NOTE(sileht): we must reraise this without
             # trigger error_callback
             raise
+        except exceptions.MessageUndeliverable:
+            # NOTE(gsantomaggio): we must reraise this without
+            # trigger error_callback
+            raise
         except Exception as exc:
             error_callback and error_callback(exc)
             self._set_current_channel(None)
@@ -769,6 +773,11 @@ class Connection(object):
             LOG.error(msg)
             raise exceptions.MessageDeliveryFailure(msg)
 
+    @staticmethod
+    def on_return(exception, exchange, routing_key, message):
+        raise exceptions.MessageUndeliverable(exception, exchange, routing_key,
+                                              message)
+
     def _set_current_channel(self, new_channel):
         """Change the channel to use.
 
@@ -787,7 +796,8 @@ class Connection(object):
         if new_channel is not None:
             if self.purpose == rpc_common.PURPOSE_LISTEN:
                 self._set_qos(new_channel)
-            self._producer = kombu.messaging.Producer(new_channel)
+            self._producer = kombu.messaging.Producer(new_channel,
+                                                      on_return=self.on_return)
             for consumer in self._consumers:
                 consumer.declare(self)
 
diff --git a/oslo_messaging/exceptions.py b/oslo_messaging/exceptions.py
index cfe6a7efb..f6ba20a76 100644
--- a/oslo_messaging/exceptions.py
+++ b/oslo_messaging/exceptions.py
@@ -16,7 +16,7 @@
 import six
 
 __all__ = ['MessagingException', 'MessagingTimeout', 'MessageDeliveryFailure',
-           'InvalidTarget']
+           'InvalidTarget', 'MessageUndeliverable']
 
 
 class MessagingException(Exception):
@@ -38,3 +38,14 @@ class InvalidTarget(MessagingException, ValueError):
         msg = msg + ":" + six.text_type(target)
         super(InvalidTarget, self).__init__(msg)
         self.target = target
+
+
+class MessageUndeliverable(Exception):
+    """Raised if message is not routed with mandatory flag"""
+
+    def __init__(self, exception, exchange, routing_key, message):
+        super(MessageUndeliverable, self).__init__()
+        self.exception = exception
+        self.exchange = exchange
+        self.routing_key = routing_key
+        self.message = message
diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py
index 4fa8b48f1..6800f596f 100644
--- a/oslo_messaging/tests/functional/test_functional.py
+++ b/oslo_messaging/tests/functional/test_functional.py
@@ -152,6 +152,40 @@ class CallTestCase(utils.SkipIfNoTransportURL):
 
         self.assertEqual(10, server.endpoint.ival)
 
+    def test_mandatory_call(self):
+        if not self.url.startswith("rabbit://"):
+            self.skipTest("backend does not support call monitoring")
+
+        transport = self.useFixture(utils.RPCTransportFixture(self.conf,
+                                                              self.url))
+        target = oslo_messaging.Target(topic='topic_' + str(uuid.uuid4()),
+                                       server='server_' + str(uuid.uuid4()))
+
+        # test for mandatory flag using transport-options, see:
+        # https://blueprints.launchpad.net/oslo.messaging/+spec/transport-options
+        # first test with `at_least_once=False` raises a "MessagingTimeout"
+        # error since there is no control if the queue actually exists.
+        # (Default behavior)
+        options = oslo_messaging.TransportOptions(at_least_once=False)
+        client1 = utils.ClientStub(transport.transport, target,
+                                   cast=False, timeout=1,
+                                   transport_options=options)
+
+        self.assertRaises(oslo_messaging.MessagingTimeout,
+                          client1.delay)
+
+        # second test with `at_least_once=True` raises a "MessageUndeliverable"
+        # caused by mandatory flag.
+        # the MessageUndeliverable error is raised immediately without waiting
+        # any timeout
+        options2 = oslo_messaging.TransportOptions(at_least_once=True)
+        client2 = utils.ClientStub(transport.transport, target,
+                                   cast=False, timeout=60,
+                                   transport_options=options2)
+
+        self.assertRaises(oslo_messaging.MessageUndeliverable,
+                          client2.delay)
+
     def test_monitor_long_call(self):
         if not (self.url.startswith("rabbit://") or
                 self.url.startswith("amqp://")):
diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py
index 4d403a07b..700c16277 100644
--- a/oslo_messaging/tests/functional/utils.py
+++ b/oslo_messaging/tests/functional/utils.py
@@ -226,10 +226,15 @@ class RpcCast(RpcCall):
 
 
 class ClientStub(object):
-    def __init__(self, transport, target, cast=False, name=None, **kwargs):
+    def __init__(self, transport, target, cast=False, name=None,
+                 transport_options=None, **kwargs):
         self.name = name or "functional-tests"
         self.cast = cast
-        self.client = oslo_messaging.RPCClient(transport, target, **kwargs)
+        self.client = oslo_messaging.RPCClient(
+            transport=transport,
+            target=target,
+            transport_options=transport_options,
+            **kwargs)
 
     def __getattr__(self, name):
         context = {"application": self.name}
diff --git a/requirements.txt b/requirements.txt
index 2e05118b4..7ed73945c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -26,7 +26,7 @@ PyYAML>=3.12 # MIT
 # rabbit driver is the default
 # we set the amqp version to ensure heartbeat works
 amqp>=2.4.1 # BSD
-kombu!=4.0.2,>=4.0.0 # BSD
+kombu!=4.0.2,>=4.6.1 # BSD
 
 # middleware
 oslo.middleware>=3.31.0 # Apache-2.0