Make DHCP notifier use core resource events
This makes the notifier subscribe to core resource events and leverage them if they are available. This solves the issue where internal core plugin calls from service plugins were not generating DHCP agent notifications. Closes-Bug: #1621345 Change-Id: I607635601caff0322fd0c80c9023f5c4f663ca25
This commit is contained in:
parent
9d24490da8
commit
181bdb374f
@ -52,6 +52,7 @@ class DhcpAgentNotifyAPI(object):
|
||||
'port.delete.end']
|
||||
|
||||
def __init__(self, topic=topics.DHCP_AGENT, plugin=None):
|
||||
self._unsubscribed_resources = []
|
||||
self._plugin = plugin
|
||||
target = oslo_messaging.Target(topic=topic, version='1.0')
|
||||
self.client = n_rpc.get_client(target)
|
||||
@ -69,9 +70,22 @@ class DhcpAgentNotifyAPI(object):
|
||||
resources.SUBNET,
|
||||
resources.SUBNETS,
|
||||
)
|
||||
if not cfg.CONF.dhcp_agent_notification:
|
||||
return
|
||||
for resource in callback_resources:
|
||||
registry.subscribe(self._send_dhcp_notification,
|
||||
resource, events.BEFORE_RESPONSE)
|
||||
self.uses_native_notifications = {}
|
||||
for resource in (resources.NETWORK, resources.PORT, resources.SUBNET):
|
||||
self.uses_native_notifications[resource] = {'create': False,
|
||||
'update': False,
|
||||
'delete': False}
|
||||
registry.subscribe(self._native_event_send_dhcp_notification,
|
||||
resource, events.AFTER_CREATE)
|
||||
registry.subscribe(self._native_event_send_dhcp_notification,
|
||||
resource, events.AFTER_UPDATE)
|
||||
registry.subscribe(self._native_event_send_dhcp_notification,
|
||||
resource, events.AFTER_DELETE)
|
||||
|
||||
@property
|
||||
def plugin(self):
|
||||
@ -212,16 +226,37 @@ class DhcpAgentNotifyAPI(object):
|
||||
{'port_id': kwargs['port']['id']},
|
||||
kwargs['port']['network_id'])
|
||||
|
||||
def _native_event_send_dhcp_notification(self, resource, event, trigger,
|
||||
context, **kwargs):
|
||||
action = event.replace('after_', '')
|
||||
# we unsubscribe the _send_dhcp_notification method now that we know
|
||||
# the loaded core plugin emits native resource events
|
||||
if resource not in self._unsubscribed_resources:
|
||||
self.uses_native_notifications[resource][action] = True
|
||||
if all(self.uses_native_notifications[resource].values()):
|
||||
# only unsubscribe the API level listener if we are
|
||||
# receiving all event types for this resource
|
||||
self._unsubscribed_resources.append(resource)
|
||||
registry.unsubscribe_by_resource(self._send_dhcp_notification,
|
||||
resource)
|
||||
method_name = '.'.join((resource, action, 'end'))
|
||||
payload = kwargs[resource]
|
||||
data = {resource: payload}
|
||||
self.notify(context, data, method_name)
|
||||
|
||||
def _send_dhcp_notification(self, resource, event, trigger, context=None,
|
||||
data=None, method_name=None, collection=None,
|
||||
**kwargs):
|
||||
if cfg.CONF.dhcp_agent_notification:
|
||||
if collection and collection in data:
|
||||
for body in data[collection]:
|
||||
item = {resource: body}
|
||||
self.notify(context, item, method_name)
|
||||
else:
|
||||
self.notify(context, data, method_name)
|
||||
action='', **kwargs):
|
||||
action = action.split('_')[0]
|
||||
if (resource in self.uses_native_notifications and
|
||||
self.uses_native_notifications[resource][action]):
|
||||
return
|
||||
if collection and collection in data:
|
||||
for body in data[collection]:
|
||||
item = {resource: body}
|
||||
self.notify(context, item, method_name)
|
||||
else:
|
||||
self.notify(context, data, method_name)
|
||||
|
||||
def notify(self, context, data, method_name):
|
||||
# data is {'key' : 'value'} with only one key
|
||||
|
@ -47,6 +47,8 @@ class NotifierHook(hooks.PecanHook):
|
||||
self._nova_notifier.send_network_change(action_resource, *args)
|
||||
|
||||
def _notify_dhcp_agent(self, context, resource_name, action, resources):
|
||||
# NOTE(kevinbenton): we should remove this whole method in Ocata and
|
||||
# make plugins emit the core resource events
|
||||
plugin = manager.NeutronManager.get_plugin_for_resource(resource_name)
|
||||
notifier_method = '%s.%s.end' % (resource_name, action)
|
||||
# use plugin's dhcp notifier, if this is already instantiated
|
||||
@ -55,6 +57,10 @@ class NotifierHook(hooks.PecanHook):
|
||||
agent_notifiers.get(constants.AGENT_TYPE_DHCP) or
|
||||
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
)
|
||||
native_map = getattr(dhcp_agent_notifier, 'uses_native_notifications',
|
||||
{})
|
||||
if native_map.get(resource_name, {}).get(action):
|
||||
return
|
||||
# The DHCP Agent does not accept bulk notifications
|
||||
for resource in resources:
|
||||
item = {resource_name: resource}
|
||||
|
@ -19,6 +19,7 @@ Common utilities and helper functions for OpenStack Networking Plugins.
|
||||
import contextlib
|
||||
import hashlib
|
||||
|
||||
import debtcollector
|
||||
from neutron_lib import constants as n_const
|
||||
from neutron_lib import exceptions
|
||||
from oslo_config import cfg
|
||||
@ -172,6 +173,10 @@ def create_network(core_plugin, context, net, check_allow_post=True):
|
||||
return core_plugin.create_network(context, {'network': net_data})
|
||||
|
||||
|
||||
@debtcollector.removals.remove(
|
||||
message="This will be removed in the O cycle. "
|
||||
"Please call update_network directly on the plugin."
|
||||
)
|
||||
def update_network(core_plugin, context, network_id, net_data):
|
||||
network = core_plugin.update_network(
|
||||
context, network_id, {resources.NETWORK: net_data})
|
||||
|
@ -340,9 +340,9 @@ class AutoAllocatedTopologyMixin(common_db_mixin.CommonDbMixin):
|
||||
tenant_id=tenant_id,
|
||||
network_id=network_id,
|
||||
router_id=router_id))
|
||||
p_utils.update_network(
|
||||
self.core_plugin, context,
|
||||
network_id, {'admin_state_up': True})
|
||||
self.core_plugin.update_network(
|
||||
context, network_id,
|
||||
{'network': {'admin_state_up': True}})
|
||||
except db_exc.DBDuplicateEntry:
|
||||
LOG.error(_LE("Multiple auto-allocated networks detected for "
|
||||
"tenant %(tenant)s. Attempting clean up for "
|
||||
|
@ -222,7 +222,7 @@ class TestPolicyEnforcementHook(test_functional.PecanFunctionalTest):
|
||||
self.assertNotIn('restricted_attr', json_response['mehs'][0])
|
||||
|
||||
|
||||
class TestDHCPNotifierHook(test_functional.PecanFunctionalTest):
|
||||
class DHCPNotifierTestBase(test_functional.PecanFunctionalTest):
|
||||
|
||||
def setUp(self):
|
||||
# the DHCP notifier needs to be mocked so that correct operations can
|
||||
@ -232,29 +232,42 @@ class TestDHCPNotifierHook(test_functional.PecanFunctionalTest):
|
||||
patcher = mock.patch('neutron.api.rpc.agentnotifiers.'
|
||||
'dhcp_rpc_agent_api.DhcpAgentNotifyAPI.notify')
|
||||
self.mock_notifier = patcher.start()
|
||||
super(TestDHCPNotifierHook, self).setUp()
|
||||
super(DHCPNotifierTestBase, self).setUp()
|
||||
|
||||
|
||||
class TestDHCPNotifierHookNegative(DHCPNotifierTestBase):
|
||||
|
||||
def setUp(self):
|
||||
cfg.CONF.set_override('dhcp_agent_notification', False)
|
||||
super(TestDHCPNotifierHookNegative, self).setUp()
|
||||
|
||||
def test_dhcp_notifications_disabled(self):
|
||||
cfg.CONF.set_override('dhcp_agent_notification', False)
|
||||
self.app.post_json(
|
||||
'/v2.0/networks.json',
|
||||
params={'network': {'name': 'meh'}},
|
||||
headers={'X-Project-Id': 'tenid'})
|
||||
self.assertEqual(0, self.mock_notifier.call_count)
|
||||
|
||||
|
||||
class TestDHCPNotifierHook(DHCPNotifierTestBase):
|
||||
|
||||
def test_get_does_not_trigger_notification(self):
|
||||
self.do_request('/v2.0/networks', tenant_id='tenid')
|
||||
self.assertEqual(0, self.mock_notifier.call_count)
|
||||
|
||||
def test_post_put_delete_triggers_notification(self):
|
||||
ctx = context.get_admin_context()
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
|
||||
req_headers = {'X-Project-Id': 'tenid', 'X-Roles': 'admin'}
|
||||
response = self.app.post_json(
|
||||
'/v2.0/networks.json',
|
||||
params={'network': {'name': 'meh'}}, headers=req_headers)
|
||||
self.assertEqual(201, response.status_int)
|
||||
json_body = jsonutils.loads(response.body)
|
||||
net = {'network': plugin.get_network(ctx, json_body['network']['id'])}
|
||||
self.assertEqual(1, self.mock_notifier.call_count)
|
||||
self.assertEqual(mock.call(mock.ANY, json_body, 'network.create.end'),
|
||||
self.assertEqual(mock.call(mock.ANY, net, 'network.create.end'),
|
||||
self.mock_notifier.mock_calls[-1])
|
||||
network_id = json_body['network']['id']
|
||||
|
||||
@ -264,8 +277,9 @@ class TestDHCPNotifierHook(test_functional.PecanFunctionalTest):
|
||||
headers=req_headers)
|
||||
self.assertEqual(200, response.status_int)
|
||||
json_body = jsonutils.loads(response.body)
|
||||
net = {'network': plugin.get_network(ctx, json_body['network']['id'])}
|
||||
self.assertEqual(2, self.mock_notifier.call_count)
|
||||
self.assertEqual(mock.call(mock.ANY, json_body, 'network.update.end'),
|
||||
self.assertEqual(mock.call(mock.ANY, net, 'network.update.end'),
|
||||
self.mock_notifier.mock_calls[-1])
|
||||
|
||||
response = self.app.delete(
|
||||
|
@ -19,6 +19,9 @@ import mock
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
|
||||
from neutron.callbacks import events
|
||||
from neutron.callbacks import registry
|
||||
from neutron.callbacks import resources
|
||||
from neutron.common import utils
|
||||
from neutron.db import agents_db
|
||||
from neutron.db.agentschedulers_db import cfg
|
||||
@ -215,3 +218,23 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
|
||||
def test__cast_message(self):
|
||||
self.notifier._cast_message(mock.ANY, mock.ANY, mock.ANY)
|
||||
self.assertEqual(1, self.mock_cast.call_count)
|
||||
|
||||
def test__native_notification_unsubscribes(self):
|
||||
self.assertFalse(self.notifier._unsubscribed_resources)
|
||||
for res in (resources.PORT, resources.NETWORK, resources.SUBNET):
|
||||
self.notifier._unsubscribed_resources = []
|
||||
kwargs = {res: {}}
|
||||
registry.notify(res, events.AFTER_CREATE, self,
|
||||
context=mock.Mock(), **kwargs)
|
||||
# don't unsubscribe until all three types are observed
|
||||
self.assertEqual([], self.notifier._unsubscribed_resources)
|
||||
registry.notify(res, events.AFTER_UPDATE, self,
|
||||
context=mock.Mock(), **kwargs)
|
||||
self.assertEqual([], self.notifier._unsubscribed_resources)
|
||||
registry.notify(res, events.AFTER_DELETE, self,
|
||||
context=mock.Mock(), **kwargs)
|
||||
self.assertEqual([res], self.notifier._unsubscribed_resources)
|
||||
# after first time, no further unsubscribing should happen
|
||||
registry.notify(res, events.AFTER_CREATE, self,
|
||||
context=mock.Mock(), **kwargs)
|
||||
self.assertEqual([res], self.notifier._unsubscribed_resources)
|
||||
|
@ -1359,7 +1359,7 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn,
|
||||
mock.ANY, 'agent_updated',
|
||||
{'admin_state_up': False}, DHCP_HOSTA)
|
||||
|
||||
def _network_port_create(
|
||||
def _api_network_port_create(
|
||||
self, hosts, gateway=constants.ATTR_NOT_SPECIFIED, owner=None):
|
||||
for host in hosts:
|
||||
helpers.register_dhcp_agent(host)
|
||||
@ -1374,6 +1374,22 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn,
|
||||
with self.port(subnet=subnet1) as port:
|
||||
return [net1, subnet1, port]
|
||||
|
||||
def _network_port_create(self, *args, **kwargs):
|
||||
net, sub, port = self._api_network_port_create(*args, **kwargs)
|
||||
|
||||
dhcp_notifier = self.plugin.agent_notifiers[constants.AGENT_TYPE_DHCP]
|
||||
if (not hasattr(dhcp_notifier, 'uses_native_notifications') or
|
||||
not all(dhcp_notifier.uses_native_notifications[r]['create']
|
||||
for r in ('port', 'subnet', 'network'))):
|
||||
return net, sub, port
|
||||
# since plugin has native dhcp notifications, the payloads will be the
|
||||
# same as the getter outputs
|
||||
ctx = context.get_admin_context()
|
||||
net['network'] = self.plugin.get_network(ctx, net['network']['id'])
|
||||
sub['subnet'] = self.plugin.get_subnet(ctx, sub['subnet']['id'])
|
||||
port['port'] = self.plugin.get_port(ctx, port['port']['id'])
|
||||
return net, sub, port
|
||||
|
||||
def _notification_mocks(self, hosts, net, subnet, port):
|
||||
host_calls = {}
|
||||
for host in hosts:
|
||||
|
@ -780,6 +780,7 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
|
||||
self.assertTrue(sg_member_update.called)
|
||||
|
||||
def test_update_port_status_with_network(self):
|
||||
registry.clear() # don't care about callback behavior
|
||||
ctx = context.get_admin_context()
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
with self.port() as port:
|
||||
|
@ -20,7 +20,6 @@ from oslo_utils import uuidutils
|
||||
|
||||
from neutron.common import exceptions as c_exc
|
||||
from neutron import context
|
||||
from neutron.plugins.common import utils
|
||||
from neutron.services.auto_allocate import db
|
||||
from neutron.services.auto_allocate import exceptions
|
||||
from neutron.tests.unit import testlib_api
|
||||
@ -160,14 +159,14 @@ class AutoAllocateTestCase(testlib_api.SqlTestCaseLight):
|
||||
provisioning_exception)
|
||||
|
||||
def test__save_with_provisioning_error(self):
|
||||
with mock.patch.object(utils, "update_network", side_effect=Exception):
|
||||
with testtools.ExpectedException(
|
||||
exceptions.UnknownProvisioningError) as e:
|
||||
self.mixin._save(self.ctx, 'foo_t', 'foo_n', 'foo_r',
|
||||
[{'id': 'foo_s'}])
|
||||
self.assertEqual('foo_n', e.network_id)
|
||||
self.assertEqual('foo_r', e.router_id)
|
||||
self.assertEqual([{'id': 'foo_s'}], e.subnets)
|
||||
self.mixin._core_plugin.update_network.side_effect = Exception
|
||||
with testtools.ExpectedException(
|
||||
exceptions.UnknownProvisioningError) as e:
|
||||
self.mixin._save(self.ctx, 'foo_t', 'foo_n', 'foo_r',
|
||||
[{'id': 'foo_s'}])
|
||||
self.assertEqual('foo_n', e.network_id)
|
||||
self.assertEqual('foo_r', e.router_id)
|
||||
self.assertEqual([{'id': 'foo_s'}], e.subnets)
|
||||
|
||||
def test__provision_external_connectivity_with_provisioning_error(self):
|
||||
self.mixin._l3_plugin.create_router.side_effect = Exception
|
||||
|
Loading…
Reference in New Issue
Block a user