Merge "rabbit: remove publisher classes"

This commit is contained in:
Jenkins 2015-05-26 14:39:20 +00:00 committed by Gerrit Code Review
commit ecb7803d5f
2 changed files with 177 additions and 209 deletions
oslo_messaging

@ -14,6 +14,7 @@
import collections
import contextlib
import functools
import logging
import os
import socket
@ -248,134 +249,6 @@ class Consumer(object):
message.ack()
class Publisher(object):
"""Publisher that silently creates exchange but no queues."""
passive = False
def __init__(self, conf, exchange_name, routing_key, type, durable,
auto_delete):
"""Init the Publisher class with the exchange_name, routing_key,
type, durable auto_delete
"""
self.queue_arguments = _get_queue_arguments(conf)
self.exchange_name = exchange_name
self.routing_key = routing_key
self.auto_delete = auto_delete
self.durable = durable
self.exchange = kombu.entity.Exchange(name=self.exchange_name,
type=type,
durable=durable,
auto_delete=auto_delete,
passive=self.passive)
def send(self, conn, msg, timeout=None):
"""Send a message on an channel."""
producer = kombu.messaging.Producer(exchange=self.exchange,
channel=conn.channel,
routing_key=self.routing_key)
headers = {}
if timeout:
# AMQP TTL is in milliseconds when set in the property.
# Details: http://www.rabbitmq.com/ttl.html#per-message-ttl
# NOTE(sileht): this amqp header doesn't exists ... LP#1444854
headers['ttl'] = timeout * 1000
# NOTE(sileht): no need to wait more, caller expects
# a answer before timeout is reached
transport_timeout = timeout
heartbeat_timeout = conn.driver_conf.heartbeat_timeout_threshold
if (conn._heartbeat_supported_and_enabled() and (
transport_timeout is None or
transport_timeout > heartbeat_timeout)):
# NOTE(sileht): we are supposed to send heartbeat every
# heartbeat_timeout, no need to wait more otherwise will
# disconnect us, so raise timeout earlier ourself
transport_timeout = heartbeat_timeout
with conn._transport_socket_timeout(transport_timeout):
producer.publish(msg, headers=headers)
class DeclareQueuePublisher(Publisher):
"""Publisher that declares a default queue
When the exchange is missing instead of silently creating an exchange
not binded to a queue, this publisher creates a default queue
named with the routing_key.
This is mainly used to not miss notifications in case of nobody consumes
them yet. If the future consumer binds the default queue it can retrieve
missing messages.
"""
DECLARED_QUEUES = collections.defaultdict(set)
def send(self, conn, msg, timeout=None):
queue_indentifier = (self.exchange_name,
self.routing_key)
# NOTE(sileht): We only do it once per reconnection
# the Connection._set_current_channel() is responsible to clear
# this cache
if queue_indentifier not in self.DECLARED_QUEUES[conn.channel]:
queue = kombu.entity.Queue(
channel=conn.channel,
exchange=self.exchange,
durable=self.durable,
auto_delete=self.auto_delete,
name=self.routing_key,
routing_key=self.routing_key,
queue_arguments=self.queue_arguments)
queue.declare()
self.DECLARED_QUEUES[conn.channel].add(queue_indentifier)
super(DeclareQueuePublisher, self).send(
conn, msg, timeout)
@classmethod
def reset_cache(cls, channel):
cls.DECLARED_QUEUES.pop(channel, None)
class RetryOnMissingExchangePublisher(Publisher):
"""Publisher that retry during 60 seconds if the exchange is missing."""
passive = True
def send(self, conn, msg, timeout=None):
# TODO(sileht):
# * use timeout parameter when available
# * use rpc_timeout if not instead of hardcoded 60
# * use @retrying
timer = rpc_common.DecayingTimer(duration=60)
timer.start()
while True:
try:
super(RetryOnMissingExchangePublisher, self).send(conn, msg,
timeout)
return
except conn.connection.channel_errors as exc:
# NOTE(noelbk/sileht):
# If rabbit dies, the consumer can be disconnected before the
# publisher sends, and if the consumer hasn't declared the
# queue, the publisher's will send a message to an exchange
# that's not bound to a queue, and the message wll be lost.
# So we set passive=True to the publisher exchange and catch
# the 404 kombu ChannelError and retry until the exchange
# appears
if exc.code == 404 and timer.check_return() > 0:
LOG.info(_LI("The exchange %(exchange)s to send to "
"%(routing_key)s doesn't exist yet, "
"retrying...") % {
'exchange': self.exchange,
'routing_key': self.routing_key})
time.sleep(1)
continue
raise
class DummyConnectionLock(object):
def acquire(self):
pass
@ -795,7 +668,7 @@ class Connection(object):
NOTE(sileht): Must be called within the connection lock
"""
if self.channel is not None and new_channel != self.channel:
DeclareQueuePublisher.reset_cache(self.channel)
self.PUBLISHER_DECLARED_QUEUES.pop(self.channel, None)
self.connection.maybe_close_channel(self.channel)
self.channel = new_channel
@ -980,20 +853,8 @@ class Connection(object):
recoverable_error_callback=_recoverable_error_callback,
error_callback=_error_callback)
def publisher_send(self, publisher, msg, timeout=None, retry=None):
"""Send to a publisher based on the publisher class."""
def _error_callback(exc):
log_info = {'topic': publisher.exchange_name, 'err_str': exc}
LOG.error(_("Failed to publish message to topic "
"'%(topic)s': %(err_str)s"), log_info)
LOG.debug('Exception', exc_info=exc)
def _publish():
publisher.send(self, msg, timeout)
with self._connection_lock:
self.ensure(_publish, retry=retry, error_callback=_error_callback)
def stop_consuming(self):
self._consume_loop_stopped = True
def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
@ -1045,54 +906,166 @@ class Connection(object):
self.declare_consumer(consumer)
def _ensure_publishing(self, method, exchange, msg, routing_key=None,
timeout=None, retry=None):
"""Send to a publisher based on the publisher class."""
def _error_callback(exc):
log_info = {'topic': exchange.name, 'err_str': exc}
LOG.error(_("Failed to publish message to topic "
"'%(topic)s': %(err_str)s"), log_info)
LOG.debug('Exception', exc_info=exc)
method = functools.partial(method, exchange, msg, routing_key, timeout)
with self._connection_lock:
self.ensure(method, retry=retry, error_callback=_error_callback)
def _publish(self, exchange, msg, routing_key=None, timeout=None):
"""Publish a message."""
producer = kombu.messaging.Producer(exchange=exchange,
channel=self.channel,
routing_key=routing_key)
headers = {}
if timeout:
# AMQP TTL is in milliseconds when set in the property.
# Details: http://www.rabbitmq.com/ttl.html#per-message-ttl
# NOTE(sileht): this amqp header doesn't exists ... LP#1444854
headers['ttl'] = timeout * 1000
# NOTE(sileht): no need to wait more, caller expects
# a answer before timeout is reached
transport_timeout = timeout
heartbeat_timeout = self.driver_conf.heartbeat_timeout_threshold
if (self._heartbeat_supported_and_enabled() and (
transport_timeout is None or
transport_timeout > heartbeat_timeout)):
# NOTE(sileht): we are supposed to send heartbeat every
# heartbeat_timeout, no need to wait more otherwise will
# disconnect us, so raise timeout earlier ourself
transport_timeout = heartbeat_timeout
with self._transport_socket_timeout(transport_timeout):
producer.publish(msg, headers=headers)
PUBLISHER_DECLARED_QUEUES = collections.defaultdict(set)
def _publish_and_creates_default_queue(self, exchange, msg,
routing_key=None, timeout=None):
"""Publisher that declares a default queue
When the exchange is missing instead of silency creates an exchange
not binded to a queue, this publisher creates a default queue
named with the routing_key
This is mainly used to not miss notification in case of nobody consumes
them yet. If the futur consumer bind the default queue it can retrieve
missing messages.
_set_current_channel is responsible to cleanup the cache.
"""
queue_indentifier = (exchange.name, routing_key)
# NOTE(sileht): We only do it once per reconnection
# the Connection._set_current_channel() is responsible to clear
# this cache
if (queue_indentifier not in
self.PUBLISHER_DECLARED_QUEUES[self.channel]):
queue = kombu.entity.Queue(
channel=self.channel,
exchange=exchange,
durable=exchange.durable,
auto_delete=exchange.auto_delete,
name=routing_key,
routing_key=routing_key,
queue_arguments=_get_queue_arguments(self.driver_conf))
queue.declare()
self.PUBLISHER_DECLARED_QUEUES[self.channel].add(queue_indentifier)
self._publish(exchange, msg, routing_key=routing_key, timeout=timeout)
def _publish_and_retry_on_missing_exchange(self, exchange, msg,
routing_key=None, timeout=None):
"""Publisher that retry during 60 seconds if the exchange is missing.
"""
if not exchange.passive:
RuntimeError("_publish_and_retry_on_missing_exchange() must be "
"called with an passive exchange.")
# TODO(sileht):
# * use timeout parameter when available
# * use rpc_timeout if not instead of hardcoded 60
# * use @retrying
timer = rpc_common.DecayingTimer(duration=60)
timer.start()
while True:
try:
self._publish(exchange, msg, routing_key=routing_key,
timeout=timeout)
return
except self.connection.channel_errors as exc:
# NOTE(noelbk/sileht):
# If rabbit dies, the consumer can be disconnected before the
# publisher sends, and if the consumer hasn't declared the
# queue, the publisher's will send a message to an exchange
# that's not bound to a queue, and the message wll be lost.
# So we set passive=True to the publisher exchange and catch
# the 404 kombu ChannelError and retry until the exchange
# appears
if exc.code == 404 and timer.check_return() > 0:
LOG.info(_LI("The exchange %(exchange)s to send to "
"%(routing_key)s doesn't exist yet, "
"retrying...") % {
'exchange': exchange.name,
'routing_key': routing_key})
time.sleep(1)
continue
raise
def direct_send(self, msg_id, msg):
"""Send a 'direct' message."""
exchange = kombu.entity.Exchange(name=msg_id,
type='direct',
durable=False,
auto_delete=True,
passive=True)
p = RetryOnMissingExchangePublisher(self.driver_conf,
exchange_name=msg_id,
routing_key=msg_id,
type='direct',
durable=False,
auto_delete=True)
self.publisher_send(p, msg)
self._ensure_publishing(self._publish_and_retry_on_missing_exchange,
exchange, msg, routing_key=msg_id)
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
"""Send a 'topic' message."""
p = Publisher(self.driver_conf,
exchange_name=exchange_name,
routing_key=topic,
type='topic',
durable=self.driver_conf.amqp_durable_queues,
auto_delete=self.driver_conf.amqp_auto_delete)
self.publisher_send(p, msg, timeout, retry=retry)
def fanout_send(self, topic, msg, retry=None):
"""Send a 'fanout' message."""
p = Publisher(self.driver_conf,
exchange_name='%s_fanout' % topic,
routing_key=None,
type='fanout',
durable=False,
auto_delete=True)
self.publisher_send(p, msg, retry=retry)
def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
"""Send a notify message on a topic."""
p = DeclareQueuePublisher(
self.driver_conf,
exchange_name=exchange_name,
routing_key=topic,
exchange = kombu.entity.Exchange(
name=exchange_name,
type='topic',
durable=self.driver_conf.amqp_durable_queues,
auto_delete=self.driver_conf.amqp_auto_delete)
self.publisher_send(p, msg, timeout=None, retry=retry)
self._ensure_publishing(self._publish, exchange, msg,
routing_key=topic, retry=retry)
def stop_consuming(self):
self._consume_loop_stopped = True
def fanout_send(self, topic, msg, retry=None):
"""Send a 'fanout' message."""
exchange = kombu.entity.Exchange(name='%s_fanout' % topic,
type='fanout',
durable=False,
auto_delete=True)
self._ensure_publishing(self._publish, exchange, msg, retry=retry)
def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
"""Send a notify message on a topic."""
exchange = kombu.entity.Exchange(
name=exchange_name,
type='topic',
durable=self.driver_conf.amqp_durable_queues,
auto_delete=self.driver_conf.amqp_auto_delete)
self._ensure_publishing(self._publish_and_creates_default_queue,
exchange, msg, routing_key=topic, retry=retry)
class RabbitDriver(amqpdriver.AMQPDriverBase):

@ -173,10 +173,6 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase):
heartbeat=0, failover_strategy="shuffle")
class RaiseOnNoExchangePublisher(rabbit_driver.Publisher):
passive = True
class TestRabbitPublisher(test_utils.BaseTestCase):
def test_declared_queue_publisher(self):
@ -184,41 +180,40 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
'kombu+memory:////')
self.addCleanup(transport.cleanup)
p1 = RaiseOnNoExchangePublisher(
self.conf.oslo_messaging_rabbit,
exchange_name='foobar',
routing_key='foobar',
e_passive = kombu.entity.Exchange(
name='foobar',
type='topic',
durable=False,
auto_delete=False)
passive=True)
p2 = rabbit_driver.DeclareQueuePublisher(
self.conf.oslo_messaging_rabbit,
exchange_name='foobar',
routing_key='foobar',
e_active = kombu.entity.Exchange(
name='foobar',
type='topic',
durable=False,
auto_delete=False)
passive=False)
with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn:
conn = pool_conn.connection
exc = conn.connection.channel_errors[0]
# Ensure the exchange does not exists
self.assertRaises(exc, conn.publisher_send, p1, {})
# Creates it
conn.publisher_send(p2, {})
# Ensure it creates it
conn.publisher_send(p1, {})
with mock.patch('kombu.messaging.Producer',
side_effect=exc):
def try_send(exchange):
conn._ensure_publishing(
conn._publish_and_creates_default_queue,
exchange, {}, routing_key='foobar')
# Ensure the exchange does not exists
self.assertRaises(exc, try_send, e_passive)
# Create it
try_send(e_active)
# Ensure it creates it
try_send(e_passive)
with mock.patch('kombu.messaging.Producer', side_effect=exc):
# Shoud reset the cache and ensures the exchange does
# not exitsts
self.assertRaises(exc, conn.publisher_send, p1, {})
# Recreates it
conn.publisher_send(p2, {})
# not exists
self.assertRaises(exc, try_send, e_passive)
# Recreate it
try_send(e_active)
# Ensure it have been recreated
conn.publisher_send(p1, {})
try_send(e_passive)
class TestRabbitConsume(test_utils.BaseTestCase):