diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py index ad045013e..d2d59b642 100644 --- a/oslo_messaging/_drivers/amqp1_driver/controller.py +++ b/oslo_messaging/_drivers/amqp1_driver/controller.py @@ -120,21 +120,20 @@ class SendTask(Task): def _prepare(self, sender): """Called immediately before the message is handed off to the i/o - system. This implies that the sender link is up and credit is - available for this send request. + system. This implies that the sender link is up. """ - pass + if not self.wait_for_ack: + # sender is not concerned with waiting for acknowledgment + # "best effort at-most-once delivery" + self._cleanup() + self._wakeup.set() def _on_ack(self, state, info): """Called by eventloop thread when the ack/nack is received from the peer. """ - if self._wakeup.is_set(): - LOG.debug("Message ACKed after send completed: %s %s", state, info) - return - if state != pyngus.SenderLink.ACCEPTED: - # TODO(kgiusti): should retry if deadline not hit + # TODO(kgiusti): could retry if deadline not hit msg = ("{name} message send to {target} failed: remote" " disposition: {disp}, info:" "{info}".format(name=self.name, @@ -142,6 +141,7 @@ class SendTask(Task): disp=state, info=info)) self._error = exceptions.MessageDeliveryFailure(msg) + LOG.warning("%s", msg) self._cleanup() self._wakeup.set() @@ -149,18 +149,15 @@ class SendTask(Task): """Invoked by the eventloop when the send fails to complete before the timeout is reached. """ - if self._wakeup.is_set(): - LOG.debug("Message send timeout occurred after send completed") - return self.timer = None - if self.message.ttl: - msg = ("{name} message sent to {target} failed: timed" - " out".format(name=self.name, target=self.target)) - self._error = exceptions.MessagingTimeout(msg) - else: - msg = ("{name} message sent to {target} failed:" - " undeliverable".format(name=self.name, target=self.target)) - self._error = exceptions.MessageDeliveryFailure(msg) + msg = ("{name} message sent to {target} failed: timed" + " out".format(name=self.name, target=self.target)) + LOG.warning("%s", msg) + # Only raise a MessagingTimeout if the caller has explicitly specified + # a timeout. + self._error = exceptions.MessagingTimeout(msg) \ + if self.message.ttl else \ + exceptions.MessageDeliveryFailure(msg) self._cleanup() self._wakeup.set() @@ -168,15 +165,11 @@ class SendTask(Task): """Invoked by the eventloop if the send operation fails for reasons other than timeout and nack. """ - if self._wakeup.is_set(): - LOG.debug("Message send error occurred after send completed: %s", - str(description)) - return - msg = ("{name} message sent to {target} failed:" " {reason}".format(name=self.name, target=self.target, reason=description)) + LOG.warning("%s", msg) self._error = exceptions.MessageDeliveryFailure(msg) self._cleanup() self._wakeup.set() @@ -228,7 +221,7 @@ class RPCCallTask(SendTask): # must wait for reply if ACCEPTED def _cleanup(self): - if self._reply_link: + if self._reply_link and self._msg_id: self._reply_link.cancel_response(self._msg_id) self._msg_id = None super(RPCCallTask, self)._cleanup() @@ -334,9 +327,7 @@ class Sender(pyngus.SenderEventHandler): send_task.timer = self._scheduler.alarm(timer_callback, send_task.deadline) - if not self._can_send: - self._pending_sends.append(send_task) - elif self._pending_sends: + if not self._can_send or self._pending_sends: self._pending_sends.append(send_task) else: self._send(send_task) @@ -348,7 +339,7 @@ class Sender(pyngus.SenderEventHandler): self._send_pending() def credit_granted(self, sender_link): - self._send_pending() + pass def sender_remote_closed(self, sender_link, pn_condition): # The remote has initiated a close. This could happen when the message @@ -403,36 +394,30 @@ class Sender(pyngus.SenderEventHandler): @property def _can_send(self): - return (self._link is not None and - self._link.active and - self._link.credit > 0) + return self._link and self._link.active def _send(self, send_task): send_task._prepare(self) send_task.message.address = self._address - if send_task.wait_for_ack: - def pyngus_callback(link, handle, state, info): - # invoked when the message bus (n)acks this message - if state == pyngus.SenderLink.TIMED_OUT: - # ignore pyngus timeout - we maintain our own timer - return - self._unacked.discard(send_task) - send_task._on_ack(state, info) - self._unacked.add(send_task) - self._link.send(send_task.message, - delivery_callback=pyngus_callback, - handle=self, - deadline=send_task.deadline) - else: - self._link.send(send_task.message) - # simulate ack to wakeup sender - send_task._on_ack(pyngus.SenderLink.ACCEPTED, dict()) + def pyngus_callback(link, handle, state, info): + # invoked when the message bus (n)acks this message + if state == pyngus.SenderLink.TIMED_OUT: + # ignore pyngus timeout - we maintain our own timer + return + self._unacked.discard(send_task) + send_task._on_ack(state, info) + + self._unacked.add(send_task) + self._link.send(send_task.message, + delivery_callback=pyngus_callback, + handle=self, + deadline=send_task.deadline) def _send_pending(self): - # send as many pending messages as there is credit available + # send all pending messages if self._can_send: - while self._pending_sends and self._link.credit > 0: + while self._pending_sends: self._send(self._pending_sends.popleft()) def _open_link(self): diff --git a/oslo_messaging/_drivers/amqp1_driver/opts.py b/oslo_messaging/_drivers/amqp1_driver/opts.py index e7cc38494..c407ea58c 100644 --- a/oslo_messaging/_drivers/amqp1_driver/opts.py +++ b/oslo_messaging/_drivers/amqp1_driver/opts.py @@ -226,7 +226,7 @@ amqp1_opts = [ # Settlement control cfg.MultiStrOpt('pre_settled', - default=['rpc-cast'], + default=['rpc-cast', 'rpc-reply'], help="Send messages of this type pre-settled.\n" "Pre-settled messages will not receive acknowledgement\n" "from the peer. Note well: pre-settled messages may be\n" diff --git a/oslo_messaging/tests/drivers/test_amqp_driver.py b/oslo_messaging/tests/drivers/test_amqp_driver.py index cc8f9c370..d1ca36ce7 100644 --- a/oslo_messaging/tests/drivers/test_amqp_driver.py +++ b/oslo_messaging/tests/drivers/test_amqp_driver.py @@ -1085,16 +1085,16 @@ class TestLinkRecovery(_AmqpBrokerTestCase): target.fanout = True target.server = None # these threads will share the same link - th = [] for i in range(3): t = threading.Thread(target=driver.send, args=(target, {"context": "whatever"}, {"msg": "n=%d" % i}), kwargs={'wait_for_reply': False}) t.start() - t.join(timeout=1) - self.assertTrue(t.isAlive()) - th.append(t) + # casts return once message is put on active link + t.join(timeout=30) + + time.sleep(1) # ensure messages are going nowhere self.assertEqual(self._broker.fanout_sent_count, 0) # this will trigger the release of credit for the previous links target.fanout = False @@ -1106,9 +1106,6 @@ class TestLinkRecovery(_AmqpBrokerTestCase): listener.join(timeout=30) self.assertTrue(self._broker.fanout_count == 3) self.assertFalse(listener.isAlive()) - for t in th: - t.join(timeout=30) - self.assertFalse(t.isAlive()) driver.cleanup()