Transport reconnection retries for notification
This patch add support of reconnection retries for the messaging notifier. Related bug #1282639 Change-Id: Ia30331f8306ff0f6952d83ef42ff8bee6b900427
This commit is contained in:
parent
e349c5e6f2
commit
1ea9c35ab4
@ -389,9 +389,9 @@ class AMQPDriverBase(base.BaseDriver):
|
||||
return self._send(target, ctxt, message, wait_for_reply, timeout,
|
||||
retry=retry)
|
||||
|
||||
def send_notification(self, target, ctxt, message, version):
|
||||
def send_notification(self, target, ctxt, message, version, retry=None):
|
||||
return self._send(target, ctxt, message,
|
||||
envelope=(version == 2.0), notify=True)
|
||||
envelope=(version == 2.0), notify=True, retry=retry)
|
||||
|
||||
def listen(self, target):
|
||||
conn = self._get_connection(pooled=False)
|
||||
|
@ -168,7 +168,9 @@ class FakeDriver(base.BaseDriver):
|
||||
# transport always works
|
||||
return self._send(target, ctxt, message, wait_for_reply, timeout)
|
||||
|
||||
def send_notification(self, target, ctxt, message, version):
|
||||
def send_notification(self, target, ctxt, message, version, retry=None):
|
||||
# NOTE(sileht): retry doesn't need to be implemented, the fake
|
||||
# transport always works
|
||||
self._send(target, ctxt, message)
|
||||
|
||||
def listen(self, target):
|
||||
|
@ -697,10 +697,10 @@ class Connection(object):
|
||||
"""Send a 'fanout' message."""
|
||||
self.publisher_send(FanoutPublisher, topic=topic, msg=msg, retry=retry)
|
||||
|
||||
def notify_send(self, exchange_name, topic, msg, **kwargs):
|
||||
def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
|
||||
"""Send a notify message on a topic."""
|
||||
self.publisher_send(NotifyPublisher, topic=topic, msg=msg,
|
||||
exchange_name=exchange_name)
|
||||
exchange_name=exchange_name, retry=retry)
|
||||
|
||||
def consume(self, limit=None, timeout=None):
|
||||
"""Consume from all queues/consumers."""
|
||||
|
@ -788,10 +788,10 @@ class Connection(object):
|
||||
"""Send a 'fanout' message."""
|
||||
self.publisher_send(FanoutPublisher, topic, msg, retry=retry)
|
||||
|
||||
def notify_send(self, exchange_name, topic, msg, **kwargs):
|
||||
def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
|
||||
"""Send a notify message on a topic."""
|
||||
self.publisher_send(NotifyPublisher, topic, msg, timeout=None,
|
||||
exchange_name=exchange_name, **kwargs)
|
||||
exchange_name=exchange_name, retry=retry, **kwargs)
|
||||
|
||||
def consume(self, limit=None, timeout=None):
|
||||
"""Consume from all queues/consumers."""
|
||||
|
@ -945,9 +945,11 @@ class ZmqDriver(base.BaseDriver):
|
||||
# retry anything
|
||||
return self._send(target, ctxt, message, wait_for_reply, timeout)
|
||||
|
||||
def send_notification(self, target, ctxt, message, version):
|
||||
def send_notification(self, target, ctxt, message, version, retry=None):
|
||||
# NOTE(ewindisch): dot-priority in rpc notifier does not
|
||||
# work with our assumptions.
|
||||
# NOTE(sileht): retry is not implemented because this driver never
|
||||
# retry anything
|
||||
target = target(topic=target.topic.replace('.', '-'))
|
||||
return self._send(target, ctxt, message, envelope=(version == 2.0))
|
||||
|
||||
|
@ -27,7 +27,7 @@ class LogDriver(notifier._Driver):
|
||||
|
||||
LOGGER_BASE = 'oslo.messaging.notification'
|
||||
|
||||
def notify(self, ctxt, message, priority):
|
||||
def notify(self, ctxt, message, priority, retry):
|
||||
logger = logging.getLogger('%s.%s' % (self.LOGGER_BASE,
|
||||
message['event_type']))
|
||||
method = getattr(logger, priority.lower(), None)
|
||||
|
@ -38,13 +38,14 @@ class MessagingDriver(notifier._Driver):
|
||||
super(MessagingDriver, self).__init__(conf, topics, transport)
|
||||
self.version = version
|
||||
|
||||
def notify(self, ctxt, message, priority):
|
||||
def notify(self, ctxt, message, priority, retry):
|
||||
priority = priority.lower()
|
||||
for topic in self.topics:
|
||||
target = messaging.Target(topic='%s.%s' % (topic, priority))
|
||||
try:
|
||||
self.transport._send_notification(target, ctxt, message,
|
||||
version=self.version)
|
||||
version=self.version,
|
||||
retry=retry)
|
||||
except Exception:
|
||||
LOG.exception("Could not send notification to %(topic)s. "
|
||||
"Payload=%(message)s",
|
||||
|
@ -20,5 +20,5 @@ from oslo.messaging.notify import notifier
|
||||
|
||||
class NoOpDriver(notifier._Driver):
|
||||
|
||||
def notify(self, ctxt, message, priority):
|
||||
def notify(self, ctxt, message, priority, retry):
|
||||
pass
|
||||
|
@ -104,21 +104,23 @@ class RoutingDriver(notifier._Driver):
|
||||
|
||||
return list(accepted_drivers)
|
||||
|
||||
def _filter_func(self, ext, context, message, priority, accepted_drivers):
|
||||
def _filter_func(self, ext, context, message, priority, retry,
|
||||
accepted_drivers):
|
||||
"""True/False if the driver should be called for this message.
|
||||
"""
|
||||
# context is unused here, but passed in by map()
|
||||
return ext.name in accepted_drivers
|
||||
|
||||
def _call_notify(self, ext, context, message, priority, accepted_drivers):
|
||||
def _call_notify(self, ext, context, message, priority, retry,
|
||||
accepted_drivers):
|
||||
"""Emit the notification.
|
||||
"""
|
||||
# accepted_drivers is passed in as a result of the map() function
|
||||
LOG.info(_("Routing '%(event)s' notification to '%(driver)s' driver") %
|
||||
{'event': message.get('event_type'), 'driver': ext.name})
|
||||
ext.obj.notify(context, message, priority)
|
||||
ext.obj.notify(context, message, priority, retry)
|
||||
|
||||
def notify(self, context, message, priority):
|
||||
def notify(self, context, message, priority, retry):
|
||||
if not self.plugin_manager:
|
||||
self._load_notifiers()
|
||||
|
||||
@ -131,4 +133,5 @@ class RoutingDriver(notifier._Driver):
|
||||
self._get_drivers_for_message(group, event_type,
|
||||
priority.lower()))
|
||||
self.plugin_manager.map(self._filter_func, self._call_notify, context,
|
||||
message, priority, list(accepted_drivers))
|
||||
message, priority, retry,
|
||||
list(accepted_drivers))
|
||||
|
@ -30,5 +30,5 @@ class TestDriver(notifier._Driver):
|
||||
|
||||
"Store notifications in memory for test verification."
|
||||
|
||||
def notify(self, ctxt, message, priority):
|
||||
NOTIFICATIONS.append((ctxt, message, priority))
|
||||
def notify(self, ctxt, message, priority, retry):
|
||||
NOTIFICATIONS.append((ctxt, message, priority, retry))
|
||||
|
@ -49,7 +49,7 @@ class _Driver(object):
|
||||
self.transport = transport
|
||||
|
||||
@abc.abstractmethod
|
||||
def notify(self, ctxt, msg, priority):
|
||||
def notify(self, ctxt, msg, priority, retry):
|
||||
pass
|
||||
|
||||
|
||||
@ -96,7 +96,7 @@ class Notifier(object):
|
||||
|
||||
def __init__(self, transport, publisher_id=None,
|
||||
driver=None, topic=None,
|
||||
serializer=None):
|
||||
serializer=None, retry=None):
|
||||
"""Construct a Notifier object.
|
||||
|
||||
:param transport: the transport to use for sending messages
|
||||
@ -109,11 +109,17 @@ class Notifier(object):
|
||||
:type topic: str
|
||||
:param serializer: an optional entity serializer
|
||||
:type serializer: Serializer
|
||||
:param retry: an connection retries configuration
|
||||
None or -1 means to retry forever
|
||||
0 means no retry
|
||||
N means N retries
|
||||
:type retry: int
|
||||
"""
|
||||
transport.conf.register_opts(_notifier_opts)
|
||||
|
||||
self.transport = transport
|
||||
self.publisher_id = publisher_id
|
||||
self.retry = retry
|
||||
|
||||
self._driver_names = ([driver] if driver is not None
|
||||
else transport.conf.notification_driver)
|
||||
@ -130,12 +136,12 @@ class Notifier(object):
|
||||
invoke_kwds={
|
||||
'topics': self._topics,
|
||||
'transport': self.transport,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
_marker = object()
|
||||
|
||||
def prepare(self, publisher_id=_marker):
|
||||
def prepare(self, publisher_id=_marker, retry=_marker):
|
||||
"""Return a specialized Notifier instance.
|
||||
|
||||
Returns a new Notifier instance with the supplied publisher_id. Allows
|
||||
@ -144,10 +150,16 @@ class Notifier(object):
|
||||
|
||||
:param publisher_id: field in notifications sent, e.g. 'compute.host1'
|
||||
:type publisher_id: str
|
||||
:param retry: an connection retries configuration
|
||||
None or -1 means to retry forever
|
||||
0 means no retry
|
||||
N means N retries
|
||||
:type retry: int
|
||||
"""
|
||||
return _SubNotifier._prepare(self, publisher_id)
|
||||
return _SubNotifier._prepare(self, publisher_id, retry=retry)
|
||||
|
||||
def _notify(self, ctxt, event_type, payload, priority, publisher_id=None):
|
||||
def _notify(self, ctxt, event_type, payload, priority, publisher_id=None,
|
||||
retry=None):
|
||||
payload = self._serializer.serialize_entity(ctxt, payload)
|
||||
ctxt = self._serializer.serialize_context(ctxt)
|
||||
|
||||
@ -160,7 +172,7 @@ class Notifier(object):
|
||||
|
||||
def do_notify(ext):
|
||||
try:
|
||||
ext.obj.notify(ctxt, msg, priority)
|
||||
ext.obj.notify(ctxt, msg, priority, retry or self.retry)
|
||||
except Exception as e:
|
||||
_LOG.exception("Problem '%(e)s' attempting to send to "
|
||||
"notification system. Payload=%(payload)s",
|
||||
@ -178,6 +190,7 @@ class Notifier(object):
|
||||
:type event_type: str
|
||||
:param payload: the notification payload
|
||||
:type payload: dict
|
||||
:raises: MessageDeliveryFailure
|
||||
"""
|
||||
self._notify(ctxt, event_type, payload, 'AUDIT')
|
||||
|
||||
@ -190,6 +203,7 @@ class Notifier(object):
|
||||
:type event_type: str
|
||||
:param payload: the notification payload
|
||||
:type payload: dict
|
||||
:raises: MessageDeliveryFailure
|
||||
"""
|
||||
self._notify(ctxt, event_type, payload, 'DEBUG')
|
||||
|
||||
@ -202,6 +216,7 @@ class Notifier(object):
|
||||
:type event_type: str
|
||||
:param payload: the notification payload
|
||||
:type payload: dict
|
||||
:raises: MessageDeliveryFailure
|
||||
"""
|
||||
self._notify(ctxt, event_type, payload, 'INFO')
|
||||
|
||||
@ -214,6 +229,7 @@ class Notifier(object):
|
||||
:type event_type: str
|
||||
:param payload: the notification payload
|
||||
:type payload: dict
|
||||
:raises: MessageDeliveryFailure
|
||||
"""
|
||||
self._notify(ctxt, event_type, payload, 'WARN')
|
||||
|
||||
@ -228,6 +244,7 @@ class Notifier(object):
|
||||
:type event_type: str
|
||||
:param payload: the notification payload
|
||||
:type payload: dict
|
||||
:raises: MessageDeliveryFailure
|
||||
"""
|
||||
self._notify(ctxt, event_type, payload, 'ERROR')
|
||||
|
||||
@ -240,6 +257,7 @@ class Notifier(object):
|
||||
:type event_type: str
|
||||
:param payload: the notification payload
|
||||
:type payload: dict
|
||||
:raises: MessageDeliveryFailure
|
||||
"""
|
||||
self._notify(ctxt, event_type, payload, 'CRITICAL')
|
||||
|
||||
@ -258,6 +276,7 @@ class Notifier(object):
|
||||
:type event_type: str
|
||||
:param payload: the notification payload
|
||||
:type payload: dict
|
||||
:raises: MessageDeliveryFailure
|
||||
"""
|
||||
self._notify(ctxt, event_type, payload, 'SAMPLE')
|
||||
|
||||
@ -266,10 +285,11 @@ class _SubNotifier(Notifier):
|
||||
|
||||
_marker = Notifier._marker
|
||||
|
||||
def __init__(self, base, publisher_id):
|
||||
def __init__(self, base, publisher_id, retry):
|
||||
self._base = base
|
||||
self.transport = base.transport
|
||||
self.publisher_id = publisher_id
|
||||
self.retry = retry
|
||||
|
||||
self._serializer = self._base._serializer
|
||||
self._driver_mgr = self._base._driver_mgr
|
||||
@ -278,7 +298,9 @@ class _SubNotifier(Notifier):
|
||||
super(_SubNotifier, self)._notify(ctxt, event_type, payload, priority)
|
||||
|
||||
@classmethod
|
||||
def _prepare(cls, base, publisher_id=_marker):
|
||||
def _prepare(cls, base, publisher_id=_marker, retry=_marker):
|
||||
if publisher_id is cls._marker:
|
||||
publisher_id = base.publisher_id
|
||||
return cls(base, publisher_id)
|
||||
if retry is cls._marker:
|
||||
retry = base.retry
|
||||
return cls(base, publisher_id, retry=retry)
|
||||
|
@ -89,11 +89,12 @@ class Transport(object):
|
||||
wait_for_reply=wait_for_reply,
|
||||
timeout=timeout, retry=retry)
|
||||
|
||||
def _send_notification(self, target, ctxt, message, version):
|
||||
def _send_notification(self, target, ctxt, message, version, retry=None):
|
||||
if not target.topic:
|
||||
raise exceptions.InvalidTarget('A topic is required to send',
|
||||
target)
|
||||
self._driver.send_notification(target, ctxt, message, version)
|
||||
self._driver.send_notification(target, ctxt, message, version,
|
||||
retry=retry)
|
||||
|
||||
def _listen(self, target):
|
||||
if not (target.topic and target.server):
|
||||
|
@ -44,7 +44,7 @@ class _FakeTransport(object):
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
|
||||
def _send_notification(self, target, ctxt, message, version):
|
||||
def _send_notification(self, target, ctxt, message, version, retry=None):
|
||||
pass
|
||||
|
||||
|
||||
@ -123,6 +123,13 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
|
||||
('ctxt', dict(ctxt={'user': 'bob'})),
|
||||
]
|
||||
|
||||
_retry = [
|
||||
('unconfigured', dict()),
|
||||
('None', dict(retry=None)),
|
||||
('0', dict(retry=0)),
|
||||
('5', dict(retry=5)),
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def generate_scenarios(cls):
|
||||
cls.scenarios = testscenarios.multiply_scenarios(cls._v1,
|
||||
@ -131,7 +138,8 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
|
||||
cls._topics,
|
||||
cls._priority,
|
||||
cls._payload,
|
||||
cls._context)
|
||||
cls._context,
|
||||
cls._retry)
|
||||
|
||||
def setUp(self):
|
||||
super(TestMessagingNotifier, self).setUp()
|
||||
@ -159,8 +167,13 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
|
||||
else:
|
||||
notifier = messaging.Notifier(transport)
|
||||
|
||||
prepare_kwds = {}
|
||||
if hasattr(self, 'retry'):
|
||||
prepare_kwds['retry'] = self.retry
|
||||
if hasattr(self, 'prep_pub_id'):
|
||||
notifier = notifier.prepare(publisher_id=self.prep_pub_id)
|
||||
prepare_kwds['publisher_id'] = self.prep_pub_id
|
||||
if prepare_kwds:
|
||||
notifier = notifier.prepare(**prepare_kwds)
|
||||
|
||||
self.mox.StubOutWithMock(transport, '_send_notification')
|
||||
|
||||
@ -187,6 +200,10 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
|
||||
|
||||
for send_kwargs in sends:
|
||||
for topic in self.topics:
|
||||
if hasattr(self, 'retry'):
|
||||
send_kwargs['retry'] = self.retry
|
||||
else:
|
||||
send_kwargs['retry'] = None
|
||||
target = messaging.Target(topic='%s.%s' % (topic,
|
||||
self.priority))
|
||||
transport._send_notification(target, self.ctxt, message,
|
||||
@ -244,7 +261,7 @@ class TestSerializer(test_utils.BaseTestCase):
|
||||
'timestamp': str(timeutils.utcnow()),
|
||||
}
|
||||
|
||||
self.assertEqual([(dict(user='alice'), message, 'INFO')],
|
||||
self.assertEqual([(dict(user='alice'), message, 'INFO', None)],
|
||||
_impl_test.NOTIFICATIONS)
|
||||
|
||||
|
||||
@ -299,7 +316,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
|
||||
self.mox.ReplayAll()
|
||||
|
||||
msg = {'event_type': 'foo'}
|
||||
driver.notify(None, msg, "sample")
|
||||
driver.notify(None, msg, "sample", None)
|
||||
|
||||
|
||||
class TestRoutingNotifier(test_utils.BaseTestCase):
|
||||
@ -467,11 +484,11 @@ group_1:
|
||||
|
||||
# Good ...
|
||||
self.assertTrue(self.router._filter_func(ext, {}, {}, 'info',
|
||||
['foo', 'rpc']))
|
||||
None, ['foo', 'rpc']))
|
||||
|
||||
# Bad
|
||||
self.assertFalse(self.router._filter_func(ext, {}, {}, 'info',
|
||||
['foo']))
|
||||
None, ['foo']))
|
||||
|
||||
def test_notify(self):
|
||||
self.router.routing_groups = {'group_1': None, 'group_2': None}
|
||||
@ -482,7 +499,7 @@ group_1:
|
||||
with mock.patch.object(self.router, '_get_drivers_for_message',
|
||||
drivers_mock):
|
||||
self.notifier.info({}, 'my_event', {})
|
||||
self.assertEqual(['rpc', 'foo'], pm.map.call_args[0][5])
|
||||
self.assertEqual(['rpc', 'foo'], pm.map.call_args[0][6])
|
||||
|
||||
def test_notify_filtered(self):
|
||||
self.config(routing_notifier_config="routing_notifier.yaml")
|
||||
@ -518,6 +535,7 @@ group_1:
|
||||
return_value=pm)):
|
||||
self.notifier.info({}, 'my_event', {})
|
||||
self.assertFalse(bar_driver.info.called)
|
||||
rpc_driver.notify.assert_called_once_with({}, mock.ANY, 'INFO')
|
||||
rpc_driver.notify.assert_called_once_with(
|
||||
{}, mock.ANY, 'INFO', None)
|
||||
rpc2_driver.notify.assert_called_once_with(
|
||||
{}, mock.ANY, 'INFO')
|
||||
{}, mock.ANY, 'INFO', None)
|
||||
|
@ -174,10 +174,11 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
||||
notifier = self._setup_notifier(transport, topic="topic")
|
||||
|
||||
def mock_notifier_exchange(name):
|
||||
def side_effect(target, ctxt, message, version):
|
||||
def side_effect(target, ctxt, message, version, retry):
|
||||
target.exchange = name
|
||||
return transport._driver.send_notification(target, ctxt,
|
||||
message, version)
|
||||
message, version,
|
||||
retry=retry)
|
||||
transport._send_notification = mock.MagicMock(
|
||||
side_effect=side_effect)
|
||||
|
||||
|
@ -292,11 +292,23 @@ class TestTransportMethodArgs(test_utils.BaseTestCase):
|
||||
t = transport.Transport(_FakeDriver(cfg.CONF))
|
||||
|
||||
self.mox.StubOutWithMock(t._driver, 'send_notification')
|
||||
t._driver.send_notification(self._target, 'ctxt', 'message', 1.0)
|
||||
t._driver.send_notification(self._target, 'ctxt', 'message', 1.0,
|
||||
retry=None)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
t._send_notification(self._target, 'ctxt', 'message', version=1.0)
|
||||
|
||||
def test_send_notification_all_args(self):
|
||||
t = transport.Transport(_FakeDriver(cfg.CONF))
|
||||
|
||||
self.mox.StubOutWithMock(t._driver, 'send_notification')
|
||||
t._driver.send_notification(self._target, 'ctxt', 'message', 1.0,
|
||||
retry=5)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
t._send_notification(self._target, 'ctxt', 'message', version=1.0,
|
||||
retry=5)
|
||||
|
||||
def test_listen(self):
|
||||
t = transport.Transport(_FakeDriver(cfg.CONF))
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user