Add separate transport for notifications
In oslo.messaging 2.9.0, the RPC and notification buses were decoupled into separate transport. This changes over the Neutron notifier to use the notification transport provided from oslo.messaging instead of the original transport (which is now used for RPC). By default, the notification transport will pull in the original transport if nothing is configured in oslo_messaging_notification, so this can be an in-place replacement without impacting upgrades or configurations. This functionality was added to oslo.messaging to address bug #1504622 Change-Id: I89c8a84c81079677088cc7d656a0bcb5e323de4d
This commit is contained in:
parent
e4f2fb9df6
commit
d3ee7a5338
@ -29,6 +29,7 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
TRANSPORT = None
|
||||
NOTIFICATION_TRANSPORT = None
|
||||
NOTIFIER = None
|
||||
|
||||
ALLOWED_EXMODS = [
|
||||
@ -56,21 +57,26 @@ RPC_DISABLED = False
|
||||
|
||||
|
||||
def init(conf):
|
||||
global TRANSPORT, NOTIFIER
|
||||
global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
|
||||
exmods = get_allowed_exmods()
|
||||
TRANSPORT = oslo_messaging.get_transport(conf,
|
||||
allowed_remote_exmods=exmods,
|
||||
aliases=TRANSPORT_ALIASES)
|
||||
NOTIFICATION_TRANSPORT = oslo_messaging.get_notification_transport(
|
||||
conf, allowed_remote_exmods=exmods, aliases=TRANSPORT_ALIASES)
|
||||
serializer = RequestContextSerializer()
|
||||
NOTIFIER = oslo_messaging.Notifier(TRANSPORT, serializer=serializer)
|
||||
NOTIFIER = oslo_messaging.Notifier(NOTIFICATION_TRANSPORT,
|
||||
serializer=serializer)
|
||||
|
||||
|
||||
def cleanup():
|
||||
global TRANSPORT, NOTIFIER
|
||||
global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
|
||||
assert TRANSPORT is not None
|
||||
assert NOTIFICATION_TRANSPORT is not None
|
||||
assert NOTIFIER is not None
|
||||
TRANSPORT.cleanup()
|
||||
TRANSPORT = NOTIFIER = None
|
||||
NOTIFICATION_TRANSPORT.cleanup()
|
||||
TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None
|
||||
|
||||
|
||||
def add_extra_exmods(*args):
|
||||
|
@ -33,6 +33,7 @@ CONF.import_opt('state_path', 'neutron.common.config')
|
||||
class RPCFixture(fixtures.Fixture):
|
||||
def _setUp(self):
|
||||
self.trans = copy.copy(rpc.TRANSPORT)
|
||||
self.noti_trans = copy.copy(rpc.NOTIFICATION_TRANSPORT)
|
||||
self.noti = copy.copy(rpc.NOTIFIER)
|
||||
self.all_mods = copy.copy(rpc.ALLOWED_EXMODS)
|
||||
self.ext_mods = copy.copy(rpc.EXTRA_EXMODS)
|
||||
@ -40,6 +41,7 @@ class RPCFixture(fixtures.Fixture):
|
||||
|
||||
def _reset_everything(self):
|
||||
rpc.TRANSPORT = self.trans
|
||||
rpc.NOTIFICATION_TRANSPORT = self.noti_trans
|
||||
rpc.NOTIFIER = self.noti
|
||||
rpc.ALLOWED_EXMODS = self.all_mods
|
||||
rpc.EXTRA_EXMODS = self.ext_mods
|
||||
@ -53,15 +55,19 @@ class TestRPC(base.DietTestCase):
|
||||
@mock.patch.object(rpc, 'get_allowed_exmods')
|
||||
@mock.patch.object(rpc, 'RequestContextSerializer')
|
||||
@mock.patch.object(messaging, 'get_transport')
|
||||
@mock.patch.object(messaging, 'get_notification_transport')
|
||||
@mock.patch.object(messaging, 'Notifier')
|
||||
def test_init(self, mock_not, mock_trans, mock_ser, mock_exmods):
|
||||
def test_init(self, mock_not, mock_noti_trans, mock_trans, mock_ser,
|
||||
mock_exmods):
|
||||
notifier = mock.Mock()
|
||||
transport = mock.Mock()
|
||||
noti_transport = mock.Mock()
|
||||
serializer = mock.Mock()
|
||||
conf = mock.Mock()
|
||||
|
||||
mock_exmods.return_value = ['foo']
|
||||
mock_trans.return_value = transport
|
||||
mock_noti_trans.return_value = noti_transport
|
||||
mock_ser.return_value = serializer
|
||||
mock_not.return_value = notifier
|
||||
|
||||
@ -70,28 +76,45 @@ class TestRPC(base.DietTestCase):
|
||||
mock_exmods.assert_called_once_with()
|
||||
mock_trans.assert_called_once_with(conf, allowed_remote_exmods=['foo'],
|
||||
aliases=rpc.TRANSPORT_ALIASES)
|
||||
mock_not.assert_called_once_with(transport, serializer=serializer)
|
||||
mock_noti_trans.assert_called_once_with(conf,
|
||||
allowed_remote_exmods=['foo'],
|
||||
aliases=rpc.TRANSPORT_ALIASES)
|
||||
mock_not.assert_called_once_with(noti_transport,
|
||||
serializer=serializer)
|
||||
self.assertIsNotNone(rpc.TRANSPORT)
|
||||
self.assertIsNotNone(rpc.NOTIFICATION_TRANSPORT)
|
||||
self.assertIsNotNone(rpc.NOTIFIER)
|
||||
|
||||
def test_cleanup_transport_null(self):
|
||||
rpc.NOTIFIER = mock.Mock()
|
||||
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
|
||||
self.assertRaises(AssertionError, rpc.cleanup)
|
||||
|
||||
def test_cleanup_notification_transport_null(self):
|
||||
rpc.TRANSPORT = mock.Mock()
|
||||
rpc.NOTIFIER = mock.Mock()
|
||||
self.assertRaises(AssertionError, rpc.cleanup)
|
||||
|
||||
def test_cleanup_notifier_null(self):
|
||||
rpc.TRANSPORT = mock.Mock()
|
||||
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
|
||||
self.assertRaises(AssertionError, rpc.cleanup)
|
||||
|
||||
def test_cleanup(self):
|
||||
rpc.NOTIFIER = mock.Mock()
|
||||
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
|
||||
rpc.TRANSPORT = mock.Mock()
|
||||
trans_cleanup = mock.Mock()
|
||||
not_trans_cleanup = mock.Mock()
|
||||
rpc.TRANSPORT.cleanup = trans_cleanup
|
||||
rpc.NOTIFICATION_TRANSPORT.cleanup = not_trans_cleanup
|
||||
|
||||
rpc.cleanup()
|
||||
|
||||
trans_cleanup.assert_called_once_with()
|
||||
not_trans_cleanup.assert_called_once_with()
|
||||
self.assertIsNone(rpc.TRANSPORT)
|
||||
self.assertIsNone(rpc.NOTIFICATION_TRANSPORT)
|
||||
self.assertIsNone(rpc.NOTIFIER)
|
||||
|
||||
def test_add_extra_exmods(self):
|
||||
|
@ -0,0 +1,7 @@
|
||||
---
|
||||
features:
|
||||
- The RPC and notification queues have been separated into different queues.
|
||||
Specify the transport_url to be used for notifications within the
|
||||
[oslo_messaging_notifications] section of the configuration file.
|
||||
If no transport_url is specified in [oslo_messaging_notifications],
|
||||
the transport_url used for RPC will be used.
|
Loading…
Reference in New Issue
Block a user