Merge "Allow to remove second _send_reply() call"

This commit is contained in:
Jenkins 2015-05-30 00:11:05 +00:00 committed by Gerrit Code Review
commit 27efb36b87
4 changed files with 36 additions and 11 deletions

@ -49,6 +49,18 @@ amqp_opts = [
default=False,
deprecated_group='DEFAULT',
help='Auto-delete queues in AMQP.'),
cfg.BoolOpt('send_single_reply',
default=False,
help='Send a single AMQP reply to call message. The current '
'behaviour since oslo-incubator is to send two AMQP '
'replies - first one with the payload, a second one to '
'ensure the other have finish to send the payload. We '
'are going to remove it in the N release, but we must '
'keep backward compatible at the same time. This option '
'provides such compatibility - it defaults to False in '
'Liberty and can be turned on for early adopters with a '
'new installations or for testing. Please note, that '
'this option will be removed in M release.')
]
UNIQUE_ID = '_unique_id'

@ -71,8 +71,12 @@ class AMQPIncomingMessage(base.IncomingMessage):
return
with self.listener.driver._get_connection(
rpc_amqp.PURPOSE_SEND) as conn:
self._send_reply(conn, reply, failure, log_failure=log_failure)
self._send_reply(conn, ending=True)
if self.listener.driver.send_single_reply:
self._send_reply(conn, reply, failure, log_failure=log_failure,
ending=True)
else:
self._send_reply(conn, reply, failure, log_failure=log_failure)
self._send_reply(conn, ending=True)
def acknowledge(self):
self.listener.msg_id_cache.add(self.unique_id)
@ -257,7 +261,8 @@ class ReplyWaiter(object):
class AMQPDriverBase(base.BaseDriver):
def __init__(self, conf, url, connection_pool,
default_exchange=None, allowed_remote_exmods=None):
default_exchange=None, allowed_remote_exmods=None,
send_single_reply=False):
super(AMQPDriverBase, self).__init__(conf, url, default_exchange,
allowed_remote_exmods)
@ -270,6 +275,8 @@ class AMQPDriverBase(base.BaseDriver):
self._reply_q_conn = None
self._waiter = None
self.send_single_reply = send_single_reply
def _get_exchange(self, target):
return target.exchange or self._default_exchange

@ -778,7 +778,10 @@ class QpidDriver(amqpdriver.AMQPDriverBase):
conf, conf.oslo_messaging_qpid.rpc_conn_pool_size,
url, Connection)
super(QpidDriver, self).__init__(conf, url,
connection_pool,
default_exchange,
allowed_remote_exmods)
super(QpidDriver, self).__init__(
conf, url,
connection_pool,
default_exchange,
allowed_remote_exmods,
conf.oslo_messaging_qpid.send_single_reply,
)

@ -1094,10 +1094,13 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size,
url, Connection)
super(RabbitDriver, self).__init__(conf, url,
connection_pool,
default_exchange,
allowed_remote_exmods)
super(RabbitDriver, self).__init__(
conf, url,
connection_pool,
default_exchange,
allowed_remote_exmods,
conf.oslo_messaging_rabbit.send_single_reply,
)
def require_features(self, requeue=True):
pass