Merge "[AMQP 1.0] Make the default settlement behavior configurable"

This commit is contained in:
Jenkins 2016-09-07 22:03:28 +00:00 committed by Gerrit Code Review
commit 7cac918e73
4 changed files with 72 additions and 9 deletions
oslo_messaging

@ -192,9 +192,9 @@ class RPCCallTask(SendTask):
the destination. the destination.
""" """
def __init__(self, target, message, deadline, retry): def __init__(self, target, message, deadline, retry, wait_for_ack):
super(RPCCallTask, self).__init__("RPC Call", message, target, super(RPCCallTask, self).__init__("RPC Call", message, target,
deadline, retry, wait_for_ack=True) deadline, retry, wait_for_ack)
self._reply_link = None self._reply_link = None
self._reply_msg = None self._reply_msg = None
self._msg_id = None self._msg_id = None

@ -223,5 +223,19 @@ amqp1_opts = [
cfg.IntOpt('notify_server_credit', cfg.IntOpt('notify_server_credit',
default=100, default=100,
min=1, min=1,
help='Window size for incoming Notification messages') help='Window size for incoming Notification messages'),
# Settlement control
cfg.MultiStrOpt('pre_settled',
default=['rpc-cast'],
help="Send messages of this type pre-settled.\n"
"Pre-settled messages will not receive acknowledgement\n"
"from the peer. Note well: pre-settled messages may be\n"
"silently discarded if the delivery fails.\n"
"Permitted values:\n"
"'rpc-call' - send RPC Calls pre-settled\n"
"'rpc-reply'- send RPC Replies pre-settled\n"
"'rpc-cast' - Send RPC Casts pre-settled\n"
"'notify' - Send Notifications pre-settled\n")
] ]

@ -107,11 +107,12 @@ class ProtonIncomingMessage(base.RpcIncomingMessage):
self._correlation_id) self._correlation_id)
driver = self.listener.driver driver = self.listener.driver
deadline = compute_timeout(driver._default_reply_timeout) deadline = compute_timeout(driver._default_reply_timeout)
ack = not driver._pre_settle_reply
task = controller.SendTask("RPC Reply", response, self._reply_to, task = controller.SendTask("RPC Reply", response, self._reply_to,
# analogous to kombu missing dest t/o: # analogous to kombu missing dest t/o:
deadline, deadline,
retry=0, retry=0,
wait_for_ack=True) wait_for_ack=ack)
driver._ctrl.add_task(task) driver._ctrl.add_task(task)
rc = task.wait() rc = task.wait()
if rc: if rc:
@ -225,6 +226,18 @@ class ProtonDriver(base.BaseDriver):
self._default_send_timeout = opt_name.default_send_timeout self._default_send_timeout = opt_name.default_send_timeout
self._default_notify_timeout = opt_name.default_notify_timeout self._default_notify_timeout = opt_name.default_notify_timeout
# which message types should be sent pre-settled?
ps = [s.lower() for s in opt_name.pre_settled]
self._pre_settle_call = 'rpc-call' in ps
self._pre_settle_reply = 'rpc-reply' in ps
self._pre_settle_cast = 'rpc-cast' in ps
self._pre_settle_notify = 'notify' in ps
bad_opts = set(ps).difference(['rpc-call', 'rpc-reply',
'rpc-cast', 'notify'])
if bad_opts:
LOG.warning(_LW("Ignoring unrecognized pre_settle value(s): %s"),
" ".join(bad_opts))
def _ensure_connect_called(func): def _ensure_connect_called(func):
"""Causes a new controller to be created when the messaging service is """Causes a new controller to be created when the messaging service is
first used by the current process. It is safe to push tasks to it first used by the current process. It is safe to push tasks to it
@ -297,10 +310,13 @@ class ProtonDriver(base.BaseDriver):
expire = compute_timeout(self._default_send_timeout) expire = compute_timeout(self._default_send_timeout)
LOG.debug("Sending message to %s", target) LOG.debug("Sending message to %s", target)
if wait_for_reply: if wait_for_reply:
task = controller.RPCCallTask(target, request, expire, retry) ack = not self._pre_settle_call
task = controller.RPCCallTask(target, request, expire, retry,
wait_for_ack=ack)
else: else:
ack = not self._pre_settle_cast
task = controller.SendTask("RPC Cast", request, target, expire, task = controller.SendTask("RPC Cast", request, target, expire,
retry, wait_for_ack=True) retry, wait_for_ack=ack)
self._ctrl.add_task(task) self._ctrl.add_task(task)
reply = task.wait() reply = task.wait()
@ -340,8 +356,9 @@ class ProtonDriver(base.BaseDriver):
# TODO(kgiusti) should raise NotImplemented if not broker backend # TODO(kgiusti) should raise NotImplemented if not broker backend
LOG.debug("Send notification to %s", target) LOG.debug("Send notification to %s", target)
deadline = compute_timeout(self._default_notify_timeout) deadline = compute_timeout(self._default_notify_timeout)
ack = not self._pre_settle_notify
task = controller.SendTask("Notify", request, target, task = controller.SendTask("Notify", request, target,
deadline, retry, wait_for_ack=True, deadline, retry, wait_for_ack=ack,
notification=True) notification=True)
self._ctrl.add_task(task) self._ctrl.add_task(task)
rc = task.wait() rc = task.wait()

@ -321,8 +321,10 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto):
def test_send_not_acked(self): def test_send_not_acked(self):
"""Verify exception thrown if send Nacked.""" """Verify exception thrown if send Nacked."""
self.config(pre_settled=[],
group="oslo_messaging_amqp")
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
# TODO(kgiusti): update when in config: # set this directly so we can use a value < minimum allowed
driver._default_send_timeout = 2 driver._default_send_timeout = 2
target = oslo_messaging.Target(topic="!no-ack!") target = oslo_messaging.Target(topic="!no-ack!")
@ -334,6 +336,23 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto):
wait_for_reply=False) wait_for_reply=False)
driver.cleanup() driver.cleanup()
def test_no_ack_cast(self):
"""Verify no exception is thrown if acks are turned off"""
# set casts to ignore ack
self.config(pre_settled=['rpc-cast'],
group="oslo_messaging_amqp")
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
# set this directly so we can use a value < minimum allowed
driver._default_send_timeout = 2
target = oslo_messaging.Target(topic="!no-ack!")
# the broker will silently discard this cast, but since ack'ing is
# disabled the send does not fail
driver.send(target, {"context": "whatever"},
{"method": "drop"},
wait_for_reply=False)
driver.cleanup()
def test_call_late_reply(self): def test_call_late_reply(self):
"""What happens if reply arrives after timeout?""" """What happens if reply arrives after timeout?"""
@ -540,7 +559,7 @@ class TestAmqpNotification(_AmqpBrokerTestCaseAuto):
def test_notification_not_acked(self): def test_notification_not_acked(self):
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
# TODO(kgiusti): update when in config: # set this directly so we can use a value < minimum allowed
driver._default_notify_timeout = 2 driver._default_notify_timeout = 2
self.assertRaises(oslo_messaging.MessageDeliveryFailure, self.assertRaises(oslo_messaging.MessageDeliveryFailure,
driver.send_notification, driver.send_notification,
@ -549,6 +568,19 @@ class TestAmqpNotification(_AmqpBrokerTestCaseAuto):
2.0) 2.0)
driver.cleanup() driver.cleanup()
def test_no_ack_notification(self):
"""Verify no exception is thrown if acks are turned off"""
# add a couple of illegal values for coverage of the warning
self.config(pre_settled=['notify', 'fleabag', 'poochie'],
group="oslo_messaging_amqp")
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
# set this directly so we can use a value < minimum allowed
driver._default_notify_timeout = 2
driver.send_notification(oslo_messaging.Target(topic="!no-ack!"),
"context", {'target': "!no-ack!"}, 2.0)
driver.cleanup()
@testtools.skipUnless(pyngus and pyngus.VERSION < (2, 0, 0), @testtools.skipUnless(pyngus and pyngus.VERSION < (2, 0, 0),
"pyngus module not present") "pyngus module not present")