Merge "Force creating non durable control exchange when a precondition failed"
This commit is contained in:
commit
b3c666ff34
@ -349,11 +349,44 @@ class Consumer(object):
|
|||||||
self._declared_on = None
|
self._declared_on = None
|
||||||
self.exchange = kombu.entity.Exchange(
|
self.exchange = kombu.entity.Exchange(
|
||||||
name=exchange_name,
|
name=exchange_name,
|
||||||
type=type,
|
type=self.type,
|
||||||
durable=self.durable,
|
durable=self.durable,
|
||||||
auto_delete=self.exchange_auto_delete)
|
auto_delete=self.exchange_auto_delete)
|
||||||
self.enable_cancel_on_failover = enable_cancel_on_failover
|
self.enable_cancel_on_failover = enable_cancel_on_failover
|
||||||
|
|
||||||
|
def _declare_fallback(self, err, conn, consumer_arguments):
|
||||||
|
"""Fallback by declaring a non durable queue.
|
||||||
|
|
||||||
|
When a control exchange is shared between services it is possible
|
||||||
|
that some service created first a non durable control exchange and
|
||||||
|
then after that an other service can try to create the same control
|
||||||
|
exchange but as a durable control exchange. In this case RabbitMQ
|
||||||
|
will raise an exception (PreconditionFailed), and then it will stop
|
||||||
|
our execution and our service will fail entirly. In this case we want
|
||||||
|
to fallback by creating a non durable queue to match the default
|
||||||
|
config.
|
||||||
|
"""
|
||||||
|
if "PRECONDITION_FAILED - inequivalent arg 'durable'" in str(err):
|
||||||
|
LOG.info(
|
||||||
|
"[%s] Retrying to declare the exchange (%s) as "
|
||||||
|
"non durable", conn.connection_id, self.exchange_name)
|
||||||
|
self.exchange = kombu.entity.Exchange(
|
||||||
|
name=self.exchange_name,
|
||||||
|
type=self.type,
|
||||||
|
durable=False,
|
||||||
|
auto_delete=self.queue_auto_delete)
|
||||||
|
self.queue = kombu.entity.Queue(
|
||||||
|
name=self.queue_name,
|
||||||
|
channel=conn.channel,
|
||||||
|
exchange=self.exchange,
|
||||||
|
durable=False,
|
||||||
|
auto_delete=self.queue_auto_delete,
|
||||||
|
routing_key=self.routing_key,
|
||||||
|
queue_arguments=self.queue_arguments,
|
||||||
|
consumer_arguments=consumer_arguments
|
||||||
|
)
|
||||||
|
self.queue.declare()
|
||||||
|
|
||||||
def declare(self, conn):
|
def declare(self, conn):
|
||||||
"""Re-declare the queue after a rabbit (re)connect."""
|
"""Re-declare the queue after a rabbit (re)connect."""
|
||||||
|
|
||||||
@ -376,7 +409,18 @@ class Consumer(object):
|
|||||||
try:
|
try:
|
||||||
LOG.debug('[%s] Queue.declare: %s',
|
LOG.debug('[%s] Queue.declare: %s',
|
||||||
conn.connection_id, self.queue_name)
|
conn.connection_id, self.queue_name)
|
||||||
|
try:
|
||||||
self.queue.declare()
|
self.queue.declare()
|
||||||
|
except amqp_exec.PreconditionFailed as err:
|
||||||
|
# NOTE(hberaud): This kind of exception may be triggered
|
||||||
|
# when a control exchange is shared between services and
|
||||||
|
# when services try to create it with configs that differ
|
||||||
|
# from each others. RabbitMQ will reject the services
|
||||||
|
# that try to create it with a configuration that differ
|
||||||
|
# from the one used first.
|
||||||
|
LOG.warning(err)
|
||||||
|
self._declare_fallback(err, conn, consumer_arguments)
|
||||||
|
|
||||||
except conn.connection.channel_errors as exc:
|
except conn.connection.channel_errors as exc:
|
||||||
# NOTE(jrosenboom): This exception may be triggered by a race
|
# NOTE(jrosenboom): This exception may be triggered by a race
|
||||||
# condition. Simply retrying will solve the error most of the time
|
# condition. Simply retrying will solve the error most of the time
|
||||||
@ -1354,6 +1398,18 @@ class Connection(object):
|
|||||||
"""Publish a message."""
|
"""Publish a message."""
|
||||||
|
|
||||||
if not (exchange.passive or exchange.name in self._declared_exchanges):
|
if not (exchange.passive or exchange.name in self._declared_exchanges):
|
||||||
|
try:
|
||||||
|
exchange(self.channel).declare()
|
||||||
|
except amqp_exec.PreconditionFailed as err:
|
||||||
|
# NOTE(hberaud): This kind of exception may be triggered
|
||||||
|
# when a control exchange is shared between services and
|
||||||
|
# when services try to create it with configs that differ
|
||||||
|
# from each others. RabbitMQ will reject the services
|
||||||
|
# that try to create it with a configuration that differ
|
||||||
|
# from the one used first.
|
||||||
|
if "PRECONDITION_FAILED - inequivalent arg 'durable'" \
|
||||||
|
in str(err):
|
||||||
|
exchange.durable = False
|
||||||
exchange(self.channel).declare()
|
exchange(self.channel).declare()
|
||||||
self._declared_exchanges.add(exchange.name)
|
self._declared_exchanges.add(exchange.name)
|
||||||
|
|
||||||
|
@ -0,0 +1,5 @@
|
|||||||
|
---
|
||||||
|
fixes:
|
||||||
|
- |
|
||||||
|
Force creating non durable control exchange when a precondition failed
|
||||||
|
related to config that differ occuring.
|
Loading…
Reference in New Issue
Block a user