diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py index 2e7ec9e18..007ca3a88 100644 --- a/oslo_messaging/_drivers/amqp1_driver/controller.py +++ b/oslo_messaging/_drivers/amqp1_driver/controller.py @@ -287,7 +287,7 @@ class Sender(pyngus.SenderEventHandler): response to a close requested by the remote. May be re-attached later (after a reset is done) """ - self._address = None + LOG.debug("Sender %s detached", self._address) self._connection = None self._reply_link = None if self._link: @@ -311,6 +311,7 @@ class Sender(pyngus.SenderEventHandler): """Destroy the sender and all pending messages. Called on driver shutdown. """ + LOG.debug("Sender %s destroyed", self._address) self.reset() self._abort_pending("Link destroyed") @@ -337,7 +338,7 @@ class Sender(pyngus.SenderEventHandler): # Pyngus callbacks: def sender_active(self, sender_link): - LOG.debug("sender %s active", self._address) + LOG.debug("Sender %s active", self._address) self._send_pending() def credit_granted(self, sender_link): @@ -353,7 +354,6 @@ class Sender(pyngus.SenderEventHandler): # sender_closed() will be called once the link completes closing def sender_closed(self, sender_link): - LOG.debug("sender %s closed", self._address) self._abort_unacked("Sender closed") if self._connection: # still attached, so attempt to restart the link @@ -756,8 +756,12 @@ class Controller(pyngus.ConnectionEventHandler): # This allows the eventloop main thread to return to servicing socket # I/O in a timely manner self._max_task_batch = 50 - # cache of Sender links indexed by address: - self._senders = {} + # cache of all Sender links indexed by address: + self._all_senders = {} + # active Sender links indexed by address: + self._active_senders = set() + # closing Sender links indexed by address: + self._purged_senders = [] # Servers indexed by target. 
Each entry is a map indexed by the # specific ProtonListener's identifier: self._servers = {} @@ -817,6 +821,9 @@ class Controller(pyngus.ConnectionEventHandler): self._reply_credit = _opts.reply_link_credit self._rpc_credit = _opts.rpc_server_credit self._notify_credit = _opts.notify_server_credit + # sender link maintenance timer and interval + self._link_maint_timer = None + self._link_maint_timeout = _opts.default_sender_link_timeout def connect(self): """Connect to the messaging service.""" @@ -837,9 +844,9 @@ class Controller(pyngus.ConnectionEventHandler): LOG.debug("Waiting for eventloop to exit") self.processor.join(timeout) self._hard_reset() - for sender in itervalues(self._senders): + for sender in itervalues(self._all_senders): sender.destroy() - self._senders.clear() + self._all_senders.clear() self._servers.clear() self.processor.destroy() self.processor = None @@ -857,14 +864,15 @@ class Controller(pyngus.ConnectionEventHandler): if send_task.retry is None or send_task.retry < 0: send_task.retry = None key = keyify(send_task.target, send_task.service) - sender = self._senders.get(key) + sender = self._all_senders.get(key) if not sender: sender = Sender(send_task.target, self.processor, self.link_retry_delay, send_task.service) - self._senders[key] = sender + self._all_senders[key] = sender if self.reply_link and self.reply_link.active: sender.attach(self._socket_connection.connection, self.reply_link, self.addresser) + self._active_senders.add(key) sender.send_message(send_task) def subscribe(self, subscribe_task): @@ -987,7 +995,7 @@ class Controller(pyngus.ConnectionEventHandler): LOG.info(_LI("Messaging is active (%(hostname)s:%(port)s)"), {'hostname': self.hosts.current.hostname, 'port': self.hosts.current.port}) - for sender in itervalues(self._senders): + for sender in itervalues(self._all_senders): sender.attach(self._socket_connection.connection, self.reply_link, self.addresser) @@ -1041,6 +1049,9 @@ class 
Controller(pyngus.ConnectionEventHandler): self._reply_link_down, self._reply_credit) self._delay = 1 + # schedule periodic maintenance of sender links + self._link_maint_timer = self.processor.defer(self._purge_sender_links, + self._link_maint_timeout) def connection_closed(self, connection): """This is a Pyngus callback, invoked by Pyngus when the connection has @@ -1098,6 +1109,9 @@ class Controller(pyngus.ConnectionEventHandler): self._delay) self.processor.defer(self._do_reconnect, self._delay) self._delay = min(self._delay * 2, 60) + if self._link_maint_timer: + self._link_maint_timer.cancel() + self._link_maint_timer = None def _do_reconnect(self): """Invoked on connection/socket failure, failover and re-connect to the @@ -1116,16 +1130,21 @@ class Controller(pyngus.ConnectionEventHandler): # note well: since this method destroys the connection, it cannot be # invoked directly from a pyngus callback. Use processor.defer() to # run this method on the main loop instead. + for sender in self._purged_senders: + sender.destroy() + del self._purged_senders[:] + self._active_senders.clear() unused = [] - for key, sender in iteritems(self._senders): - # clean up any unused sender links: + for key, sender in iteritems(self._all_senders): + # clean up any unused sender links if sender.pending_messages == 0: unused.append(key) else: sender.reset() + self._active_senders.add(key) for key in unused: - self._senders[key].destroy() - del self._senders[key] + self._all_senders[key].destroy() + del self._all_senders[key] for servers in itervalues(self._servers): for server in itervalues(servers): server.reset() @@ -1137,7 +1156,7 @@ class Controller(pyngus.ConnectionEventHandler): def _detach_senders(self): """Close all sender links""" - for sender in itervalues(self._senders): + for sender in itervalues(self._all_senders): sender.detach() def _detach_servers(self): @@ -1146,6 +1165,26 @@ class Controller(pyngus.ConnectionEventHandler): for server in itervalues(servers): 
server.detach()
 
+    def _purge_sender_links(self):
+        """Detach idle sender links; destroy those detached last pass."""
+        # nothing is purged, and the timer is not re-armed, when closing
+        if self._closing:
+            return
+        # links detached by the previous pass are destroyed now
+        for stale in self._purged_senders:
+            stale.destroy()
+        del self._purged_senders[:]
+        # idle: cached but not marked active since the previous pass
+        idle_keys = set(self._all_senders) - self._active_senders
+        for key in idle_keys:
+            idle = self._all_senders[key]
+            if idle.pending_messages == 0:
+                idle.detach()
+                self._purged_senders.append(self._all_senders.pop(key))
+        self._active_senders.clear()  # start a new activity window
+        self._link_maint_timer = self.processor.defer(
+            self._purge_sender_links, self._link_maint_timeout)
+
     @property
     def _active(self):
         # Is the connection up
diff --git a/oslo_messaging/_drivers/amqp1_driver/opts.py b/oslo_messaging/_drivers/amqp1_driver/opts.py
index c407ea58c..9278ebdd0 100644
--- a/oslo_messaging/_drivers/amqp1_driver/opts.py
+++ b/oslo_messaging/_drivers/amqp1_driver/opts.py
@@ -127,6 +127,13 @@ amqp1_opts = [
                help='The deadline for a sent notification message delivery.'
                ' Only used when caller does not provide a timeout expiry.'),
 
+    # Sender link cache maintenance:
+    cfg.IntOpt('default_sender_link_timeout',
+               default=600,
+               min=1,
+               help='The duration to schedule a purge of idle sender links.'
+               ' Detach link after expiry.'),
+
     # Addressing:
 
     cfg.StrOpt('addressing_mode',
diff --git a/oslo_messaging/tests/drivers/test_amqp_driver.py b/oslo_messaging/tests/drivers/test_amqp_driver.py
index cc1202cf6..dd1f25bac 100644
--- a/oslo_messaging/tests/drivers/test_amqp_driver.py
+++ b/oslo_messaging/tests/drivers/test_amqp_driver.py
@@ -505,6 +505,49 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto):
         listener.join(timeout=30)
         driver.cleanup()
 
+    def test_sender_link_maintenance(self):
+        """Verify that idle sender links are purged and re-created on demand.
+
+        With default_sender_link_timeout set to 1, each send is expected to
+        open a receiver link on the test broker, and that link is expected
+        to disappear again once the sender has been idle.
+        """
+        self.config(default_sender_link_timeout=1,
+                    group="oslo_messaging_amqp")
+        driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
+        target = oslo_messaging.Target(topic="test-topic-maint")
+        # NOTE(review): 3 is presumably the number of messages the listener
+        # waits for; only 2 are sent below -- confirm this is intended
+        listener = _ListenerThread(
+            driver.listen(target, None, None)._poll_style_listener, 3)
+
+        def await_link_count(expected):
+            # wait for the broker to report `expected` receiver links, then
+            # assert it so that a timeout shows up as a test failure
+            def reached():
+                return self._broker.receiver_link_count == expected
+            _wait_until(reached, 30)
+            self.assertTrue(reached())
+
+        # first pass: the send creates a receiver link on the broker;
+        # second pass: the purged link is replaced by a separate new one
+        for _ in range(2):
+            rc = driver.send(target, {"context": True},
+                             {"msg": "value"}, wait_for_reply=False)
+            self.assertIsNone(rc)
+            await_link_count(1)
+
+            # is_alive() replaces Thread.isAlive(), removed in Python 3.9
+            # NOTE(review): assumes _ListenerThread is a threading.Thread
+            self.assertTrue(listener.is_alive())
+            self.assertEqual({"msg": "value"},
+                             listener.messages.get().message)
+
+            # the idle link is expected to be detached by link maintenance
+            await_link_count(0)
+
+        driver.cleanup()
+
 
 class TestAmqpNotification(_AmqpBrokerTestCaseAuto):
     """Test sending and receiving 
notifications."""