Merge "Handle unexpected failures during call monitor heartbeat"
This commit is contained in:
commit
78777a92b8
@ -30,6 +30,7 @@ from oslo_messaging._i18n import _
|
|||||||
from oslo_messaging._i18n import _LE
|
from oslo_messaging._i18n import _LE
|
||||||
from oslo_messaging._i18n import _LI
|
from oslo_messaging._i18n import _LI
|
||||||
from oslo_messaging._i18n import _LW
|
from oslo_messaging._i18n import _LW
|
||||||
|
from oslo_messaging import MessageDeliveryFailure
|
||||||
|
|
||||||
__all__ = ['AMQPDriverBase']
|
__all__ = ['AMQPDriverBase']
|
||||||
|
|
||||||
@ -168,9 +169,15 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
|
|||||||
return
|
return
|
||||||
|
|
||||||
def heartbeat(self):
|
def heartbeat(self):
|
||||||
|
# generate a keep alive for RPC call monitoring
|
||||||
with self.listener.driver._get_connection(
|
with self.listener.driver._get_connection(
|
||||||
rpc_common.PURPOSE_SEND) as conn:
|
rpc_common.PURPOSE_SEND) as conn:
|
||||||
|
try:
|
||||||
self._send_reply(conn, None, None, ending=False)
|
self._send_reply(conn, None, None, ending=False)
|
||||||
|
except rpc_amqp.AMQPDestinationNotFound:
|
||||||
|
# internal exception that indicates queue/exchange gone -
|
||||||
|
# broker unreachable.
|
||||||
|
raise MessageDeliveryFailure("Heartbeat send failed")
|
||||||
|
|
||||||
# NOTE(sileht): Those have already be ack in RpcListener IO thread
|
# NOTE(sileht): Those have already be ack in RpcListener IO thread
|
||||||
# We keep them as noop until all drivers do the same
|
# We keep them as noop until all drivers do the same
|
||||||
|
@ -218,7 +218,16 @@ class RPCDispatcher(dispatcher.DispatcherBase):
|
|||||||
'(interval=%(interval)i)' % (
|
'(interval=%(interval)i)' % (
|
||||||
{'method': incoming.message.get('method'),
|
{'method': incoming.message.get('method'),
|
||||||
'interval': cm_heartbeat_interval}))
|
'interval': cm_heartbeat_interval}))
|
||||||
|
try:
|
||||||
incoming.heartbeat()
|
incoming.heartbeat()
|
||||||
|
except Exception as exc:
|
||||||
|
# The heartbeat message failed to send. Likely the broker or
|
||||||
|
# client has died. Nothing to do here but exit the watchdog
|
||||||
|
# thread. If the client is still alive (dead broker) then its
|
||||||
|
# RPC will timeout as expected.
|
||||||
|
LOG.debug("Call-monitor heartbeat failed: %(exc)s"
|
||||||
|
% ({'exc': exc}))
|
||||||
|
break
|
||||||
|
|
||||||
def dispatch(self, incoming):
|
def dispatch(self, incoming):
|
||||||
"""Dispatch an RPC message to the appropriate endpoint method.
|
"""Dispatch an RPC message to the appropriate endpoint method.
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import testscenarios
|
import testscenarios
|
||||||
|
import time
|
||||||
|
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
from oslo_messaging import rpc
|
from oslo_messaging import rpc
|
||||||
@ -180,7 +181,8 @@ class TestDispatcher(test_utils.BaseTestCase):
|
|||||||
dispatcher = oslo_messaging.RPCDispatcher(endpoints, serializer,
|
dispatcher = oslo_messaging.RPCDispatcher(endpoints, serializer,
|
||||||
self.access_policy)
|
self.access_policy)
|
||||||
|
|
||||||
incoming = mock.Mock(ctxt=self.ctxt, message=self.msg)
|
incoming = mock.Mock(ctxt=self.ctxt, message=self.msg,
|
||||||
|
client_timeout=0)
|
||||||
|
|
||||||
res = None
|
res = None
|
||||||
|
|
||||||
@ -252,6 +254,7 @@ class TestSerializer(test_utils.BaseTestCase):
|
|||||||
incoming = mock.Mock()
|
incoming = mock.Mock()
|
||||||
incoming.ctxt = self.ctxt
|
incoming.ctxt = self.ctxt
|
||||||
incoming.message = dict(method='foo', args=self.args)
|
incoming.message = dict(method='foo', args=self.args)
|
||||||
|
incoming.client_timeout = 0
|
||||||
retval = dispatcher.dispatch(incoming)
|
retval = dispatcher.dispatch(incoming)
|
||||||
if self.retval is not None:
|
if self.retval is not None:
|
||||||
self.assertEqual('s' + self.retval, retval)
|
self.assertEqual('s' + self.retval, retval)
|
||||||
@ -265,3 +268,38 @@ class TestSerializer(test_utils.BaseTestCase):
|
|||||||
|
|
||||||
serializer.serialize_entity.assert_called_once_with(self.dctxt,
|
serializer.serialize_entity.assert_called_once_with(self.dctxt,
|
||||||
self.retval)
|
self.retval)
|
||||||
|
|
||||||
|
|
||||||
|
class TestMonitorFailure(test_utils.BaseTestCase):
|
||||||
|
"""Test what happens when the call monitor watchdog hits an exception when
|
||||||
|
sending the heartbeat.
|
||||||
|
"""
|
||||||
|
|
||||||
|
class _SleepyEndpoint(object):
|
||||||
|
def __init__(self, target=None):
|
||||||
|
self.target = target
|
||||||
|
|
||||||
|
def sleep(self, ctxt, **kwargs):
|
||||||
|
time.sleep(kwargs['timeout'])
|
||||||
|
return True
|
||||||
|
|
||||||
|
def test_heartbeat_failure(self):
|
||||||
|
|
||||||
|
endpoints = [self._SleepyEndpoint()]
|
||||||
|
dispatcher = oslo_messaging.RPCDispatcher(endpoints,
|
||||||
|
serializer=None)
|
||||||
|
|
||||||
|
# sleep long enough for the client_timeout to expire multiple times
|
||||||
|
# the timeout is (client_timeout/2) and must be > 1.0
|
||||||
|
message = {'method': 'sleep',
|
||||||
|
'args': {'timeout': 3.5}}
|
||||||
|
ctxt = {'test': 'value'}
|
||||||
|
|
||||||
|
incoming = mock.Mock(ctxt=ctxt, message=message, client_timeout=2.0)
|
||||||
|
incoming.heartbeat = mock.Mock(side_effect=Exception('BOOM!'))
|
||||||
|
res = dispatcher.dispatch(incoming)
|
||||||
|
self.assertTrue(res)
|
||||||
|
|
||||||
|
# only one call to heartbeat should be made since the watchdog thread
|
||||||
|
# should exit on the first exception thrown
|
||||||
|
self.assertEqual(1, incoming.heartbeat.call_count)
|
||||||
|
Loading…
Reference in New Issue
Block a user