diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py index f9b3b5717..47b2c934a 100644 --- a/oslo_messaging/_drivers/amqp1_driver/controller.py +++ b/oslo_messaging/_drivers/amqp1_driver/controller.py @@ -192,9 +192,9 @@ class RPCCallTask(SendTask): the destination. """ - def __init__(self, target, message, deadline, retry): + def __init__(self, target, message, deadline, retry, wait_for_ack): super(RPCCallTask, self).__init__("RPC Call", message, target, - deadline, retry, wait_for_ack=True) + deadline, retry, wait_for_ack) self._reply_link = None self._reply_msg = None self._msg_id = None diff --git a/oslo_messaging/_drivers/amqp1_driver/opts.py b/oslo_messaging/_drivers/amqp1_driver/opts.py index ed817ac5f..4451c8e73 100644 --- a/oslo_messaging/_drivers/amqp1_driver/opts.py +++ b/oslo_messaging/_drivers/amqp1_driver/opts.py @@ -223,5 +223,19 @@ amqp1_opts = [ cfg.IntOpt('notify_server_credit', default=100, min=1, - help='Window size for incoming Notification messages') + help='Window size for incoming Notification messages'), + + # Settlement control + + cfg.MultiStrOpt('pre_settled', + default=['rpc-cast'], + help="Send messages of this type pre-settled.\n" + "Pre-settled messages will not receive acknowledgement\n" + "from the peer. Note well: pre-settled messages may be\n" + "silently discarded if the delivery fails.\n" + "Permitted values:\n" + "'rpc-call' - send RPC Calls pre-settled\n" + "'rpc-reply'- send RPC Replies pre-settled\n" + "'rpc-cast' - Send RPC Casts pre-settled\n" + "'notify' - Send Notifications pre-settled\n") ] diff --git a/oslo_messaging/_drivers/impl_amqp1.py b/oslo_messaging/_drivers/impl_amqp1.py index 2b9bfac9d..6531f55cd 100644 --- a/oslo_messaging/_drivers/impl_amqp1.py +++ b/oslo_messaging/_drivers/impl_amqp1.py @@ -107,11 +107,12 @@ class ProtonIncomingMessage(base.RpcIncomingMessage): self._correlation_id) driver = self.listener.driver deadline = compute_timeout(driver._default_reply_timeout) + ack = not driver._pre_settle_reply task = controller.SendTask("RPC Reply", response, self._reply_to, # analogous to kombu missing dest t/o: deadline, retry=0, - wait_for_ack=True) + wait_for_ack=ack) driver._ctrl.add_task(task) rc = task.wait() if rc: @@ -225,6 +226,18 @@ class ProtonDriver(base.BaseDriver): self._default_send_timeout = opt_name.default_send_timeout self._default_notify_timeout = opt_name.default_notify_timeout + # which message types should be sent pre-settled? + ps = [s.lower() for s in opt_name.pre_settled] + self._pre_settle_call = 'rpc-call' in ps + self._pre_settle_reply = 'rpc-reply' in ps + self._pre_settle_cast = 'rpc-cast' in ps + self._pre_settle_notify = 'notify' in ps + bad_opts = set(ps).difference(['rpc-call', 'rpc-reply', + 'rpc-cast', 'notify']) + if bad_opts: + LOG.warning(_LW("Ignoring unrecognized pre_settle value(s): %s"), + " ".join(bad_opts)) + def _ensure_connect_called(func): """Causes a new controller to be created when the messaging service is first used by the current process. It is safe to push tasks to it @@ -297,10 +310,13 @@ class ProtonDriver(base.BaseDriver): expire = compute_timeout(self._default_send_timeout) LOG.debug("Sending message to %s", target) if wait_for_reply: - task = controller.RPCCallTask(target, request, expire, retry) + ack = not self._pre_settle_call + task = controller.RPCCallTask(target, request, expire, retry, + wait_for_ack=ack) else: + ack = not self._pre_settle_cast task = controller.SendTask("RPC Cast", request, target, expire, - retry, wait_for_ack=True) + retry, wait_for_ack=ack) self._ctrl.add_task(task) reply = task.wait() @@ -340,8 +356,9 @@ class ProtonDriver(base.BaseDriver): # TODO(kgiusti) should raise NotImplemented if not broker backend LOG.debug("Send notification to %s", target) deadline = compute_timeout(self._default_notify_timeout) + ack = not self._pre_settle_notify task = controller.SendTask("Notify", request, target, - deadline, retry, wait_for_ack=True, + deadline, retry, wait_for_ack=ack, notification=True) self._ctrl.add_task(task) rc = task.wait() diff --git a/oslo_messaging/tests/drivers/test_amqp_driver.py b/oslo_messaging/tests/drivers/test_amqp_driver.py index 0d3bbb6fd..cc8f9c370 100644 --- a/oslo_messaging/tests/drivers/test_amqp_driver.py +++ b/oslo_messaging/tests/drivers/test_amqp_driver.py @@ -321,8 +321,10 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto): def test_send_not_acked(self): """Verify exception thrown if send Nacked.""" + self.config(pre_settled=[], + group="oslo_messaging_amqp") driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) - # TODO(kgiusti): update when in config: + # set this directly so we can use a value < minimum allowed driver._default_send_timeout = 2 target = oslo_messaging.Target(topic="!no-ack!") @@ -334,6 +336,23 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto): wait_for_reply=False) driver.cleanup() + def test_no_ack_cast(self): + """Verify no exception is thrown if acks are turned off""" + # set casts to ignore ack + self.config(pre_settled=['rpc-cast'], + group="oslo_messaging_amqp") + driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) + # set this directly so we can use a value < minimum allowed + driver._default_send_timeout = 2 + target = oslo_messaging.Target(topic="!no-ack!") + + # the broker will silently discard this cast, but since ack'ing is + # disabled the send does not fail + driver.send(target, {"context": "whatever"}, + {"method": "drop"}, + wait_for_reply=False) + driver.cleanup() + def test_call_late_reply(self): """What happens if reply arrives after timeout?""" @@ -540,7 +559,7 @@ class TestAmqpNotification(_AmqpBrokerTestCaseAuto): def test_notification_not_acked(self): driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) - # TODO(kgiusti): update when in config: + # set this directly so we can use a value < minimum allowed driver._default_notify_timeout = 2 self.assertRaises(oslo_messaging.MessageDeliveryFailure, driver.send_notification, @@ -549,6 +568,19 @@ class TestAmqpNotification(_AmqpBrokerTestCaseAuto): 2.0) driver.cleanup() + def test_no_ack_notification(self): + """Verify no exception is thrown if acks are turned off""" + # add a couple of illegal values for coverage of the warning + self.config(pre_settled=['notify', 'fleabag', 'poochie'], + group="oslo_messaging_amqp") + + driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) + # set this directly so we can use a value < minimum allowed + driver._default_notify_timeout = 2 + driver.send_notification(oslo_messaging.Target(topic="!no-ack!"), + "context", {'target': "!no-ack!"}, 2.0) + driver.cleanup() + @testtools.skipUnless(pyngus and pyngus.VERSION < (2, 0, 0), "pyngus module not present")