Process conntrack updates in worker threads
With a large number of instances and/or security group rules, conntrack updates when ports are removed or rules are changed can take a long time to process. By enqueuing these to a set or worker threads, the agent can continue with other work while they are processed in the background. This is a change in behavior in the agent since it could program a new set of security group rules before all existing conntrack entries are deleted, but since the iptables or OVSfw NAT rules will have been removed, it should not pose a security issue. Change-Id: Ibf858c7fdf7a822a30e4a0c4722d70fd272741b6 Closes-bug: #1745468
This commit is contained in:
parent
c7e144e9b4
commit
65a81623fc
@ -13,9 +13,11 @@
|
|||||||
|
|
||||||
import re
|
import re
|
||||||
|
|
||||||
|
import eventlet
|
||||||
import netaddr
|
import netaddr
|
||||||
from oslo_concurrency import lockutils
|
from oslo_concurrency import lockutils
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
from six.moves import queue as Queue
|
||||||
|
|
||||||
from neutron.agent.linux import utils as linux_utils
|
from neutron.agent.linux import utils as linux_utils
|
||||||
from neutron.common import constants as n_const
|
from neutron.common import constants as n_const
|
||||||
@ -27,6 +29,33 @@ MAX_CONNTRACK_ZONES = 65535
|
|||||||
ZONE_START = 4097
|
ZONE_START = 4097
|
||||||
|
|
||||||
|
|
||||||
|
class IpConntrackUpdate(object):
|
||||||
|
"""Encapsulates a conntrack update
|
||||||
|
|
||||||
|
An instance of this object carries the information necessary to
|
||||||
|
process a request to update the conntrack table.
|
||||||
|
"""
|
||||||
|
def __init__(self, device_info_list, rule, remote_ips):
|
||||||
|
self.device_info_list = device_info_list
|
||||||
|
self.rule = rule
|
||||||
|
self.remote_ips = remote_ips
|
||||||
|
|
||||||
|
|
||||||
|
class IpConntrackProcessingQueue(object):
|
||||||
|
"""Manager of the queue of conntrack updates to process."""
|
||||||
|
def __init__(self):
|
||||||
|
self._queue = Queue.Queue()
|
||||||
|
|
||||||
|
def add(self, update):
|
||||||
|
self._queue.put(update)
|
||||||
|
|
||||||
|
def updates(self):
|
||||||
|
"""Grabs the next conntrack update from the queue and processes."""
|
||||||
|
while not self._queue.empty():
|
||||||
|
update = self._queue.get()
|
||||||
|
yield update
|
||||||
|
|
||||||
|
|
||||||
@lockutils.synchronized('conntrack')
|
@lockutils.synchronized('conntrack')
|
||||||
def get_conntrack(get_rules_for_table_func, filtered_ports, unfiltered_ports,
|
def get_conntrack(get_rules_for_table_func, filtered_ports, unfiltered_ports,
|
||||||
execute=None, namespace=None, zone_per_port=False):
|
execute=None, namespace=None, zone_per_port=False):
|
||||||
@ -53,6 +82,32 @@ class IpConntrackManager(object):
|
|||||||
self.unfiltered_ports = unfiltered_ports
|
self.unfiltered_ports = unfiltered_ports
|
||||||
self.zone_per_port = zone_per_port # zone per port vs per network
|
self.zone_per_port = zone_per_port # zone per port vs per network
|
||||||
self._populate_initial_zone_map()
|
self._populate_initial_zone_map()
|
||||||
|
self._queue = IpConntrackProcessingQueue()
|
||||||
|
self.start_process_queue()
|
||||||
|
|
||||||
|
def start_process_queue(self):
|
||||||
|
eventlet.spawn_n(self._process_queue_loop)
|
||||||
|
|
||||||
|
def _process_queue_loop(self):
|
||||||
|
LOG.debug("Starting ipconntrack _process_queue_loop()")
|
||||||
|
pool = eventlet.GreenPool(size=8)
|
||||||
|
while True:
|
||||||
|
pool.spawn_n(self._process_queue)
|
||||||
|
|
||||||
|
def _process_queue(self):
|
||||||
|
for update in self._queue.updates():
|
||||||
|
if update.remote_ips:
|
||||||
|
for remote_ip in update.remote_ips:
|
||||||
|
self._delete_conntrack_state(
|
||||||
|
update.device_info_list, update.rule, remote_ip)
|
||||||
|
else:
|
||||||
|
self._delete_conntrack_state(update.device_info_list,
|
||||||
|
update.rule)
|
||||||
|
|
||||||
|
def _process(self, device_info_list, rule, remote_ips=None):
|
||||||
|
# queue the update to allow the caller to resume its work
|
||||||
|
update = IpConntrackUpdate(device_info_list, rule, remote_ips)
|
||||||
|
self._queue.add(update)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _generate_conntrack_cmd_by_rule(rule, namespace):
|
def _generate_conntrack_cmd_by_rule(rule, namespace):
|
||||||
@ -110,19 +165,14 @@ class IpConntrackManager(object):
|
|||||||
LOG.exception("Failed execute conntrack command %s", cmd)
|
LOG.exception("Failed execute conntrack command %s", cmd)
|
||||||
|
|
||||||
def delete_conntrack_state_by_rule(self, device_info_list, rule):
|
def delete_conntrack_state_by_rule(self, device_info_list, rule):
|
||||||
self._delete_conntrack_state(device_info_list, rule)
|
self._process(device_info_list, rule)
|
||||||
|
|
||||||
def delete_conntrack_state_by_remote_ips(self, device_info_list,
|
def delete_conntrack_state_by_remote_ips(self, device_info_list,
|
||||||
ethertype, remote_ips):
|
ethertype, remote_ips):
|
||||||
for direction in ['ingress', 'egress']:
|
for direction in ['ingress', 'egress']:
|
||||||
rule = {'ethertype': str(ethertype).lower(),
|
rule = {'ethertype': str(ethertype).lower(),
|
||||||
'direction': direction}
|
'direction': direction}
|
||||||
if remote_ips:
|
self._process(device_info_list, rule, remote_ips)
|
||||||
for remote_ip in remote_ips:
|
|
||||||
self._delete_conntrack_state(
|
|
||||||
device_info_list, rule, remote_ip)
|
|
||||||
else:
|
|
||||||
self._delete_conntrack_state(device_info_list, rule)
|
|
||||||
|
|
||||||
def _populate_initial_zone_map(self):
|
def _populate_initial_zone_map(self):
|
||||||
"""Setup the map between devices and zones based on current rules."""
|
"""Setup the map between devices and zones based on current rules."""
|
||||||
|
@ -70,6 +70,7 @@ COMMIT
|
|||||||
class BaseIptablesFirewallTestCase(base.BaseTestCase):
|
class BaseIptablesFirewallTestCase(base.BaseTestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(BaseIptablesFirewallTestCase, self).setUp()
|
super(BaseIptablesFirewallTestCase, self).setUp()
|
||||||
|
mock.patch('eventlet.spawn_n').start()
|
||||||
security_config.register_securitygroups_opts()
|
security_config.register_securitygroups_opts()
|
||||||
agent_config.register_root_helper(cfg.CONF)
|
agent_config.register_root_helper(cfg.CONF)
|
||||||
cfg.CONF.set_override('comment_iptables_rules', False, 'AGENT')
|
cfg.CONF.set_override('comment_iptables_rules', False, 'AGENT')
|
||||||
@ -1247,6 +1248,8 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase):
|
|||||||
if not ct_zone:
|
if not ct_zone:
|
||||||
self.assertFalse(self.utils_exec.called)
|
self.assertFalse(self.utils_exec.called)
|
||||||
return
|
return
|
||||||
|
# process conntrack updates in the queue
|
||||||
|
self.firewall.ipconntrack._process_queue()
|
||||||
cmd = ['conntrack', '-D']
|
cmd = ['conntrack', '-D']
|
||||||
if protocol:
|
if protocol:
|
||||||
cmd.extend(['-p', protocol])
|
cmd.extend(['-p', protocol])
|
||||||
@ -1335,6 +1338,8 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase):
|
|||||||
if not ct_zone:
|
if not ct_zone:
|
||||||
self.assertFalse(self.utils_exec.called)
|
self.assertFalse(self.utils_exec.called)
|
||||||
return
|
return
|
||||||
|
# process conntrack updates in the queue
|
||||||
|
self.firewall.ipconntrack._process_queue()
|
||||||
calls = self._get_expected_conntrack_calls(
|
calls = self._get_expected_conntrack_calls(
|
||||||
[('ipv4', '10.0.0.1'), ('ipv6', 'fe80::1')], ct_zone)
|
[('ipv4', '10.0.0.1'), ('ipv6', 'fe80::1')], ct_zone)
|
||||||
self.utils_exec.assert_has_calls(calls)
|
self.utils_exec.assert_has_calls(calls)
|
||||||
@ -1398,6 +1403,8 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase):
|
|||||||
ips = {"ipv4": ['10.0.0.1', '10.0.0.2'],
|
ips = {"ipv4": ['10.0.0.1', '10.0.0.2'],
|
||||||
"ipv6": ['fe80::1', 'fe80::2']}
|
"ipv6": ['fe80::1', 'fe80::2']}
|
||||||
calls = []
|
calls = []
|
||||||
|
# process conntrack updates in the queue
|
||||||
|
self.firewall.ipconntrack._process_queue()
|
||||||
for direction in ['ingress', 'egress']:
|
for direction in ['ingress', 'egress']:
|
||||||
direction = '-d' if direction == 'ingress' else '-s'
|
direction = '-d' if direction == 'ingress' else '-s'
|
||||||
remote_ip_direction = '-s' if direction == '-d' else '-d'
|
remote_ip_direction = '-s' if direction == '-d' else '-d'
|
||||||
@ -1642,6 +1649,8 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase):
|
|||||||
if not ct_zone:
|
if not ct_zone:
|
||||||
self.assertFalse(self.utils_exec.called)
|
self.assertFalse(self.utils_exec.called)
|
||||||
return
|
return
|
||||||
|
# process conntrack updates in the queue
|
||||||
|
self.firewall.ipconntrack._process_queue()
|
||||||
calls = self._get_expected_conntrack_calls(
|
calls = self._get_expected_conntrack_calls(
|
||||||
[('ipv4', '10.0.0.1'), ('ipv6', 'fe80::1')], ct_zone)
|
[('ipv4', '10.0.0.1'), ('ipv6', 'fe80::1')], ct_zone)
|
||||||
self.utils_exec.assert_has_calls(calls)
|
self.utils_exec.assert_has_calls(calls)
|
||||||
|
@ -0,0 +1,13 @@
|
|||||||
|
---
|
||||||
|
prelude: >
|
||||||
|
In order to reduce the time spent processing security group updates in
|
||||||
|
the L2 agent, conntrack deletion is now performed in a set of worker
|
||||||
|
threads instead of the main agent thread, so it can return to processing
|
||||||
|
other events quickly.
|
||||||
|
upgrade:
|
||||||
|
- |
|
||||||
|
On an upgrade, conntrack entries will now be cleaned-up in a worker
|
||||||
|
thread, instead of in the calling thread.
|
||||||
|
fixes:
|
||||||
|
- |
|
||||||
|
Fixes bug `1745468 <https://bugs.launchpad.net/neutron/+bug/1745468>`_.
|
Loading…
x
Reference in New Issue
Block a user