Add extension_manager and support for extensions in linuxbridge agent

There is extensions mechanism for l2 agents already but it was
implemented only for openvswitch l2 agent. This patch adds support for
such extensions also for linuxbridge agent.

This patch also adds support for network_update events received by the
agent via RPC. It is required because sometimes when a network is
updated (for example with a QoS policy is attached to it) all ports that
belong to the network should also be updated.

Change-Id: Ie81c818d0eb817b044a6df1cbddc5864f118fe3f
Partial-bug: 1468803
This commit is contained in:
Sławek Kapłoński 2015-11-26 23:31:11 +01:00 committed by Ihar Hrachyshka
parent 1f8e58f538
commit 23e0e29a2b
4 changed files with 221 additions and 24 deletions

View File

@ -19,3 +19,5 @@ LOCAL_VLAN_ID = -2
VXLAN_NONE = 'not_supported'
VXLAN_MCAST = 'multicast_flooding'
VXLAN_UCAST = 'unicast_flooding'
EXTENSION_DRIVER_TYPE = 'linuxbridge'

View File

@ -19,6 +19,7 @@
# Based on the structure of the OpenVSwitch agent in the
# Neutron OpenVSwitch Plugin.
import collections
import sys
import time
@ -35,6 +36,7 @@ from oslo_utils import excutils
from six import moves
from neutron._i18n import _LE, _LI, _LW
from neutron.agent.l2.extensions import manager as ext_manager
from neutron.agent.linux import bridge_lib
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
@ -670,7 +672,8 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# history
# 1.1 Support Security Group RPC
# 1.3 Added param devices_to_update to security_groups_provider_updated
target = oslo_messaging.Target(version='1.3')
# 1.4 Added support for network_update
target = oslo_messaging.Target(version='1.4')
def __init__(self, context, agent, sg_agent):
super(LinuxBridgeRpcCallbacks, self).__init__()
@ -708,6 +711,15 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.agent.updated_devices.add(tap_name)
LOG.debug("port_update RPC received for port: %s", port_id)
def network_update(self, context, **kwargs):
network_id = kwargs['network']['id']
LOG.debug("network_update message processed for network "
"%(network_id)s, with ports: %(ports)s",
{'network_id': network_id,
'ports': self.agent.network_ports[network_id]})
for port_data in self.agent.network_ports[network_id]:
self.agent.updated_devices.add(port_data['device'])
def fdb_add(self, context, fdb_entries):
LOG.debug("fdb_add received")
for network_id, values in fdb_entries.items():
@ -810,8 +822,24 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
def start(self):
self.prevent_arp_spoofing = cfg.CONF.AGENT.prevent_arp_spoofing
self.setup_linux_bridge(self.bridge_mappings, self.interface_mappings)
configurations = {'bridge_mappings': self.bridge_mappings,
'interface_mappings': self.interface_mappings}
# stores received port_updates and port_deletes for
# processing by the main loop
self.updated_devices = set()
# stores all configured ports on agent
self.network_ports = collections.defaultdict(list)
# flag to do a sync after revival
self.fullsync = False
self.context = context.get_admin_context_without_session()
self.setup_rpc(self.interface_mappings.values())
self.init_extension_manager(self.connection)
configurations = {
'bridge_mappings': self.bridge_mappings,
'interface_mappings': self.interface_mappings,
'extensions': self.ext_manager.names()
}
if self.br_mgr.vxlan_mode != lconst.VXLAN_NONE:
configurations['tunneling_ip'] = self.br_mgr.local_ip
configurations['tunnel_types'] = [p_const.TYPE_VXLAN]
@ -824,16 +852,11 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
'agent_type': constants.AGENT_TYPE_LINUXBRIDGE,
'start_flag': True}
# stores received port_updates for processing by the main loop
self.updated_devices = set()
# flag to do a sync after revival
self.fullsync = False
self.context = context.get_admin_context_without_session()
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
self.sg_agent = sg_rpc.SecurityGroupAgentRpc(self.context,
self.sg_plugin_rpc, defer_refresh_firewall=True)
self.setup_rpc(self.interface_mappings.values())
report_interval = cfg.CONF.AGENT.report_interval
if report_interval:
heartbeat = loopingcall.FixedIntervalLoopingCall(
self._report_state)
heartbeat.start(interval=report_interval)
self.daemon_loop()
def stop(self, graceful=True):
@ -871,6 +894,12 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
LOG.error(_LE("Unable to obtain MAC address for unique ID. "
"Agent terminated!"))
exit(1)
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
self.sg_agent = sg_rpc.SecurityGroupAgentRpc(
self.context, self.sg_plugin_rpc, defer_refresh_firewall=True)
self.agent_id = '%s%s' % ('lb', (mac.replace(":", "")))
LOG.info(_LI("RPC agent_id: %s"), self.agent_id)
@ -883,17 +912,21 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE],
[topics.NETWORK, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
if cfg.CONF.VXLAN.l2_population:
consumers.append([topics.L2POPULATION, topics.UPDATE])
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
report_interval = cfg.CONF.AGENT.report_interval
if report_interval:
heartbeat = loopingcall.FixedIntervalLoopingCall(
self._report_state)
heartbeat.start(interval=report_interval)
def init_extension_manager(self, connection):
ext_manager.register_opts(cfg.CONF)
self.ext_manager = (
ext_manager.AgentExtensionsManager(cfg.CONF))
self.ext_manager.initialize(
connection, lconst.EXTENSION_DRIVER_TYPE)
def setup_linux_bridge(self, bridge_mappings, interface_mappings):
self.br_mgr = LinuxBridgeManager(bridge_mappings, interface_mappings)
@ -907,6 +940,22 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
else:
ip_lib.IPDevice(tap_name).link.set_down()
def _clean_network_ports(self, device):
for netid, ports_list in self.network_ports.items():
for port_data in ports_list:
if device == port_data['device']:
ports_list.remove(port_data)
if ports_list == []:
self.network_ports.pop(netid)
return port_data['port_id']
def _update_network_ports(self, network_id, port_id, device):
self._clean_network_ports(device)
self.network_ports[network_id].append({
"port_id": port_id,
"device": device
})
def process_network_devices(self, device_info):
resync_a = False
resync_b = False
@ -1006,6 +1055,10 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
device,
self.agent_id,
cfg.CONF.host)
self._update_network_ports(device_details['network_id'],
device_details['port_id'],
device_details['device'])
self.ext_manager.handle_port(self.context, device_details)
else:
LOG.info(_LI("Device %s not defined on plugin"), device)
return False
@ -1029,6 +1082,10 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
LOG.info(_LI("Port %s updated."), device)
else:
LOG.debug("Device %s not defined on plugin", device)
port_id = self._clean_network_ports(device)
self.ext_manager.delete_port(self.context,
{'device': device,
'port_id': port_id})
if self.prevent_arp_spoofing:
arp_protect.delete_arp_spoofing_protection(devices)
return resync

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import sys
import mock
@ -30,12 +31,18 @@ from neutron.plugins.ml2.drivers.linuxbridge.agent \
from neutron.tests import base
LOCAL_IP = '192.168.0.33'
PORT_1 = 'abcdef01-12ddssdfds-fdsfsd'
DEVICE_1 = 'tapabcdef01-12'
NETWORK_ID = '57653b20-ed5b-4ed0-a31d-06f84e3fd909'
BRIDGE_MAPPING_VALUE = 'br-eth2'
BRIDGE_MAPPINGS = {'physnet0': BRIDGE_MAPPING_VALUE}
INTERFACE_MAPPINGS = {'physnet1': 'eth1'}
FAKE_DEFAULT_DEV = mock.Mock()
FAKE_DEFAULT_DEV.name = 'eth1'
PORT_DATA = {
"port_id": PORT_1,
"device": DEVICE_1
}
class FakeIpLinkCommand(object):
@ -128,10 +135,13 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
agent = self.agent
agent._ensure_port_admin_state = mock.Mock()
devices = [DEVICE_1]
agent.network_ports[NETWORK_ID].append(PORT_DATA)
with mock.patch.object(agent.plugin_rpc,
"update_device_down") as fn_udd,\
mock.patch.object(agent.sg_agent,
"remove_devices_filter") as fn_rdf:
"remove_devices_filter") as fn_rdf,\
mock.patch.object(agent.ext_manager,
"delete_port") as ext_mgr_delete_port:
fn_udd.return_value = {'device': DEVICE_1,
'exists': True}
with mock.patch.object(linuxbridge_neutron_agent.LOG,
@ -141,14 +151,21 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
self.assertFalse(resync)
self.assertTrue(fn_udd.called)
self.assertTrue(fn_rdf.called)
self.assertTrue(ext_mgr_delete_port.called)
self.assertTrue(
PORT_DATA not in agent.network_ports[NETWORK_ID]
)
def test_treat_devices_removed_with_not_existed_device(self):
agent = self.agent
devices = [DEVICE_1]
agent.network_ports[NETWORK_ID].append(PORT_DATA)
with mock.patch.object(agent.plugin_rpc,
"update_device_down") as fn_udd,\
mock.patch.object(agent.sg_agent,
"remove_devices_filter") as fn_rdf:
"remove_devices_filter") as fn_rdf,\
mock.patch.object(agent.ext_manager,
"delete_port") as ext_mgr_delete_port:
fn_udd.return_value = {'device': DEVICE_1,
'exists': False}
with mock.patch.object(linuxbridge_neutron_agent.LOG,
@ -158,19 +175,30 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
self.assertFalse(resync)
self.assertTrue(fn_udd.called)
self.assertTrue(fn_rdf.called)
self.assertTrue(ext_mgr_delete_port.called)
self.assertTrue(
PORT_DATA not in agent.network_ports[NETWORK_ID]
)
def test_treat_devices_removed_failed(self):
agent = self.agent
devices = [DEVICE_1]
agent.network_ports[NETWORK_ID].append(PORT_DATA)
with mock.patch.object(agent.plugin_rpc,
"update_device_down") as fn_udd,\
mock.patch.object(agent.sg_agent,
"remove_devices_filter") as fn_rdf:
"remove_devices_filter") as fn_rdf,\
mock.patch.object(agent.ext_manager,
"delete_port") as ext_mgr_delete_port:
fn_udd.side_effect = Exception()
resync = agent.treat_devices_removed(devices)
self.assertTrue(resync)
self.assertTrue(fn_udd.called)
self.assertTrue(fn_rdf.called)
self.assertTrue(ext_mgr_delete_port.called)
self.assertTrue(
PORT_DATA not in agent.network_ports[NETWORK_ID]
)
def _test_scan_devices(self, previous, updated,
fake_current, expected, sync):
@ -273,6 +301,27 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
def test_scan_devices_updated_deleted_concurrently(self):
previous = {
'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()
}
# Device 2 disappeared.
fake_current = set([1])
# Device 2 got an concurrent update via network_update
updated = set([2])
expected = {
'current': set([1]),
'updated': set(),
'added': set(),
'removed': set([2])
}
self._test_scan_devices(
previous, updated, fake_current, expected, sync=False
)
def test_scan_devices_updated_on_sync(self):
previous = {'current': set([1, 2]),
'updated': set([1]),
@ -318,6 +367,11 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
'segmentation_id': 100,
'physical_network': 'physnet1',
'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX}
mock_port_data = {
'port_id': mock_details['port_id'],
'device': mock_details['device']
}
agent.ext_manager = mock.Mock()
agent.plugin_rpc = mock.Mock()
agent.plugin_rpc.get_devices_details_list.return_value = [mock_details]
agent.br_mgr = mock.Mock()
@ -331,6 +385,10 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
100, 'port123',
constants.DEVICE_OWNER_NETWORK_PREFIX)
self.assertTrue(agent.plugin_rpc.update_device_up.called)
self.assertTrue(agent.ext_manager.handle_port.called)
self.assertTrue(
mock_port_data in agent.network_ports[mock_details['network_id']]
)
def test_set_rpc_timeout(self):
self.agent.stop()
@ -370,6 +428,63 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
def test_ensure_port_admin_state_down(self):
self._test_ensure_port_admin_state(False)
def test_update_network_ports(self):
port_1_data = PORT_DATA
NETWORK_2_ID = 'fake_second_network'
port_2_data = {
'port_id': 'fake_port_2',
'device': 'fake_port_2_device_name'
}
self.agent.network_ports[NETWORK_ID].append(
port_1_data
)
self.agent.network_ports[NETWORK_ID].append(
port_2_data
)
#check update port:
self.agent._update_network_ports(
NETWORK_2_ID, port_2_data['port_id'], port_2_data['device']
)
self.assertTrue(
port_2_data not in self.agent.network_ports[NETWORK_ID]
)
self.assertTrue(
port_2_data in self.agent.network_ports[NETWORK_2_ID]
)
def test_clean_network_ports(self):
port_1_data = PORT_DATA
port_2_data = {
'port_id': 'fake_port_2',
'device': 'fake_port_2_device_name'
}
self.agent.network_ports[NETWORK_ID].append(
port_1_data
)
self.agent.network_ports[NETWORK_ID].append(
port_2_data
)
#check removing port from network when other ports are still there:
cleaned_port_id = self.agent._clean_network_ports(DEVICE_1)
self.assertTrue(
NETWORK_ID in self.agent.network_ports.keys()
)
self.assertTrue(
port_1_data not in self.agent.network_ports[NETWORK_ID]
)
self.assertTrue(
port_2_data in self.agent.network_ports[NETWORK_ID]
)
self.assertEqual(cleaned_port_id, PORT_1)
#and now remove last port from network:
cleaned_port_id = self.agent._clean_network_ports(
port_2_data['device']
)
self.assertTrue(
NETWORK_ID not in self.agent.network_ports.keys()
)
self.assertEqual(cleaned_port_id, port_2_data['port_id'])
class TestLinuxBridgeManager(base.BaseTestCase):
def setUp(self):
@ -1048,6 +1163,8 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase):
segment.network_type = 'vxlan'
segment.segmentation_id = 1
self.br_mgr.network_map['net_id'] = segment
self.updated_devices = set()
self.network_ports = collections.defaultdict(list)
self.lb_rpc = linuxbridge_neutron_agent.LinuxBridgeRpcCallbacks(
object(),
@ -1059,17 +1176,30 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase):
mock_net = mock.Mock()
mock_net.physical_network = None
self.lb_rpc.agent.br_mgr.network_map = {'123': mock_net}
self.lb_rpc.agent.br_mgr.network_map = {NETWORK_ID: mock_net}
with mock.patch.object(self.lb_rpc.agent.br_mgr,
"get_bridge_name") as get_br_fn,\
mock.patch.object(self.lb_rpc.agent.br_mgr,
"delete_bridge") as del_fn:
get_br_fn.return_value = "br0"
self.lb_rpc.network_delete("anycontext", network_id="123")
get_br_fn.assert_called_with("123")
self.lb_rpc.network_delete("anycontext", network_id=NETWORK_ID)
get_br_fn.assert_called_with(NETWORK_ID)
del_fn.assert_called_with("br0")
def test_port_update(self):
port = {'id': PORT_1}
self.lb_rpc.port_update(context=None, port=port)
self.assertEqual(set([DEVICE_1]), self.lb_rpc.agent.updated_devices)
def test_network_update(self):
updated_network = {'id': NETWORK_ID}
self.lb_rpc.agent.network_ports = {
NETWORK_ID: [PORT_DATA]
}
self.lb_rpc.network_update(context=None, network=updated_network)
self.assertEqual(set([DEVICE_1]), self.lb_rpc.agent.updated_devices)
def test_network_delete_with_existed_brq(self):
mock_net = mock.Mock()
mock_net.physical_network = 'physnet0'

View File

@ -0,0 +1,8 @@
---
prelude: >
The Linuxbridge agent now supports l2 agent extensions.
features:
- The Linuxbridge agent can now be extended by 3rd parties using a pluggable
mechanism.
fixes:
- partially closes bug 1468803