Merge "rabbit: smarter declaration of the notif. queue"

This commit is contained in:
Jenkins 2015-05-13 19:54:35 +00:00 committed by Gerrit Code Review
commit 4fa7795f42
2 changed files with 87 additions and 14 deletions
oslo_messaging

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import contextlib
import logging
import os
@ -295,23 +296,33 @@ class DeclareQueuePublisher(Publisher):
them yet. If the future consumer binds the default queue it can retrieve
missing messages.
"""
# FIXME(sileht): The side effect of this is that we declare again and
# again the same queue, and generate a lot of useless rabbit traffic.
# https://bugs.launchpad.net/oslo.messaging/+bug/1437902
DECLARED_QUEUES = collections.defaultdict(set)
def send(self, conn, msg, timeout=None):
queue = kombu.entity.Queue(
channel=conn.channel,
exchange=self.exchange,
durable=self.durable,
auto_delete=self.auto_delete,
name=self.routing_key,
routing_key=self.routing_key,
queue_arguments=self.queue_arguments)
queue.declare()
queue_indentifier = (self.exchange_name,
self.routing_key)
# NOTE(sileht): We only do it once per reconnection
# the Connection._set_current_channel() is responsible to clear
# this cache
if queue_indentifier not in self.DECLARED_QUEUES[conn.channel]:
queue = kombu.entity.Queue(
channel=conn.channel,
exchange=self.exchange,
durable=self.durable,
auto_delete=self.auto_delete,
name=self.routing_key,
routing_key=self.routing_key,
queue_arguments=self.queue_arguments)
queue.declare()
self.DECLARED_QUEUES[conn.channel].add(queue_indentifier)
super(DeclareQueuePublisher, self).send(
conn, msg, timeout)
@classmethod
def reset_cache(cls, channel):
cls.DECLARED_QUEUES.pop(channel, None)
class RetryOnMissingExchangePublisher(Publisher):
"""Publisher that retry during 60 seconds if the exchange is missing."""
@ -577,6 +588,9 @@ class Connection(object):
if self._url.startswith('memory://'):
# Kludge to speed up tests.
self.connection.transport.polling_interval = 0.0
# Fixup logging
self.connection.hostname = "memory_driver"
self.connection.port = 1234
self._poll_timeout = 0.05
# FIXME(markmc): use oslo sslutils when it is available as a library
@ -715,8 +729,18 @@ class Connection(object):
self._set_current_channel(channel)
method()
recoverable_errors = (self.connection.recoverable_channel_errors +
self.connection.recoverable_connection_errors)
# NOTE(sileht): Some dummy driver like the in-memory one doesn't
# have notion of recoverable connection, so we must raise the original
# exception like kombu does in this case.
has_modern_errors = hasattr(
self.connection.transport, 'recoverable_connection_errors',
)
if has_modern_errors:
recoverable_errors = (
self.connection.recoverable_channel_errors +
self.connection.recoverable_connection_errors)
else:
recoverable_errors = ()
try:
autoretry_method = self.connection.autoretry(
@ -756,6 +780,7 @@ class Connection(object):
NOTE(sileht): Must be called within the connection lock
"""
if self.channel is not None and new_channel != self.channel:
DeclareQueuePublisher.reset_cache(self.channel)
self.connection.maybe_close_channel(self.channel)
self.channel = new_channel

@ -173,6 +173,54 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase):
heartbeat=0, failover_strategy="shuffle")
class RaiseOnNoExchangePublisher(rabbit_driver.Publisher):
passive = True
class TestRabbitPublisher(test_utils.BaseTestCase):
def test_declared_queue_publisher(self):
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')
self.addCleanup(transport.cleanup)
p1 = RaiseOnNoExchangePublisher(
self.conf.oslo_messaging_rabbit,
exchange_name='foobar',
routing_key='foobar',
type='topic',
durable=False,
auto_delete=False)
p2 = rabbit_driver.DeclareQueuePublisher(
self.conf.oslo_messaging_rabbit,
exchange_name='foobar',
routing_key='foobar',
type='topic',
durable=False,
auto_delete=False)
with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn:
conn = pool_conn.connection
exc = conn.connection.channel_errors[0]
# Ensure the exchange does not exists
self.assertRaises(exc, conn.publisher_send, p1, {})
# Creates it
conn.publisher_send(p2, {})
# Ensure it creates it
conn.publisher_send(p1, {})
with mock.patch('kombu.messaging.Producer',
side_effect=exc):
# Shoud reset the cache and ensures the exchange does
# not exitsts
self.assertRaises(exc, conn.publisher_send, p1, {})
# Recreates it
conn.publisher_send(p2, {})
# Ensure it have been recreated
conn.publisher_send(p1, {})
class TestRabbitConsume(test_utils.BaseTestCase):
def test_consume_timeout(self):