Merge "Periodically purge sender link cache"

This commit is contained in:
Jenkins 2016-12-05 22:43:56 +00:00 committed by Gerrit Code Review
commit 13dffe9913
3 changed files with 104 additions and 15 deletions
oslo_messaging
_drivers/amqp1_driver
tests/drivers

@ -287,7 +287,7 @@ class Sender(pyngus.SenderEventHandler):
response to a close requested by the remote. May be re-attached later
(after a reset is done)
"""
self._address = None
LOG.debug("Sender %s detached", self._address)
self._connection = None
self._reply_link = None
if self._link:
@ -311,6 +311,7 @@ class Sender(pyngus.SenderEventHandler):
"""Destroy the sender and all pending messages. Called on driver
shutdown.
"""
LOG.debug("Sender %s destroyed", self._address)
self.reset()
self._abort_pending("Link destroyed")
@ -337,7 +338,7 @@ class Sender(pyngus.SenderEventHandler):
# Pyngus callbacks:
def sender_active(self, sender_link):
LOG.debug("sender %s active", self._address)
LOG.debug("Sender %s active", self._address)
self._send_pending()
def credit_granted(self, sender_link):
@ -353,7 +354,6 @@ class Sender(pyngus.SenderEventHandler):
# sender_closed() will be called once the link completes closing
def sender_closed(self, sender_link):
LOG.debug("sender %s closed", self._address)
self._abort_unacked("Sender closed")
if self._connection:
# still attached, so attempt to restart the link
@ -756,8 +756,12 @@ class Controller(pyngus.ConnectionEventHandler):
# This allows the eventloop main thread to return to servicing socket
# I/O in a timely manner
self._max_task_batch = 50
# cache of Sender links indexed by address:
self._senders = {}
# cache of all Sender links indexed by address:
self._all_senders = {}
# active Sender links indexed by address:
self._active_senders = set()
# closing Sender links indexed by address:
self._purged_senders = []
# Servers indexed by target. Each entry is a map indexed by the
# specific ProtonListener's identifier:
self._servers = {}
@ -817,6 +821,9 @@ class Controller(pyngus.ConnectionEventHandler):
self._reply_credit = _opts.reply_link_credit
self._rpc_credit = _opts.rpc_server_credit
self._notify_credit = _opts.notify_server_credit
# sender link maintenance timer and interval
self._link_maint_timer = None
self._link_maint_timeout = _opts.default_sender_link_timeout
def connect(self):
"""Connect to the messaging service."""
@ -837,9 +844,9 @@ class Controller(pyngus.ConnectionEventHandler):
LOG.debug("Waiting for eventloop to exit")
self.processor.join(timeout)
self._hard_reset()
for sender in itervalues(self._senders):
for sender in itervalues(self._all_senders):
sender.destroy()
self._senders.clear()
self._all_senders.clear()
self._servers.clear()
self.processor.destroy()
self.processor = None
@ -857,14 +864,15 @@ class Controller(pyngus.ConnectionEventHandler):
if send_task.retry is None or send_task.retry < 0:
send_task.retry = None
key = keyify(send_task.target, send_task.service)
sender = self._senders.get(key)
sender = self._all_senders.get(key)
if not sender:
sender = Sender(send_task.target, self.processor,
self.link_retry_delay, send_task.service)
self._senders[key] = sender
self._all_senders[key] = sender
if self.reply_link and self.reply_link.active:
sender.attach(self._socket_connection.connection,
self.reply_link, self.addresser)
self._active_senders.add(key)
sender.send_message(send_task)
def subscribe(self, subscribe_task):
@ -987,7 +995,7 @@ class Controller(pyngus.ConnectionEventHandler):
LOG.info(_LI("Messaging is active (%(hostname)s:%(port)s)"),
{'hostname': self.hosts.current.hostname,
'port': self.hosts.current.port})
for sender in itervalues(self._senders):
for sender in itervalues(self._all_senders):
sender.attach(self._socket_connection.connection,
self.reply_link, self.addresser)
@ -1041,6 +1049,9 @@ class Controller(pyngus.ConnectionEventHandler):
self._reply_link_down,
self._reply_credit)
self._delay = 1
# schedule periodic maintenance of sender links
self._link_maint_timer = self.processor.defer(self._purge_sender_links,
self._link_maint_timeout)
def connection_closed(self, connection):
"""This is a Pyngus callback, invoked by Pyngus when the connection has
@ -1098,6 +1109,9 @@ class Controller(pyngus.ConnectionEventHandler):
self._delay)
self.processor.defer(self._do_reconnect, self._delay)
self._delay = min(self._delay * 2, 60)
if self._link_maint_timer:
self._link_maint_timer.cancel()
self._link_maint_timer = None
def _do_reconnect(self):
"""Invoked on connection/socket failure, failover and re-connect to the
@ -1116,16 +1130,21 @@ class Controller(pyngus.ConnectionEventHandler):
# note well: since this method destroys the connection, it cannot be
# invoked directly from a pyngus callback. Use processor.defer() to
# run this method on the main loop instead.
for sender in self._purged_senders:
sender.destroy()
del self._purged_senders[:]
self._active_senders.clear()
unused = []
for key, sender in iteritems(self._senders):
# clean up any unused sender links:
for key, sender in iteritems(self._all_senders):
# clean up any unused sender links
if sender.pending_messages == 0:
unused.append(key)
else:
sender.reset()
self._active_senders.add(key)
for key in unused:
self._senders[key].destroy()
del self._senders[key]
self._all_senders[key].destroy()
del self._all_senders[key]
for servers in itervalues(self._servers):
for server in itervalues(servers):
server.reset()
@ -1137,7 +1156,7 @@ class Controller(pyngus.ConnectionEventHandler):
def _detach_senders(self):
"""Close all sender links"""
for sender in itervalues(self._senders):
for sender in itervalues(self._all_senders):
sender.detach()
def _detach_servers(self):
@ -1146,6 +1165,26 @@ class Controller(pyngus.ConnectionEventHandler):
for server in itervalues(servers):
server.detach()
def _purge_sender_links(self):
"""Purge inactive sender links"""
if not self._closing:
# destroy links that have already been closed
for sender in self._purged_senders:
sender.destroy()
del self._purged_senders[:]
# determine next set to purge
purge = set(self._all_senders.keys()) - self._active_senders
for key in purge:
sender = self._all_senders[key]
if sender.pending_messages == 0:
sender.detach()
self._purged_senders.append(self._all_senders.pop(key))
self._active_senders.clear()
self._link_maint_timer = \
self.processor.defer(self._purge_sender_links,
self._link_maint_timeout)
@property
def _active(self):
# Is the connection up

@ -127,6 +127,13 @@ amqp1_opts = [
help='The deadline for a sent notification message delivery.'
' Only used when caller does not provide a timeout expiry.'),
# Sender link cache maintenance:
cfg.IntOpt('default_sender_link_timeout',
default=600,
min=1,
help='The duration to schedule a purge of idle sender links.'
' Detach link after expiry.'),
# Addressing:
cfg.StrOpt('addressing_mode',

@ -505,6 +505,49 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto):
listener.join(timeout=30)
driver.cleanup()
def test_sender_link_maintenance(self):
# ensure links are purged from cache
self.config(default_sender_link_timeout=1,
group="oslo_messaging_amqp")
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
target = oslo_messaging.Target(topic="test-topic-maint")
listener = _ListenerThread(
driver.listen(target, None, None)._poll_style_listener, 3)
# the send should create a receiver link on the broker
rc = driver.send(target, {"context": True},
{"msg": "value"}, wait_for_reply=False)
self.assertIsNone(rc)
predicate = lambda: (self._broker.receiver_link_count == 1)
_wait_until(predicate, 30)
self.assertTrue(predicate())
self.assertTrue(listener.isAlive())
self.assertEqual({"msg": "value"}, listener.messages.get().message)
predicate = lambda: (self._broker.receiver_link_count == 0)
_wait_until(predicate, 30)
self.assertTrue(predicate())
# the next send should create a separate receiver link on the broker
rc = driver.send(target, {"context": True},
{"msg": "value"}, wait_for_reply=False)
self.assertIsNone(rc)
predicate = lambda: (self._broker.receiver_link_count == 1)
_wait_until(predicate, 30)
self.assertTrue(predicate())
self.assertTrue(listener.isAlive())
self.assertEqual({"msg": "value"}, listener.messages.get().message)
predicate = lambda: (self._broker.receiver_link_count == 0)
_wait_until(predicate, 30)
self.assertTrue(predicate())
driver.cleanup()
class TestAmqpNotification(_AmqpBrokerTestCaseAuto):
"""Test sending and receiving notifications."""