Merge "Don't reply when we known that client is gone"
This commit is contained in:
commit
b48487e65b
@ -263,3 +263,7 @@ def _add_unique_id(msg):
|
||||
"""Add unique_id for checking duplicate messages."""
|
||||
unique_id = uuid.uuid4().hex
|
||||
msg.update({UNIQUE_ID: unique_id})
|
||||
|
||||
|
||||
class AMQPDestinationNotFound(Exception):
|
||||
pass
|
||||
|
@ -19,6 +19,7 @@ import logging
|
||||
import threading
|
||||
import uuid
|
||||
|
||||
import cachetools
|
||||
from six import moves
|
||||
|
||||
import oslo_messaging
|
||||
@ -27,13 +28,15 @@ from oslo_messaging._drivers import base
|
||||
from oslo_messaging._drivers import common as rpc_common
|
||||
from oslo_messaging._i18n import _
|
||||
from oslo_messaging._i18n import _LI
|
||||
from oslo_messaging._i18n import _LW
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AMQPIncomingMessage(base.IncomingMessage):
|
||||
|
||||
def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q):
|
||||
def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q,
|
||||
obsolete_reply_queues):
|
||||
super(AMQPIncomingMessage, self).__init__(listener, ctxt,
|
||||
dict(message))
|
||||
|
||||
@ -42,9 +45,15 @@ class AMQPIncomingMessage(base.IncomingMessage):
|
||||
self.reply_q = reply_q
|
||||
self.acknowledge_callback = message.acknowledge
|
||||
self.requeue_callback = message.requeue
|
||||
self._obsolete_reply_queues = obsolete_reply_queues
|
||||
|
||||
def _send_reply(self, conn, reply=None, failure=None,
|
||||
ending=False, log_failure=True):
|
||||
if (self.reply_q and
|
||||
not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
|
||||
self.msg_id)):
|
||||
return
|
||||
|
||||
if failure:
|
||||
failure = rpc_common.serialize_remote_exception(failure,
|
||||
log_failure)
|
||||
@ -60,8 +69,15 @@ class AMQPIncomingMessage(base.IncomingMessage):
|
||||
# Otherwise use the msg_id for backward compatibility.
|
||||
if self.reply_q:
|
||||
msg['_msg_id'] = self.msg_id
|
||||
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
|
||||
try:
|
||||
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
|
||||
except rpc_amqp.AMQPDestinationNotFound:
|
||||
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
|
||||
else:
|
||||
# TODO(sileht): look at which version of oslo-incubator rpc
|
||||
# send need this, but I guess this is older than icehouse
|
||||
# if this is icehouse, we can drop this at M
|
||||
# if this is havana, we can drop this now.
|
||||
conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg))
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
@ -69,6 +85,13 @@ class AMQPIncomingMessage(base.IncomingMessage):
|
||||
# NOTE(Alexei_987) not sending reply, if msg_id is empty
|
||||
# because reply should not be expected by caller side
|
||||
return
|
||||
|
||||
# NOTE(sileht): return without hold the a connection if possible
|
||||
if (self.reply_q and
|
||||
not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
|
||||
self.msg_id)):
|
||||
return
|
||||
|
||||
with self.listener.driver._get_connection(
|
||||
rpc_amqp.PURPOSE_SEND) as conn:
|
||||
if self.listener.driver.send_single_reply:
|
||||
@ -92,6 +115,51 @@ class AMQPIncomingMessage(base.IncomingMessage):
|
||||
self.requeue_callback()
|
||||
|
||||
|
||||
class ObsoleteReplyQueuesCache(object):
|
||||
"""Cache of reply queue id that doesn't exists anymore.
|
||||
|
||||
NOTE(sileht): In case of a broker restart/failover
|
||||
a reply queue can be unreachable for short period
|
||||
the IncomingMessage.send_reply will block for 60 seconds
|
||||
in this case or until rabbit recovers.
|
||||
|
||||
But in case of the reply queue is unreachable because the
|
||||
rpc client is really gone, we can have a ton of reply to send
|
||||
waiting 60 seconds.
|
||||
This leads to a starvation of connection of the pool
|
||||
The rpc server take to much time to send reply, other rpc client will
|
||||
raise TimeoutError because their don't receive their replies in time.
|
||||
|
||||
This object cache stores already known gone client to not wait 60 seconds
|
||||
and hold a connection of the pool.
|
||||
Keeping 200 last gone rpc client for 1 minute is enough
|
||||
and doesn't hold to much memory.
|
||||
"""
|
||||
|
||||
SIZE = 200
|
||||
TTL = 60
|
||||
|
||||
def __init__(self):
|
||||
self._lock = threading.RLock()
|
||||
self._cache = cachetools.TTLCache(self.SIZE, self.TTL)
|
||||
|
||||
def reply_q_valid(self, reply_q, msg_id):
|
||||
if reply_q in self._cache:
|
||||
self._no_reply_log(reply_q, msg_id)
|
||||
return False
|
||||
return True
|
||||
|
||||
def add(self, reply_q, msg_id):
|
||||
with self._lock:
|
||||
self._cache.update({reply_q: msg_id})
|
||||
self._no_reply_log(reply_q, msg_id)
|
||||
|
||||
def _no_reply_log(self, reply_q, msg_id):
|
||||
LOG.warn(_LW("%(reply_queue)s doesn't exists, drop reply to "
|
||||
"%(msg_id)s"), {'reply_queue': reply_q,
|
||||
'msg_id': msg_id})
|
||||
|
||||
|
||||
class AMQPListener(base.Listener):
|
||||
|
||||
def __init__(self, driver, conn):
|
||||
@ -100,6 +168,7 @@ class AMQPListener(base.Listener):
|
||||
self.msg_id_cache = rpc_amqp._MsgIdCache()
|
||||
self.incoming = []
|
||||
self._stopped = threading.Event()
|
||||
self._obsolete_reply_queues = ObsoleteReplyQueuesCache()
|
||||
|
||||
def __call__(self, message):
|
||||
ctxt = rpc_amqp.unpack_context(self.conf, message)
|
||||
@ -116,7 +185,8 @@ class AMQPListener(base.Listener):
|
||||
message,
|
||||
unique_id,
|
||||
ctxt.msg_id,
|
||||
ctxt.reply_q))
|
||||
ctxt.reply_q,
|
||||
self._obsolete_reply_queues))
|
||||
|
||||
def poll(self, timeout=None):
|
||||
while not self._stopped.is_set():
|
||||
|
@ -1081,8 +1081,17 @@ class Connection(object):
|
||||
"retrying...") % {
|
||||
'exchange': exchange.name,
|
||||
'routing_key': routing_key})
|
||||
time.sleep(1)
|
||||
time.sleep(0.25)
|
||||
continue
|
||||
elif exc.code == 404:
|
||||
msg = _("The exchange %(exchange)s to send to "
|
||||
"%(routing_key)s still doesn't exist after "
|
||||
"%(duration)s sec abandonning...") % {
|
||||
'duration': duration,
|
||||
'exchange': exchange.name,
|
||||
'routing_key': routing_key}
|
||||
LOG.info(msg)
|
||||
raise rpc_amqp.AMQPDestinationNotFound(msg)
|
||||
raise
|
||||
|
||||
def direct_send(self, msg_id, msg):
|
||||
|
@ -351,6 +351,11 @@ class TestSendReceive(test_utils.BaseTestCase):
|
||||
('zero', dict(rx_id=False, reply=0)),
|
||||
]
|
||||
|
||||
_reply_fail = [
|
||||
('reply_success', dict(reply_failure_404=False)),
|
||||
('reply_failure', dict(reply_failure_404=True)),
|
||||
]
|
||||
|
||||
_failure = [
|
||||
('success', dict(failure=False)),
|
||||
('failure', dict(failure=True, expected=False)),
|
||||
@ -372,11 +377,14 @@ class TestSendReceive(test_utils.BaseTestCase):
|
||||
cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders,
|
||||
cls._context,
|
||||
cls._reply,
|
||||
cls._reply_fail,
|
||||
cls._failure,
|
||||
cls._timeout,
|
||||
cls._reply_ending)
|
||||
|
||||
def test_send_receive(self):
|
||||
self.config(kombu_reconnect_timeout=0.5,
|
||||
group="oslo_messaging_rabbit")
|
||||
self.config(heartbeat_timeout_threshold=0,
|
||||
group="oslo_messaging_rabbit")
|
||||
self.config(send_single_reply=self.send_single_reply,
|
||||
@ -405,16 +413,21 @@ class TestSendReceive(test_utils.BaseTestCase):
|
||||
|
||||
def send_and_wait_for_reply(i):
|
||||
try:
|
||||
if self.reply_failure_404:
|
||||
timeout = 0.01
|
||||
else:
|
||||
timeout = self.timeout
|
||||
replies.append(driver.send(target,
|
||||
self.ctxt,
|
||||
{'tx_id': i},
|
||||
wait_for_reply=True,
|
||||
timeout=self.timeout))
|
||||
timeout=timeout))
|
||||
self.assertFalse(self.failure)
|
||||
self.assertIsNone(self.timeout)
|
||||
except (ZeroDivisionError, oslo_messaging.MessagingTimeout) as e:
|
||||
replies.append(e)
|
||||
self.assertTrue(self.failure or self.timeout is not None)
|
||||
self.assertTrue(self.failure or self.timeout is not None
|
||||
or self.reply_failure_404)
|
||||
|
||||
while len(senders) < self.n_senders:
|
||||
senders.append(threading.Thread(target=send_and_wait_for_reply,
|
||||
@ -434,6 +447,18 @@ class TestSendReceive(test_utils.BaseTestCase):
|
||||
if len(order) > 1:
|
||||
order[-1], order[-2] = order[-2], order[-1]
|
||||
|
||||
if self.reply_failure_404:
|
||||
start = time.time()
|
||||
# NOTE(sileht): Simulate a rpc client restart
|
||||
# By returning a ExchangeNotFound when we try to
|
||||
# send reply
|
||||
exc = (driver._reply_q_conn.connection.
|
||||
connection.channel_errors[0]())
|
||||
exc.code = 404
|
||||
self.useFixture(mockpatch.Patch(
|
||||
'kombu.messaging.Producer.publish',
|
||||
side_effect=exc))
|
||||
|
||||
for i in order:
|
||||
if self.timeout is None:
|
||||
if self.failure:
|
||||
@ -447,11 +472,19 @@ class TestSendReceive(test_utils.BaseTestCase):
|
||||
msgs[i].reply({'rx_id': i})
|
||||
else:
|
||||
msgs[i].reply(self.reply)
|
||||
elif self.reply_failure_404:
|
||||
msgs[i].reply({})
|
||||
senders[i].join()
|
||||
|
||||
if self.reply_failure_404:
|
||||
# NOTE(sileht) all reply fail, first take
|
||||
# kombu_reconnect_timeout seconds to fail
|
||||
# next immediatly fail
|
||||
self.assertAlmostEqual(0.5, time.time() - start, 1)
|
||||
|
||||
self.assertEqual(len(senders), len(replies))
|
||||
for i, reply in enumerate(replies):
|
||||
if self.timeout is not None:
|
||||
if self.timeout is not None or self.reply_failure_404:
|
||||
self.assertIsInstance(reply, oslo_messaging.MessagingTimeout)
|
||||
elif self.failure:
|
||||
self.assertIsInstance(reply, ZeroDivisionError)
|
||||
|
@ -16,6 +16,7 @@ stevedore>=1.5.0 # Apache-2.0
|
||||
|
||||
# for jsonutils
|
||||
six>=1.9.0
|
||||
cachetools>=1.0.0 # MIT License
|
||||
|
||||
# FIXME(markmc): remove this when the drivers no longer
|
||||
# import eventlet
|
||||
|
Loading…
x
Reference in New Issue
Block a user