diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py
index a91be1aad..7fd3d37fe 100644
--- a/oslo_messaging/_drivers/amqp.py
+++ b/oslo_messaging/_drivers/amqp.py
@@ -263,3 +263,7 @@ def _add_unique_id(msg):
     """Add unique_id for checking duplicate messages."""
     unique_id = uuid.uuid4().hex
     msg.update({UNIQUE_ID: unique_id})
+
+
+class AMQPDestinationNotFound(Exception):
+    pass
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index 3a1d9bbed..c1d0f52be 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -19,6 +19,7 @@ import logging
 import threading
 import uuid
 
+import cachetools
 from six import moves
 
 import oslo_messaging
@@ -27,13 +28,15 @@ from oslo_messaging._drivers import base
 from oslo_messaging._drivers import common as rpc_common
 from oslo_messaging._i18n import _
 from oslo_messaging._i18n import _LI
+from oslo_messaging._i18n import _LW
 
 LOG = logging.getLogger(__name__)
 
 
 class AMQPIncomingMessage(base.IncomingMessage):
 
-    def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q):
+    def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q,
+                 obsolete_reply_queues):
         super(AMQPIncomingMessage, self).__init__(listener, ctxt,
                                                   dict(message))
 
@@ -42,9 +45,15 @@ class AMQPIncomingMessage(base.IncomingMessage):
         self.reply_q = reply_q
         self.acknowledge_callback = message.acknowledge
         self.requeue_callback = message.requeue
+        self._obsolete_reply_queues = obsolete_reply_queues
 
     def _send_reply(self, conn, reply=None, failure=None,
                     ending=False, log_failure=True):
+        if (self.reply_q and
+                not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
+                                                              self.msg_id)):
+            return
+
         if failure:
             failure = rpc_common.serialize_remote_exception(failure,
                                                             log_failure)
@@ -60,8 +69,15 @@ class AMQPIncomingMessage(base.IncomingMessage):
         # Otherwise use the msg_id for backward compatibility.
         if self.reply_q:
             msg['_msg_id'] = self.msg_id
-            conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
+            try:
+                conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
+            except rpc_amqp.AMQPDestinationNotFound:
+                self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
         else:
+            # TODO(sileht): look at which version of oslo-incubator rpc
+            # send needs this; I guess it is older than icehouse.
+            # If it is icehouse, we can drop this at M;
+            # if it is havana, we can drop this now.
             conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg))
 
     def reply(self, reply=None, failure=None, log_failure=True):
@@ -69,6 +85,13 @@ class AMQPIncomingMessage(base.IncomingMessage):
             # NOTE(Alexei_987) not sending reply, if msg_id is empty
             # because reply should not be expected by caller side
             return
+
+        # NOTE(sileht): return without holding a connection if possible
+        if (self.reply_q and
+                not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
+                                                              self.msg_id)):
+            return
+
         with self.listener.driver._get_connection(
                 rpc_amqp.PURPOSE_SEND) as conn:
             if self.listener.driver.send_single_reply:
@@ -92,6 +115,51 @@ class AMQPIncomingMessage(base.IncomingMessage):
         self.requeue_callback()
 
 
+class ObsoleteReplyQueuesCache(object):
+    """Cache of reply queue ids that no longer exist.
+
+    NOTE(sileht): In case of a broker restart/failover,
+    a reply queue can be unreachable for a short period;
+    IncomingMessage.send_reply will then block for 60 seconds
+    in this case, or until rabbit recovers.
+
+    But when the reply queue is unreachable because the
+    rpc client is really gone, we can have a ton of replies to send,
+    each waiting 60 seconds.
+    This leads to a starvation of connections in the pool:
+    the rpc server takes too much time to send replies, and other rpc
+    clients raise TimeoutError because they don't get their replies in time.
+
+    This cache stores the clients already known to be gone, so we neither
+    wait 60 seconds nor hold a connection from the pool for them.
+    Keeping the last 200 gone rpc clients for 1 minute is enough
+    and doesn't hold too much memory.
+    """
+
+    SIZE = 200
+    TTL = 60
+
+    def __init__(self):
+        self._lock = threading.RLock()
+        self._cache = cachetools.TTLCache(self.SIZE, self.TTL)
+
+    def reply_q_valid(self, reply_q, msg_id):
+        if reply_q in self._cache:
+            self._no_reply_log(reply_q, msg_id)
+            return False
+        return True
+
+    def add(self, reply_q, msg_id):
+        with self._lock:
+            self._cache.update({reply_q: msg_id})
+            self._no_reply_log(reply_q, msg_id)
+
+    def _no_reply_log(self, reply_q, msg_id):
+        LOG.warn(_LW("%(reply_queue)s doesn't exist, dropping reply to "
+                     "%(msg_id)s"), {'reply_queue': reply_q,
+                                     'msg_id': msg_id})
+
+
 class AMQPListener(base.Listener):
 
     def __init__(self, driver, conn):
@@ -100,6 +168,7 @@ class AMQPListener(base.Listener):
         self.msg_id_cache = rpc_amqp._MsgIdCache()
         self.incoming = []
         self._stopped = threading.Event()
+        self._obsolete_reply_queues = ObsoleteReplyQueuesCache()
 
     def __call__(self, message):
         ctxt = rpc_amqp.unpack_context(self.conf, message)
@@ -116,7 +185,8 @@ class AMQPListener(base.Listener):
                                                  message,
                                                  unique_id,
                                                  ctxt.msg_id,
-                                                 ctxt.reply_q))
+                                                 ctxt.reply_q,
+                                                 self._obsolete_reply_queues))
 
     def poll(self, timeout=None):
         while not self._stopped.is_set():
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index aa6dc3643..2f3fb7146 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -1081,8 +1081,17 @@ class Connection(object):
                                  "retrying...") % {
                                      'exchange': exchange.name,
                                      'routing_key': routing_key})
-                    time.sleep(1)
+                    time.sleep(0.25)
                     continue
+                elif exc.code == 404:
+                    msg = _("The exchange %(exchange)s to send to "
+                            "%(routing_key)s still doesn't exist after "
+                            "%(duration)s sec, abandoning...") % {
+                                'duration': duration,
+                                'exchange': exchange.name,
+                                'routing_key': routing_key}
+                    LOG.info(msg)
+                    raise rpc_amqp.AMQPDestinationNotFound(msg)
                 raise
 
     def direct_send(self, msg_id, msg):
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index 9b22a1a80..ba5b8d307 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -355,6 +355,11 @@ class TestSendReceive(test_utils.BaseTestCase):
         ('zero', dict(rx_id=False, reply=0)),
     ]
 
+    _reply_fail = [
+        ('reply_success', dict(reply_failure_404=False)),
+        ('reply_failure', dict(reply_failure_404=True)),
+    ]
+
     _failure = [
         ('success', dict(failure=False)),
         ('failure', dict(failure=True, expected=False)),
@@ -376,11 +381,14 @@ class TestSendReceive(test_utils.BaseTestCase):
         cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders,
                                                          cls._context,
                                                          cls._reply,
+                                                         cls._reply_fail,
                                                          cls._failure,
                                                          cls._timeout,
                                                          cls._reply_ending)
 
     def test_send_receive(self):
+        self.config(kombu_reconnect_timeout=0.5,
+                    group="oslo_messaging_rabbit")
         self.config(heartbeat_timeout_threshold=0,
                     group="oslo_messaging_rabbit")
         self.config(send_single_reply=self.send_single_reply,
@@ -409,16 +417,21 @@ class TestSendReceive(test_utils.BaseTestCase):
 
         def send_and_wait_for_reply(i):
             try:
+                if self.reply_failure_404:
+                    timeout = 0.01
+                else:
+                    timeout = self.timeout
                 replies.append(driver.send(target,
                                            self.ctxt,
                                            {'tx_id': i},
                                            wait_for_reply=True,
-                                           timeout=self.timeout))
+                                           timeout=timeout))
                 self.assertFalse(self.failure)
                 self.assertIsNone(self.timeout)
             except (ZeroDivisionError, oslo_messaging.MessagingTimeout) as e:
                 replies.append(e)
-                self.assertTrue(self.failure or self.timeout is not None)
+                self.assertTrue(self.failure or self.timeout is not None
+                                or self.reply_failure_404)
 
         while len(senders) < self.n_senders:
             senders.append(threading.Thread(target=send_and_wait_for_reply,
@@ -438,6 +451,18 @@ class TestSendReceive(test_utils.BaseTestCase):
         if len(order) > 1:
             order[-1], order[-2] = order[-2], order[-1]
 
+        if self.reply_failure_404:
+            start = time.time()
+            # NOTE(sileht): Simulate an rpc client restart
+            # by returning an ExchangeNotFound when we
+            # try to send the reply
+            exc = (driver._reply_q_conn.connection.
+                   connection.channel_errors[0]())
+            exc.code = 404
+            self.useFixture(mockpatch.Patch(
+                'kombu.messaging.Producer.publish',
+                side_effect=exc))
+
         for i in order:
             if self.timeout is None:
                 if self.failure:
@@ -451,11 +476,19 @@ class TestSendReceive(test_utils.BaseTestCase):
                     msgs[i].reply({'rx_id': i})
                 else:
                     msgs[i].reply(self.reply)
+            elif self.reply_failure_404:
+                msgs[i].reply({})
             senders[i].join()
 
+        if self.reply_failure_404:
+            # NOTE(sileht): all replies fail; the first takes
+            # kombu_reconnect_timeout seconds to fail, the
+            # next ones fail immediately
+            self.assertAlmostEqual(0.5, time.time() - start, 1)
+
         self.assertEqual(len(senders), len(replies))
         for i, reply in enumerate(replies):
-            if self.timeout is not None:
+            if self.timeout is not None or self.reply_failure_404:
                 self.assertIsInstance(reply, oslo_messaging.MessagingTimeout)
             elif self.failure:
                 self.assertIsInstance(reply, ZeroDivisionError)
diff --git a/requirements.txt b/requirements.txt
index d3d545241..9595f6e3f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -13,6 +13,7 @@ stevedore>=1.5.0 # Apache-2.0
 
 # for jsonutils
 six>=1.9.0
+cachetools>=1.0.0 # MIT License
 
 # FIXME(markmc): remove this when the drivers no longer
 # import eventlet
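
Note for reviewers: the sketch below is not part of the patch. It is a minimal standalone illustration of the cachetools.TTLCache behaviour that the new ObsoleteReplyQueuesCache relies on: entries expire on their own after the TTL, so a reply queue marked as gone is retried again later without any explicit cleanup code. The queue and message ids are made up, and the TTL is shortened from the patch's 60 seconds so the demo runs quickly.

    import time

    import cachetools

    # The patch uses SIZE = 200 and TTL = 60; a 1-second TTL keeps this short.
    cache = cachetools.TTLCache(maxsize=200, ttl=1)

    reply_q = 'reply_1234abcd'           # hypothetical reply queue name
    cache.update({reply_q: 'msg-0001'})  # same call ObsoleteReplyQueuesCache.add() makes

    # While the entry is cached, reply_q_valid() would return False and the
    # server drops replies to this queue without taking a pool connection.
    print(reply_q in cache)              # True

    time.sleep(1.5)                      # let the entry expire

    # After the TTL the entry is gone, so the queue is considered valid again
    # and the next reply is published normally (or re-added on the next 404).
    print(reply_q in cache)              # False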