Merge "Make DHCP notifier use core resource events"
This commit is contained in:
commit
27eccafa5b
@ -52,6 +52,7 @@ class DhcpAgentNotifyAPI(object):
|
|||||||
'port.delete.end']
|
'port.delete.end']
|
||||||
|
|
||||||
def __init__(self, topic=topics.DHCP_AGENT, plugin=None):
|
def __init__(self, topic=topics.DHCP_AGENT, plugin=None):
|
||||||
|
self._unsubscribed_resources = []
|
||||||
self._plugin = plugin
|
self._plugin = plugin
|
||||||
target = oslo_messaging.Target(topic=topic, version='1.0')
|
target = oslo_messaging.Target(topic=topic, version='1.0')
|
||||||
self.client = n_rpc.get_client(target)
|
self.client = n_rpc.get_client(target)
|
||||||
@ -69,9 +70,22 @@ class DhcpAgentNotifyAPI(object):
|
|||||||
resources.SUBNET,
|
resources.SUBNET,
|
||||||
resources.SUBNETS,
|
resources.SUBNETS,
|
||||||
)
|
)
|
||||||
|
if not cfg.CONF.dhcp_agent_notification:
|
||||||
|
return
|
||||||
for resource in callback_resources:
|
for resource in callback_resources:
|
||||||
registry.subscribe(self._send_dhcp_notification,
|
registry.subscribe(self._send_dhcp_notification,
|
||||||
resource, events.BEFORE_RESPONSE)
|
resource, events.BEFORE_RESPONSE)
|
||||||
|
self.uses_native_notifications = {}
|
||||||
|
for resource in (resources.NETWORK, resources.PORT, resources.SUBNET):
|
||||||
|
self.uses_native_notifications[resource] = {'create': False,
|
||||||
|
'update': False,
|
||||||
|
'delete': False}
|
||||||
|
registry.subscribe(self._native_event_send_dhcp_notification,
|
||||||
|
resource, events.AFTER_CREATE)
|
||||||
|
registry.subscribe(self._native_event_send_dhcp_notification,
|
||||||
|
resource, events.AFTER_UPDATE)
|
||||||
|
registry.subscribe(self._native_event_send_dhcp_notification,
|
||||||
|
resource, events.AFTER_DELETE)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def plugin(self):
|
def plugin(self):
|
||||||
@ -212,16 +226,37 @@ class DhcpAgentNotifyAPI(object):
|
|||||||
{'port_id': kwargs['port']['id']},
|
{'port_id': kwargs['port']['id']},
|
||||||
kwargs['port']['network_id'])
|
kwargs['port']['network_id'])
|
||||||
|
|
||||||
|
def _native_event_send_dhcp_notification(self, resource, event, trigger,
|
||||||
|
context, **kwargs):
|
||||||
|
action = event.replace('after_', '')
|
||||||
|
# we unsubscribe the _send_dhcp_notification method now that we know
|
||||||
|
# the loaded core plugin emits native resource events
|
||||||
|
if resource not in self._unsubscribed_resources:
|
||||||
|
self.uses_native_notifications[resource][action] = True
|
||||||
|
if all(self.uses_native_notifications[resource].values()):
|
||||||
|
# only unsubscribe the API level listener if we are
|
||||||
|
# receiving all event types for this resource
|
||||||
|
self._unsubscribed_resources.append(resource)
|
||||||
|
registry.unsubscribe_by_resource(self._send_dhcp_notification,
|
||||||
|
resource)
|
||||||
|
method_name = '.'.join((resource, action, 'end'))
|
||||||
|
payload = kwargs[resource]
|
||||||
|
data = {resource: payload}
|
||||||
|
self.notify(context, data, method_name)
|
||||||
|
|
||||||
def _send_dhcp_notification(self, resource, event, trigger, context=None,
|
def _send_dhcp_notification(self, resource, event, trigger, context=None,
|
||||||
data=None, method_name=None, collection=None,
|
data=None, method_name=None, collection=None,
|
||||||
**kwargs):
|
action='', **kwargs):
|
||||||
if cfg.CONF.dhcp_agent_notification:
|
action = action.split('_')[0]
|
||||||
if collection and collection in data:
|
if (resource in self.uses_native_notifications and
|
||||||
for body in data[collection]:
|
self.uses_native_notifications[resource][action]):
|
||||||
item = {resource: body}
|
return
|
||||||
self.notify(context, item, method_name)
|
if collection and collection in data:
|
||||||
else:
|
for body in data[collection]:
|
||||||
self.notify(context, data, method_name)
|
item = {resource: body}
|
||||||
|
self.notify(context, item, method_name)
|
||||||
|
else:
|
||||||
|
self.notify(context, data, method_name)
|
||||||
|
|
||||||
def notify(self, context, data, method_name):
|
def notify(self, context, data, method_name):
|
||||||
# data is {'key' : 'value'} with only one key
|
# data is {'key' : 'value'} with only one key
|
||||||
|
@ -47,6 +47,8 @@ class NotifierHook(hooks.PecanHook):
|
|||||||
self._nova_notifier.send_network_change(action_resource, *args)
|
self._nova_notifier.send_network_change(action_resource, *args)
|
||||||
|
|
||||||
def _notify_dhcp_agent(self, context, resource_name, action, resources):
|
def _notify_dhcp_agent(self, context, resource_name, action, resources):
|
||||||
|
# NOTE(kevinbenton): we should remove this whole method in Ocata and
|
||||||
|
# make plugins emit the core resource events
|
||||||
plugin = manager.NeutronManager.get_plugin_for_resource(resource_name)
|
plugin = manager.NeutronManager.get_plugin_for_resource(resource_name)
|
||||||
notifier_method = '%s.%s.end' % (resource_name, action)
|
notifier_method = '%s.%s.end' % (resource_name, action)
|
||||||
# use plugin's dhcp notifier, if this is already instantiated
|
# use plugin's dhcp notifier, if this is already instantiated
|
||||||
@ -55,6 +57,10 @@ class NotifierHook(hooks.PecanHook):
|
|||||||
agent_notifiers.get(constants.AGENT_TYPE_DHCP) or
|
agent_notifiers.get(constants.AGENT_TYPE_DHCP) or
|
||||||
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||||
)
|
)
|
||||||
|
native_map = getattr(dhcp_agent_notifier, 'uses_native_notifications',
|
||||||
|
{})
|
||||||
|
if native_map.get(resource_name, {}).get(action):
|
||||||
|
return
|
||||||
# The DHCP Agent does not accept bulk notifications
|
# The DHCP Agent does not accept bulk notifications
|
||||||
for resource in resources:
|
for resource in resources:
|
||||||
item = {resource_name: resource}
|
item = {resource_name: resource}
|
||||||
|
@ -19,6 +19,7 @@ Common utilities and helper functions for OpenStack Networking Plugins.
|
|||||||
import contextlib
|
import contextlib
|
||||||
import hashlib
|
import hashlib
|
||||||
|
|
||||||
|
import debtcollector
|
||||||
from neutron_lib import constants as n_const
|
from neutron_lib import constants as n_const
|
||||||
from neutron_lib import exceptions
|
from neutron_lib import exceptions
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
@ -172,6 +173,10 @@ def create_network(core_plugin, context, net, check_allow_post=True):
|
|||||||
return core_plugin.create_network(context, {'network': net_data})
|
return core_plugin.create_network(context, {'network': net_data})
|
||||||
|
|
||||||
|
|
||||||
|
@debtcollector.removals.remove(
|
||||||
|
message="This will be removed in the O cycle. "
|
||||||
|
"Please call update_network directly on the plugin."
|
||||||
|
)
|
||||||
def update_network(core_plugin, context, network_id, net_data):
|
def update_network(core_plugin, context, network_id, net_data):
|
||||||
network = core_plugin.update_network(
|
network = core_plugin.update_network(
|
||||||
context, network_id, {resources.NETWORK: net_data})
|
context, network_id, {resources.NETWORK: net_data})
|
||||||
|
@ -341,9 +341,9 @@ class AutoAllocatedTopologyMixin(common_db_mixin.CommonDbMixin):
|
|||||||
tenant_id=tenant_id,
|
tenant_id=tenant_id,
|
||||||
network_id=network_id,
|
network_id=network_id,
|
||||||
router_id=router_id))
|
router_id=router_id))
|
||||||
p_utils.update_network(
|
self.core_plugin.update_network(
|
||||||
self.core_plugin, context,
|
context, network_id,
|
||||||
network_id, {'admin_state_up': True})
|
{'network': {'admin_state_up': True}})
|
||||||
except db_exc.DBDuplicateEntry:
|
except db_exc.DBDuplicateEntry:
|
||||||
LOG.error(_LE("Multiple auto-allocated networks detected for "
|
LOG.error(_LE("Multiple auto-allocated networks detected for "
|
||||||
"tenant %(tenant)s. Attempting clean up for "
|
"tenant %(tenant)s. Attempting clean up for "
|
||||||
|
@ -222,7 +222,7 @@ class TestPolicyEnforcementHook(test_functional.PecanFunctionalTest):
|
|||||||
self.assertNotIn('restricted_attr', json_response['mehs'][0])
|
self.assertNotIn('restricted_attr', json_response['mehs'][0])
|
||||||
|
|
||||||
|
|
||||||
class TestDHCPNotifierHook(test_functional.PecanFunctionalTest):
|
class DHCPNotifierTestBase(test_functional.PecanFunctionalTest):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
# the DHCP notifier needs to be mocked so that correct operations can
|
# the DHCP notifier needs to be mocked so that correct operations can
|
||||||
@ -232,29 +232,42 @@ class TestDHCPNotifierHook(test_functional.PecanFunctionalTest):
|
|||||||
patcher = mock.patch('neutron.api.rpc.agentnotifiers.'
|
patcher = mock.patch('neutron.api.rpc.agentnotifiers.'
|
||||||
'dhcp_rpc_agent_api.DhcpAgentNotifyAPI.notify')
|
'dhcp_rpc_agent_api.DhcpAgentNotifyAPI.notify')
|
||||||
self.mock_notifier = patcher.start()
|
self.mock_notifier = patcher.start()
|
||||||
super(TestDHCPNotifierHook, self).setUp()
|
super(DHCPNotifierTestBase, self).setUp()
|
||||||
|
|
||||||
|
|
||||||
|
class TestDHCPNotifierHookNegative(DHCPNotifierTestBase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
cfg.CONF.set_override('dhcp_agent_notification', False)
|
||||||
|
super(TestDHCPNotifierHookNegative, self).setUp()
|
||||||
|
|
||||||
def test_dhcp_notifications_disabled(self):
|
def test_dhcp_notifications_disabled(self):
|
||||||
cfg.CONF.set_override('dhcp_agent_notification', False)
|
|
||||||
self.app.post_json(
|
self.app.post_json(
|
||||||
'/v2.0/networks.json',
|
'/v2.0/networks.json',
|
||||||
params={'network': {'name': 'meh'}},
|
params={'network': {'name': 'meh'}},
|
||||||
headers={'X-Project-Id': 'tenid'})
|
headers={'X-Project-Id': 'tenid'})
|
||||||
self.assertEqual(0, self.mock_notifier.call_count)
|
self.assertEqual(0, self.mock_notifier.call_count)
|
||||||
|
|
||||||
|
|
||||||
|
class TestDHCPNotifierHook(DHCPNotifierTestBase):
|
||||||
|
|
||||||
def test_get_does_not_trigger_notification(self):
|
def test_get_does_not_trigger_notification(self):
|
||||||
self.do_request('/v2.0/networks', tenant_id='tenid')
|
self.do_request('/v2.0/networks', tenant_id='tenid')
|
||||||
self.assertEqual(0, self.mock_notifier.call_count)
|
self.assertEqual(0, self.mock_notifier.call_count)
|
||||||
|
|
||||||
def test_post_put_delete_triggers_notification(self):
|
def test_post_put_delete_triggers_notification(self):
|
||||||
|
ctx = context.get_admin_context()
|
||||||
|
plugin = manager.NeutronManager.get_plugin()
|
||||||
|
|
||||||
req_headers = {'X-Project-Id': 'tenid', 'X-Roles': 'admin'}
|
req_headers = {'X-Project-Id': 'tenid', 'X-Roles': 'admin'}
|
||||||
response = self.app.post_json(
|
response = self.app.post_json(
|
||||||
'/v2.0/networks.json',
|
'/v2.0/networks.json',
|
||||||
params={'network': {'name': 'meh'}}, headers=req_headers)
|
params={'network': {'name': 'meh'}}, headers=req_headers)
|
||||||
self.assertEqual(201, response.status_int)
|
self.assertEqual(201, response.status_int)
|
||||||
json_body = jsonutils.loads(response.body)
|
json_body = jsonutils.loads(response.body)
|
||||||
|
net = {'network': plugin.get_network(ctx, json_body['network']['id'])}
|
||||||
self.assertEqual(1, self.mock_notifier.call_count)
|
self.assertEqual(1, self.mock_notifier.call_count)
|
||||||
self.assertEqual(mock.call(mock.ANY, json_body, 'network.create.end'),
|
self.assertEqual(mock.call(mock.ANY, net, 'network.create.end'),
|
||||||
self.mock_notifier.mock_calls[-1])
|
self.mock_notifier.mock_calls[-1])
|
||||||
network_id = json_body['network']['id']
|
network_id = json_body['network']['id']
|
||||||
|
|
||||||
@ -264,8 +277,9 @@ class TestDHCPNotifierHook(test_functional.PecanFunctionalTest):
|
|||||||
headers=req_headers)
|
headers=req_headers)
|
||||||
self.assertEqual(200, response.status_int)
|
self.assertEqual(200, response.status_int)
|
||||||
json_body = jsonutils.loads(response.body)
|
json_body = jsonutils.loads(response.body)
|
||||||
|
net = {'network': plugin.get_network(ctx, json_body['network']['id'])}
|
||||||
self.assertEqual(2, self.mock_notifier.call_count)
|
self.assertEqual(2, self.mock_notifier.call_count)
|
||||||
self.assertEqual(mock.call(mock.ANY, json_body, 'network.update.end'),
|
self.assertEqual(mock.call(mock.ANY, net, 'network.update.end'),
|
||||||
self.mock_notifier.mock_calls[-1])
|
self.mock_notifier.mock_calls[-1])
|
||||||
|
|
||||||
response = self.app.delete(
|
response = self.app.delete(
|
||||||
|
@ -19,6 +19,9 @@ import mock
|
|||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
|
|
||||||
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
|
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
|
||||||
|
from neutron.callbacks import events
|
||||||
|
from neutron.callbacks import registry
|
||||||
|
from neutron.callbacks import resources
|
||||||
from neutron.common import utils
|
from neutron.common import utils
|
||||||
from neutron.db import agents_db
|
from neutron.db import agents_db
|
||||||
from neutron.db.agentschedulers_db import cfg
|
from neutron.db.agentschedulers_db import cfg
|
||||||
@ -215,3 +218,23 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
|
|||||||
def test__cast_message(self):
|
def test__cast_message(self):
|
||||||
self.notifier._cast_message(mock.ANY, mock.ANY, mock.ANY)
|
self.notifier._cast_message(mock.ANY, mock.ANY, mock.ANY)
|
||||||
self.assertEqual(1, self.mock_cast.call_count)
|
self.assertEqual(1, self.mock_cast.call_count)
|
||||||
|
|
||||||
|
def test__native_notification_unsubscribes(self):
|
||||||
|
self.assertFalse(self.notifier._unsubscribed_resources)
|
||||||
|
for res in (resources.PORT, resources.NETWORK, resources.SUBNET):
|
||||||
|
self.notifier._unsubscribed_resources = []
|
||||||
|
kwargs = {res: {}}
|
||||||
|
registry.notify(res, events.AFTER_CREATE, self,
|
||||||
|
context=mock.Mock(), **kwargs)
|
||||||
|
# don't unsubscribe until all three types are observed
|
||||||
|
self.assertEqual([], self.notifier._unsubscribed_resources)
|
||||||
|
registry.notify(res, events.AFTER_UPDATE, self,
|
||||||
|
context=mock.Mock(), **kwargs)
|
||||||
|
self.assertEqual([], self.notifier._unsubscribed_resources)
|
||||||
|
registry.notify(res, events.AFTER_DELETE, self,
|
||||||
|
context=mock.Mock(), **kwargs)
|
||||||
|
self.assertEqual([res], self.notifier._unsubscribed_resources)
|
||||||
|
# after first time, no further unsubscribing should happen
|
||||||
|
registry.notify(res, events.AFTER_CREATE, self,
|
||||||
|
context=mock.Mock(), **kwargs)
|
||||||
|
self.assertEqual([res], self.notifier._unsubscribed_resources)
|
||||||
|
@ -1359,7 +1359,7 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn,
|
|||||||
mock.ANY, 'agent_updated',
|
mock.ANY, 'agent_updated',
|
||||||
{'admin_state_up': False}, DHCP_HOSTA)
|
{'admin_state_up': False}, DHCP_HOSTA)
|
||||||
|
|
||||||
def _network_port_create(
|
def _api_network_port_create(
|
||||||
self, hosts, gateway=constants.ATTR_NOT_SPECIFIED, owner=None):
|
self, hosts, gateway=constants.ATTR_NOT_SPECIFIED, owner=None):
|
||||||
for host in hosts:
|
for host in hosts:
|
||||||
helpers.register_dhcp_agent(host)
|
helpers.register_dhcp_agent(host)
|
||||||
@ -1374,6 +1374,22 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn,
|
|||||||
with self.port(subnet=subnet1) as port:
|
with self.port(subnet=subnet1) as port:
|
||||||
return [net1, subnet1, port]
|
return [net1, subnet1, port]
|
||||||
|
|
||||||
|
def _network_port_create(self, *args, **kwargs):
|
||||||
|
net, sub, port = self._api_network_port_create(*args, **kwargs)
|
||||||
|
|
||||||
|
dhcp_notifier = self.plugin.agent_notifiers[constants.AGENT_TYPE_DHCP]
|
||||||
|
if (not hasattr(dhcp_notifier, 'uses_native_notifications') or
|
||||||
|
not all(dhcp_notifier.uses_native_notifications[r]['create']
|
||||||
|
for r in ('port', 'subnet', 'network'))):
|
||||||
|
return net, sub, port
|
||||||
|
# since plugin has native dhcp notifications, the payloads will be the
|
||||||
|
# same as the getter outputs
|
||||||
|
ctx = context.get_admin_context()
|
||||||
|
net['network'] = self.plugin.get_network(ctx, net['network']['id'])
|
||||||
|
sub['subnet'] = self.plugin.get_subnet(ctx, sub['subnet']['id'])
|
||||||
|
port['port'] = self.plugin.get_port(ctx, port['port']['id'])
|
||||||
|
return net, sub, port
|
||||||
|
|
||||||
def _notification_mocks(self, hosts, net, subnet, port):
|
def _notification_mocks(self, hosts, net, subnet, port):
|
||||||
host_calls = {}
|
host_calls = {}
|
||||||
for host in hosts:
|
for host in hosts:
|
||||||
|
@ -780,6 +780,7 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
|
|||||||
self.assertTrue(sg_member_update.called)
|
self.assertTrue(sg_member_update.called)
|
||||||
|
|
||||||
def test_update_port_status_with_network(self):
|
def test_update_port_status_with_network(self):
|
||||||
|
registry.clear() # don't care about callback behavior
|
||||||
ctx = context.get_admin_context()
|
ctx = context.get_admin_context()
|
||||||
plugin = manager.NeutronManager.get_plugin()
|
plugin = manager.NeutronManager.get_plugin()
|
||||||
with self.port() as port:
|
with self.port() as port:
|
||||||
|
@ -20,7 +20,6 @@ from oslo_utils import uuidutils
|
|||||||
|
|
||||||
from neutron.common import exceptions as c_exc
|
from neutron.common import exceptions as c_exc
|
||||||
from neutron import context
|
from neutron import context
|
||||||
from neutron.plugins.common import utils
|
|
||||||
from neutron.services.auto_allocate import db
|
from neutron.services.auto_allocate import db
|
||||||
from neutron.services.auto_allocate import exceptions
|
from neutron.services.auto_allocate import exceptions
|
||||||
from neutron.tests.unit import testlib_api
|
from neutron.tests.unit import testlib_api
|
||||||
@ -160,14 +159,14 @@ class AutoAllocateTestCase(testlib_api.SqlTestCaseLight):
|
|||||||
provisioning_exception)
|
provisioning_exception)
|
||||||
|
|
||||||
def test__save_with_provisioning_error(self):
|
def test__save_with_provisioning_error(self):
|
||||||
with mock.patch.object(utils, "update_network", side_effect=Exception):
|
self.mixin._core_plugin.update_network.side_effect = Exception
|
||||||
with testtools.ExpectedException(
|
with testtools.ExpectedException(
|
||||||
exceptions.UnknownProvisioningError) as e:
|
exceptions.UnknownProvisioningError) as e:
|
||||||
self.mixin._save(self.ctx, 'foo_t', 'foo_n', 'foo_r',
|
self.mixin._save(self.ctx, 'foo_t', 'foo_n', 'foo_r',
|
||||||
[{'id': 'foo_s'}])
|
[{'id': 'foo_s'}])
|
||||||
self.assertEqual('foo_n', e.network_id)
|
self.assertEqual('foo_n', e.network_id)
|
||||||
self.assertEqual('foo_r', e.router_id)
|
self.assertEqual('foo_r', e.router_id)
|
||||||
self.assertEqual([{'id': 'foo_s'}], e.subnets)
|
self.assertEqual([{'id': 'foo_s'}], e.subnets)
|
||||||
|
|
||||||
def test__provision_external_connectivity_with_provisioning_error(self):
|
def test__provision_external_connectivity_with_provisioning_error(self):
|
||||||
self.mixin._l3_plugin.create_router.side_effect = Exception
|
self.mixin._l3_plugin.create_router.side_effect = Exception
|
||||||
|
Loading…
Reference in New Issue
Block a user