Merge "use callback payloads for SUBNET"

This commit is contained in:
Zuul 2021-06-25 12:57:25 +00:00 committed by Gerrit Code Review
commit 7fd6d169f5
9 changed files with 102 additions and 69 deletions

View File

@ -114,7 +114,7 @@ class DhcpAgentNotifyAPI(object):
callback = self._native_event_send_dhcp_notification callback = self._native_event_send_dhcp_notification
# TODO(boden): remove shim below once all events use payloads # TODO(boden): remove shim below once all events use payloads
if resource == resources.NETWORK: if resource in [resources.NETWORK, resources.SUBNET]:
callback = self._native_event_send_dhcp_notification_payload callback = self._native_event_send_dhcp_notification_payload
if resource == resources.PORT: if resource == resources.PORT:
registry.subscribe( registry.subscribe(

View File

@ -500,10 +500,13 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
# cleanup if a network-owned port snuck in without failing # cleanup if a network-owned port snuck in without failing
for subnet in subnets: for subnet in subnets:
self._delete_subnet(context, subnet) self._delete_subnet(context, subnet)
# TODO(ralonsoh): use payloads registry.publish(
registry.notify(resources.SUBNET, events.AFTER_DELETE, resources.SUBNET, events.AFTER_DELETE, self,
self, context=context, subnet=subnet.to_dict(), payload=events.DBEventPayload(
for_net_delete=True) context,
resource_id=subnet['id'],
metadata={'for_net_delete': True},
states=(subnet.to_dict(),)))
with db_api.CONTEXT_WRITER.using(context): with db_api.CONTEXT_WRITER.using(context):
network_db = self._get_network(context, id) network_db = self._get_network(context, id)
network = self._make_network_dict(network_db, context=context) network = self._make_network_dict(network_db, context=context)
@ -942,10 +945,12 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
if gateway_ip: if gateway_ip:
self.ipam.validate_gw_out_of_pools(gateway_ip, pools) self.ipam.validate_gw_out_of_pools(gateway_ip, pools)
kwargs = {'context': context, 'original_subnet': orig, registry.publish(
'request': s} resources.SUBNET, events.BEFORE_UPDATE, self,
registry.notify(resources.SUBNET, events.BEFORE_UPDATE, payload=events.DBEventPayload(
self, **kwargs) context,
resource_id=id,
states=(orig, s,)))
with db_api.CONTEXT_WRITER.using(context): with db_api.CONTEXT_WRITER.using(context):
subnet, changes = self.ipam.update_db_subnet( subnet, changes = self.ipam.update_db_subnet(
@ -996,10 +1001,13 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
if routers: if routers:
self.l3_rpc_notifier.routers_updated(context, routers) self.l3_rpc_notifier.routers_updated(context, routers)
kwargs = {'context': context, 'subnet': result, registry.publish(
'original_subnet': orig} resources.SUBNET, events.AFTER_UPDATE, self,
registry.notify(resources.SUBNET, events.AFTER_UPDATE, self, payload=events.DBEventPayload(
**kwargs) context,
resource_id=result['id'],
states=(orig, result,)))
return result return result
def _subnet_get_user_allocation(self, context, subnet_id): def _subnet_get_user_allocation(self, context, subnet_id):
@ -1053,15 +1061,21 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
payload=events.DBEventPayload(context, resource_id=subnet.id)) payload=events.DBEventPayload(context, resource_id=subnet.id))
self._remove_subnet_ip_allocations_from_ports(context, subnet) self._remove_subnet_ip_allocations_from_ports(context, subnet)
self._delete_subnet(context, subnet) self._delete_subnet(context, subnet)
registry.notify(resources.SUBNET, events.AFTER_DELETE, registry.publish(
self, context=context, subnet=subnet.to_dict()) resources.SUBNET, events.AFTER_DELETE, self,
payload=events.DBEventPayload(
context,
resource_id=id,
states=(subnet.to_dict(),)))
def _delete_subnet(self, context, subnet): def _delete_subnet(self, context, subnet):
with db_api.exc_to_retry(sql_exc.IntegrityError), \ with db_api.exc_to_retry(sql_exc.IntegrityError), \
db_api.CONTEXT_WRITER.using(context): db_api.CONTEXT_WRITER.using(context):
registry.notify(resources.SUBNET, events.PRECOMMIT_DELETE, registry.publish(
self, context=context, subnet_id=subnet.id, resources.SUBNET, events.PRECOMMIT_DELETE, self,
subnet_obj=subnet) payload=events.DBEventPayload(context,
resource_id=subnet.id,
states=(subnet,)))
subnet.delete() subnet.delete()
# Delete related ipam subnet manually, # Delete related ipam subnet manually,
# since there is no FK relationship # since there is no FK relationship

View File

@ -1930,13 +1930,13 @@ class L3RpcNotifierMixin(object):
@staticmethod @staticmethod
@registry.receives(resources.SUBNET, [events.AFTER_UPDATE]) @registry.receives(resources.SUBNET, [events.AFTER_UPDATE])
def _notify_subnet_gateway_ip_update(resource, event, trigger, **kwargs): def _notify_subnet_gateway_ip_update(resource, event, trigger, payload):
l3plugin = directory.get_plugin(plugin_constants.L3) l3plugin = directory.get_plugin(plugin_constants.L3)
if not l3plugin: if not l3plugin:
return return
context = kwargs['context'] context = payload.context
orig = kwargs['original_subnet'] orig = payload.states[0]
updated = kwargs['subnet'] updated = payload.latest_state
if orig['gateway_ip'] == updated['gateway_ip']: if orig['gateway_ip'] == updated['gateway_ip']:
return return
network_id = updated['network_id'] network_id = updated['network_id']

View File

@ -39,7 +39,8 @@ LOG = logging.getLogger(__name__)
class _ObjectChangeHandler(object): class _ObjectChangeHandler(object):
_PAYLOAD_RESOURCES = (resources.NETWORK, _PAYLOAD_RESOURCES = (resources.NETWORK,
resources.ADDRESS_GROUP, resources.ADDRESS_GROUP,
resources.SECURITY_GROUP_RULE,) resources.SECURITY_GROUP_RULE,
resources.SUBNET)
def __init__(self, resource, object_class, resource_push_api): def __init__(self, resource, object_class, resource_push_api):
self._resource = resource self._resource = resource

View File

@ -1267,8 +1267,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def _before_create_subnet(self, context, subnet): def _before_create_subnet(self, context, subnet):
subnet_data = subnet[subnet_def.RESOURCE_NAME] subnet_data = subnet[subnet_def.RESOURCE_NAME]
registry.notify(resources.SUBNET, events.BEFORE_CREATE, self, registry.publish(
context=context, subnet=subnet_data) resources.SUBNET, events.BEFORE_CREATE, self,
payload=events.DBEventPayload(context,
states=(subnet_data,)))
def _create_subnet_db(self, context, subnet): def _create_subnet_db(self, context, subnet):
with db_api.CONTEXT_WRITER.using(context): with db_api.CONTEXT_WRITER.using(context):
@ -1299,8 +1301,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# add network to subnet dict to save a DB call on dhcp notification # add network to subnet dict to save a DB call on dhcp notification
result['network'] = mech_context.network.current result['network'] = mech_context.network.current
kwargs = {'context': context, 'subnet': result} registry.publish(
registry.notify(resources.SUBNET, events.AFTER_CREATE, self, **kwargs) resources.SUBNET, events.AFTER_CREATE, self,
payload=events.DBEventPayload(context,
resource_id=result['id'],
states=(result,)))
try: try:
self.mechanism_manager.create_subnet_postcommit(mech_context) self.mechanism_manager.create_subnet_postcommit(mech_context)
except ml2_exc.MechanismDriverError: except ml2_exc.MechanismDriverError:
@ -1351,8 +1356,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# handler deleting a subresource of the subnet. # handler deleting a subresource of the subnet.
@registry.receives(resources.SUBNET, [events.PRECOMMIT_DELETE], priority=0) @registry.receives(resources.SUBNET, [events.PRECOMMIT_DELETE], priority=0)
def _subnet_delete_precommit_handler(self, rtype, event, trigger, def _subnet_delete_precommit_handler(self, rtype, event, trigger,
context, subnet_id, **kwargs): payload=None):
subnet_obj = (kwargs.get('subnet_obj') or context = payload.context
subnet_id = payload.resource_id
subnet_obj = (payload.latest_state or
self._get_subnet_object(context, subnet_id)) self._get_subnet_object(context, subnet_id))
subnet = self._make_subnet_dict(subnet_obj, context=context) subnet = self._make_subnet_dict(subnet_obj, context=context)
mech_context = driver_context.SubnetContext(self, context, mech_context = driver_context.SubnetContext(self, context,
@ -1364,7 +1371,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@registry.receives(resources.SUBNET, [events.AFTER_DELETE]) @registry.receives(resources.SUBNET, [events.AFTER_DELETE])
def _subnet_delete_after_delete_handler(self, rtype, event, trigger, def _subnet_delete_after_delete_handler(self, rtype, event, trigger,
context, subnet, **kwargs): payload):
context = payload.context
try: try:
self.mechanism_manager.delete_subnet_postcommit( self.mechanism_manager.delete_subnet_postcommit(
context._mech_context) context._mech_context)

View File

@ -440,13 +440,13 @@ class OVNL3RouterPlugin(service_base.ServicePluginBase,
@staticmethod @staticmethod
@registry.receives(resources.SUBNET, [events.AFTER_UPDATE]) @registry.receives(resources.SUBNET, [events.AFTER_UPDATE])
def _subnet_update(resource, event, trigger, **kwargs): def _subnet_update(resource, event, trigger, payload):
l3plugin = directory.get_plugin(plugin_constants.L3) l3plugin = directory.get_plugin(plugin_constants.L3)
if not l3plugin: if not l3plugin:
return return
context = kwargs['context'] context = payload.context
orig = kwargs['original_subnet'] orig = payload.states[0]
current = kwargs['subnet'] current = payload.latest_state
orig_gw_ip = orig['gateway_ip'] orig_gw_ip = orig['gateway_ip']
current_gw_ip = current['gateway_ip'] current_gw_ip = current['gateway_ip']
if orig_gw_ip == current_gw_ip: if orig_gw_ip == current_gw_ip:

View File

@ -225,8 +225,9 @@ class NovaSegmentNotifier(object):
segment_host_mappings=segment_host_mappings)) segment_host_mappings=segment_host_mappings))
@registry.receives(resources.SUBNET, [events.AFTER_CREATE]) @registry.receives(resources.SUBNET, [events.AFTER_CREATE])
def _notify_subnet_created(self, resource, event, trigger, context, def _notify_subnet_created(self, resource, event, trigger, payload):
subnet, **kwargs): context = payload.context
subnet = payload.latest_state
segment_id = subnet.get('segment_id') segment_id = subnet.get('segment_id')
if not segment_id or subnet['ip_version'] != constants.IP_VERSION_4: if not segment_id or subnet['ip_version'] != constants.IP_VERSION_4:
return return
@ -304,8 +305,10 @@ class NovaSegmentNotifier(object):
return total, reserved return total, reserved
@registry.receives(resources.SUBNET, [events.AFTER_UPDATE]) @registry.receives(resources.SUBNET, [events.AFTER_UPDATE])
def _notify_subnet_updated(self, resource, event, trigger, context, def _notify_subnet_updated(self, resource, event, trigger, payload):
subnet, original_subnet, **kwargs): context = payload.context
original_subnet = payload.states[0]
subnet = payload.latest_state
segment_id = subnet.get('segment_id') segment_id = subnet.get('segment_id')
original_segment_id = original_subnet.get('segment_id') original_segment_id = original_subnet.get('segment_id')
if not segment_id or subnet['ip_version'] != constants.IP_VERSION_4: if not segment_id or subnet['ip_version'] != constants.IP_VERSION_4:
@ -341,9 +344,10 @@ class NovaSegmentNotifier(object):
segment_host_mappings=segment_host_mappings)) segment_host_mappings=segment_host_mappings))
@registry.receives(resources.SUBNET, [events.AFTER_DELETE]) @registry.receives(resources.SUBNET, [events.AFTER_DELETE])
def _notify_subnet_deleted(self, resource, event, trigger, context, def _notify_subnet_deleted(self, resource, event, trigger, payload):
subnet, **kwargs): context = payload.context
if kwargs.get(db.FOR_NET_DELETE): subnet = payload.latest_state
if payload.metadata.get(db.FOR_NET_DELETE):
return # skip segment RP update if it is going to be deleted return # skip segment RP update if it is going to be deleted
segment_id = subnet.get('segment_id') segment_id = subnet.get('segment_id')
@ -613,8 +617,10 @@ class SegmentHostRoutes(object):
'host_routes': calc_host_routes}}) 'host_routes': calc_host_routes}})
@registry.receives(resources.SUBNET, [events.BEFORE_CREATE]) @registry.receives(resources.SUBNET, [events.BEFORE_CREATE])
def host_routes_before_create(self, resource, event, trigger, context, def host_routes_before_create(self, resource, event, trigger,
subnet, **kwargs): payload):
context = payload.context
subnet = payload.latest_state
segment_id = subnet.get('segment_id') segment_id = subnet.get('segment_id')
gateway_ip = subnet.get('gateway_ip') gateway_ip = subnet.get('gateway_ip')
if validators.is_attr_set(subnet.get('host_routes')): if validators.is_attr_set(subnet.get('host_routes')):
@ -635,9 +641,11 @@ class SegmentHostRoutes(object):
subnet['host_routes'] = calc_host_routes subnet['host_routes'] = calc_host_routes
@registry.receives(resources.SUBNET, [events.BEFORE_UPDATE]) @registry.receives(resources.SUBNET, [events.BEFORE_UPDATE])
def host_routes_before_update(self, resource, event, trigger, **kwargs): def host_routes_before_update(self, resource, event, trigger,
context = kwargs['context'] payload):
subnet, original_subnet = kwargs['request'], kwargs['original_subnet'] context = payload.context
original_subnet = payload.states[0]
subnet = payload.latest_state
orig_segment_id = original_subnet.get('segment_id') orig_segment_id = original_subnet.get('segment_id')
segment_id = subnet.get('segment_id', orig_segment_id) segment_id = subnet.get('segment_id', orig_segment_id)
orig_gateway_ip = original_subnet.get('gateway_ip') orig_gateway_ip = original_subnet.get('gateway_ip')
@ -659,9 +667,10 @@ class SegmentHostRoutes(object):
subnet['host_routes'] = calc_host_routes subnet['host_routes'] = calc_host_routes
@registry.receives(resources.SUBNET, [events.AFTER_CREATE]) @registry.receives(resources.SUBNET, [events.AFTER_CREATE])
def host_routes_after_create(self, resource, event, trigger, **kwargs): def host_routes_after_create(self, resource, event, trigger,
context = kwargs['context'] payload):
subnet = kwargs['subnet'] context = payload.context
subnet = payload.latest_state
# If there are other subnets on the network and subnet has segment_id # If there are other subnets on the network and subnet has segment_id
# ensure host routes for all subnets are updated. # ensure host routes for all subnets are updated.
@ -671,11 +680,13 @@ class SegmentHostRoutes(object):
subnet['network_id']) subnet['network_id'])
@registry.receives(resources.SUBNET, [events.AFTER_DELETE]) @registry.receives(resources.SUBNET, [events.AFTER_DELETE])
def host_routes_after_delete(self, resource, event, trigger, context, def host_routes_after_delete(self, resource, event, trigger,
subnet, **kwargs): payload):
# If this is a routed network, remove any routes to this subnet on # If this is a routed network, remove any routes to this subnet on
# this networks remaining subnets. # this networks remaining subnets.
if kwargs.get(db.FOR_NET_DELETE): context = payload.context
subnet = payload.latest_state
if payload.metadata.get(db.FOR_NET_DELETE):
return # skip subnet update if the network is going to be deleted return # skip subnet update if the network is going to be deleted
if subnet.get('segment_id'): if subnet.get('segment_id'):

View File

@ -267,7 +267,7 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
def test__native_notification_unsubscribes(self): def test__native_notification_unsubscribes(self):
self.assertFalse(self.notifier._unsubscribed_resources) self.assertFalse(self.notifier._unsubscribed_resources)
for res in (resources.PORT, resources.SUBNET): for res in (resources.PORT,):
self.notifier._unsubscribed_resources = [] self.notifier._unsubscribed_resources = []
kwargs = {res: {}} kwargs = {res: {}}
if res == resources.PORT: if res == resources.PORT:
@ -295,7 +295,7 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
context=mock.Mock(), **kwargs) context=mock.Mock(), **kwargs)
self.assertEqual([res], self.notifier._unsubscribed_resources) self.assertEqual([res], self.notifier._unsubscribed_resources)
for res in [resources.NETWORK]: for res in (resources.NETWORK, resources.SUBNET):
self.notifier._unsubscribed_resources = [] self.notifier._unsubscribed_resources = []
registry.publish(res, events.AFTER_CREATE, self, registry.publish(res, events.AFTER_CREATE, self,
payload=events.DBEventPayload(mock.Mock())) payload=events.DBEventPayload(mock.Mock()))

View File

@ -709,11 +709,11 @@ class TestMl2SubnetsV2(test_plugin.TestSubnetsV2,
with self.subnet() as s: with self.subnet() as s:
before_create.assert_called_once_with( before_create.assert_called_once_with(
resources.SUBNET, events.BEFORE_CREATE, mock.ANY, resources.SUBNET, events.BEFORE_CREATE, mock.ANY,
context=mock.ANY, subnet=mock.ANY) payload=mock.ANY)
kwargs = before_create.mock_calls[0][2] payload = before_create.mock_calls[0][2]['payload']
self.assertEqual(s['subnet']['cidr'], kwargs['subnet']['cidr']) self.assertEqual(s['subnet']['cidr'], payload.latest_state['cidr'])
self.assertEqual(s['subnet']['network_id'], self.assertEqual(s['subnet']['network_id'],
kwargs['subnet']['network_id']) payload.latest_state['network_id'])
def test_subnet_after_create_callback(self): def test_subnet_after_create_callback(self):
after_create = mock.Mock() after_create = mock.Mock()
@ -721,9 +721,9 @@ class TestMl2SubnetsV2(test_plugin.TestSubnetsV2,
with self.subnet() as s: with self.subnet() as s:
after_create.assert_called_once_with( after_create.assert_called_once_with(
resources.SUBNET, events.AFTER_CREATE, mock.ANY, resources.SUBNET, events.AFTER_CREATE, mock.ANY,
context=mock.ANY, subnet=mock.ANY) payload=mock.ANY)
kwargs = after_create.mock_calls[0][2] payload = after_create.mock_calls[0][2]['payload']
self.assertEqual(s['subnet']['id'], kwargs['subnet']['id']) self.assertEqual(s['subnet']['id'], payload.resource_id)
def test_port_create_subnetnotfound(self): def test_port_create_subnetnotfound(self):
with self.network() as n: with self.network() as n:
@ -779,12 +779,11 @@ class TestMl2SubnetsV2(test_plugin.TestSubnetsV2,
self.deserialize(self.fmt, req.get_response(self.api)) self.deserialize(self.fmt, req.get_response(self.api))
after_update.assert_called_once_with( after_update.assert_called_once_with(
resources.SUBNET, events.AFTER_UPDATE, mock.ANY, resources.SUBNET, events.AFTER_UPDATE, mock.ANY,
context=mock.ANY, subnet=mock.ANY, payload=mock.ANY)
original_subnet=mock.ANY) payload = after_update.mock_calls[0][2]['payload']
kwargs = after_update.mock_calls[0][2]
self.assertEqual(s['subnet']['name'], self.assertEqual(s['subnet']['name'],
kwargs['original_subnet']['name']) payload.states[0]['name'])
self.assertEqual('updated', kwargs['subnet']['name']) self.assertEqual('updated', payload.latest_state['name'])
def test_subnet_after_delete_callback(self): def test_subnet_after_delete_callback(self):
after_delete = mock.Mock() after_delete = mock.Mock()
@ -794,9 +793,9 @@ class TestMl2SubnetsV2(test_plugin.TestSubnetsV2,
req.get_response(self.api) req.get_response(self.api)
after_delete.assert_called_once_with( after_delete.assert_called_once_with(
resources.SUBNET, events.AFTER_DELETE, mock.ANY, resources.SUBNET, events.AFTER_DELETE, mock.ANY,
context=mock.ANY, subnet=mock.ANY) payload=mock.ANY)
kwargs = after_delete.mock_calls[0][2] payload = after_delete.mock_calls[0][2]['payload']
self.assertEqual(s['subnet']['id'], kwargs['subnet']['id']) self.assertEqual(s['subnet']['id'], payload.latest_state['id'])
def test_delete_subnet_race_with_dhcp_port_creation(self): def test_delete_subnet_race_with_dhcp_port_creation(self):
with self.network() as network: with self.network() as network: