diff --git a/doc/source/admin/rabbit.rst b/doc/source/admin/rabbit.rst index 142bdf70e..bf91e47c5 100644 --- a/doc/source/admin/rabbit.rst +++ b/doc/source/admin/rabbit.rst @@ -240,6 +240,7 @@ Consuming Options ^^^^^^^^^^^^^^^^^ - :oslo.config:option:`oslo_messaging_rabbit.rabbit_ha_queues` +- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quorum_queue` - :oslo.config:option:`oslo_messaging_rabbit.rabbit_transient_queues_ttl` Connection Options diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py index d4db2c6b8..b0c9551d7 100644 --- a/oslo_messaging/_drivers/amqp.py +++ b/oslo_messaging/_drivers/amqp.py @@ -32,7 +32,9 @@ from oslo_messaging._drivers import common as rpc_common amqp_opts = [ cfg.BoolOpt('amqp_durable_queues', default=False, - help='Use durable queues in AMQP.'), + help='Use durable queues in AMQP. If rabbit_quorum_queue ' + 'is enabled, queues will be durable and this value will ' + 'be ignored.'), cfg.BoolOpt('amqp_auto_delete', default=False, deprecated_group='DEFAULT', diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 9d99822d5..c553dcca5 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -149,6 +149,17 @@ rabbit_opts = [ 'nodes, run: ' """\"rabbitmqctl set_policy HA '^(?!amq\\.).*' """ """'{"ha-mode": "all"}' \""""), + cfg.BoolOpt('rabbit_quorum_queue', + default=False, + help='Use quorum queues in RabbitMQ (x-queue-type: quorum). ' + 'The quorum queue is a modern queue type for RabbitMQ ' + 'implementing a durable, replicated FIFO queue based on the ' + 'Raft consensus algorithm. It is available as of ' + 'RabbitMQ 3.8.0. If set this option will conflict with ' + 'the HA queues (``rabbit_ha_queues``) aka mirrored queues, ' + 'in other words the HA queues should be disabled, quorum ' + 'queues durable by default so the amqp_durable_queues ' + 'opion is ignored when this option enabled.'), cfg.IntOpt('rabbit_transient_queues_ttl', min=1, default=1800, @@ -191,7 +202,8 @@ rabbit_opts = [ LOG = logging.getLogger(__name__) -def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl): +def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl, + rabbit_quorum_queue): """Construct the arguments for declaring a queue. If the rabbit_ha_queues option is set, we try to declare a mirrored queue @@ -214,12 +226,31 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl): Setting a queue TTL causes the queue to be automatically deleted if it is unused for the TTL duration. This is a helpful safeguard to prevent queues with zero consumers from growing without bound. + + If the rabbit_quorum_queue option is set, we try to declare a mirrored + queue as described here: + + https://www.rabbitmq.com/quorum-queues.html + + Setting x-queue-type to quorum means that replicated FIFO queue based on + the Raft consensus algorithm will be used. It is available as of + RabbitMQ 3.8.0. If set this option will conflict with + the HA queues (``rabbit_ha_queues``) aka mirrored queues, + in other words HA queues should be disabled. """ args = {} + if rabbit_quorum_queue and rabbit_ha_queues: + raise RuntimeError('Configuration Error: rabbit_quorum_queue ' + 'and rabbit_ha_queues both enabled, queue ' + 'type is quorum or HA (mirrored) not both') + if rabbit_ha_queues: args['x-ha-policy'] = 'all' + if rabbit_quorum_queue: + args['x-queue-type'] = 'quorum' + if rabbit_queue_ttl > 0: args['x-expires'] = rabbit_queue_ttl * 1000 @@ -248,7 +279,7 @@ class Consumer(object): def __init__(self, exchange_name, queue_name, routing_key, type, durable, exchange_auto_delete, queue_auto_delete, callback, nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0, - enable_cancel_on_failover=False): + enable_cancel_on_failover=False, rabbit_quorum_queue=False): """Init the Consumer class with the exchange_name, routing_key, type, durable auto_delete """ @@ -262,7 +293,8 @@ class Consumer(object): self.type = type self.nowait = nowait self.queue_arguments = _get_queue_arguments(rabbit_ha_queues, - rabbit_queue_ttl) + rabbit_queue_ttl, + rabbit_quorum_queue) self.queue = None self._declared_on = None self.exchange = kombu.entity.Exchange( @@ -475,6 +507,7 @@ class Connection(object): self.login_method = driver_conf.rabbit_login_method self.rabbit_ha_queues = driver_conf.rabbit_ha_queues + self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue self.rabbit_transient_queues_ttl = \ driver_conf.rabbit_transient_queues_ttl self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count @@ -674,6 +707,12 @@ class Connection(object): except AttributeError: pass + @property + def durable(self): + # Quorum queues are durable by default, durable option should + # be enabled by default with quorum queues + return self.amqp_durable_queues or self.rabbit_quorum_queue + @classmethod def validate_ssl_version(cls, version): key = version.lower() @@ -1163,12 +1202,13 @@ class Connection(object): queue_name=queue_name or topic, routing_key=topic, type='topic', - durable=self.amqp_durable_queues, + durable=self.durable, exchange_auto_delete=self.amqp_auto_delete, queue_auto_delete=self.amqp_auto_delete, callback=callback, rabbit_ha_queues=self.rabbit_ha_queues, - enable_cancel_on_failover=self.enable_cancel_on_failover) + enable_cancel_on_failover=self.enable_cancel_on_failover, + rabbit_quorum_queue=self.rabbit_quorum_queue) self.declare_consumer(consumer) @@ -1280,7 +1320,10 @@ class Connection(object): auto_delete=exchange.auto_delete, name=routing_key, routing_key=routing_key, - queue_arguments=_get_queue_arguments(self.rabbit_ha_queues, 0)) + queue_arguments=_get_queue_arguments( + self.rabbit_ha_queues, + 0, + self.rabbit_quorum_queue)) log_info = {'key': routing_key, 'exchange': exchange} LOG.trace( 'Connection._publish_and_creates_default_queue: ' @@ -1336,7 +1379,7 @@ class Connection(object): exchange = kombu.entity.Exchange( name=exchange_name, type='topic', - durable=self.amqp_durable_queues, + durable=self.durable, auto_delete=self.amqp_auto_delete) self._ensure_publishing(self._publish, exchange, msg, @@ -1358,7 +1401,7 @@ class Connection(object): exchange = kombu.entity.Exchange( name=exchange_name, type='topic', - durable=self.amqp_durable_queues, + durable=self.durable, auto_delete=self.amqp_auto_delete) self._ensure_publishing(self._publish_and_creates_default_queue, diff --git a/releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml b/releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml new file mode 100644 index 000000000..a88c5d852 --- /dev/null +++ b/releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml @@ -0,0 +1,11 @@ +--- +features: + - | + Adding support for quorum queues. Quorum queues are enabled if the + ``rabbit_quorum_queue`` parameter is sets (``x-queue-type: quorum``). + Setting x-queue-type to quorum means that replicated FIFO queue based on + the Raft consensus algorithm will be used. It is available as of + RabbitMQ 3.8.0. The quorum queues are durable by default + (``amqp_durable_queues``) will be ignored. + when enabled the HA queues (``rabbit_ha_queues``) aka mirrored queues + should be disabled since the queue can't be both types at the same time