diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py index d2d59b642..2e7ec9e18 100644 --- a/oslo_messaging/_drivers/amqp1_driver/controller.py +++ b/oslo_messaging/_drivers/amqp1_driver/controller.py @@ -235,17 +235,19 @@ class MessageDispositionTask(Task): super(MessageDispositionTask, self).__init__() self._disposition = disposition self._released = released - self._wakeup = threading.Event() def wait(self): - self._wakeup.wait() + # disposition update does not have to block the sender since there is + # no result to pend for. This avoids a thread context switch with + # every RPC call + pass def _execute(self, controller): try: self._disposition(self._released) - except Exception: - pass - self._wakeup.set() + except Exception as e: + # there's really nothing we can do about a failed disposition. + LOG.exception(_LE("Message acknowledgment failed: %s"), e) class Sender(pyngus.SenderEventHandler): diff --git a/oslo_messaging/_drivers/impl_amqp1.py b/oslo_messaging/_drivers/impl_amqp1.py index 16da05fec..e48c47824 100644 --- a/oslo_messaging/_drivers/impl_amqp1.py +++ b/oslo_messaging/_drivers/impl_amqp1.py @@ -124,18 +124,12 @@ class ProtonIncomingMessage(base.RpcIncomingMessage): task = controller.MessageDispositionTask(self._disposition, released=False) self.listener.driver._ctrl.add_task(task) - rc = task.wait() - if rc: - LOG.debug("Message acknowledge failed: %s", str(rc)) def requeue(self): """Schedule a MessageDispositionTask to release the message""" task = controller.MessageDispositionTask(self._disposition, released=True) self.listener.driver._ctrl.add_task(task) - rc = task.wait() - if rc: - LOG.debug("Message requeue failed: %s", str(rc)) class Queue(object):