Merge "Remove race and simplify conntrack state management"
This commit is contained in:
commit
408d3d035f
@ -17,7 +17,6 @@ import eventlet
|
||||
import netaddr
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_log import log as logging
|
||||
from six.moves import queue as Queue
|
||||
|
||||
from neutron.agent.linux import utils as linux_utils
|
||||
from neutron.common import constants as n_const
|
||||
@ -28,6 +27,8 @@ CONTRACK_MGRS = {}
|
||||
MAX_CONNTRACK_ZONES = 65535
|
||||
ZONE_START = 4097
|
||||
|
||||
WORKERS = 8
|
||||
|
||||
|
||||
class IpConntrackUpdate(object):
|
||||
"""Encapsulates a conntrack update
|
||||
@ -40,20 +41,10 @@ class IpConntrackUpdate(object):
|
||||
self.rule = rule
|
||||
self.remote_ips = remote_ips
|
||||
|
||||
|
||||
class IpConntrackProcessingQueue(object):
|
||||
"""Manager of the queue of conntrack updates to process."""
|
||||
def __init__(self):
|
||||
self._queue = Queue.Queue()
|
||||
|
||||
def add(self, update):
|
||||
self._queue.put(update)
|
||||
|
||||
def updates(self):
|
||||
"""Grabs the next conntrack update from the queue and processes."""
|
||||
while not self._queue.empty():
|
||||
update = self._queue.get()
|
||||
yield update
|
||||
def __repr__(self):
|
||||
return ('<IpConntrackUpdate(device_info_list=%s, rule=%s, '
|
||||
'remote_ips=%s>' % (self.device_info_list, self.rule,
|
||||
self.remote_ips))
|
||||
|
||||
|
||||
@lockutils.synchronized('conntrack')
|
||||
@ -82,32 +73,43 @@ class IpConntrackManager(object):
|
||||
self.unfiltered_ports = unfiltered_ports
|
||||
self.zone_per_port = zone_per_port # zone per port vs per network
|
||||
self._populate_initial_zone_map()
|
||||
self._queue = IpConntrackProcessingQueue()
|
||||
self.start_process_queue()
|
||||
self._queue = eventlet.queue.LightQueue()
|
||||
self._start_process_queue()
|
||||
|
||||
def start_process_queue(self):
|
||||
eventlet.spawn_n(self._process_queue_loop)
|
||||
def _start_process_queue(self):
|
||||
LOG.debug("Starting ip_conntrack _process_queue_worker() threads")
|
||||
pool = eventlet.GreenPool(size=WORKERS)
|
||||
for i in range(WORKERS):
|
||||
pool.spawn_n(self._process_queue_worker)
|
||||
|
||||
def _process_queue_loop(self):
|
||||
LOG.debug("Starting ipconntrack _process_queue_loop()")
|
||||
pool = eventlet.GreenPool(size=8)
|
||||
def _process_queue_worker(self):
|
||||
# While it's technically not necessary to have this method, the
|
||||
# 'while True' could just be in _process_queue(), the tests have
|
||||
# to be able to drain the queue without blocking, so _process_queue()
|
||||
# is made standalone.
|
||||
while True:
|
||||
pool.spawn_n(self._process_queue)
|
||||
self._process_queue()
|
||||
|
||||
def _process_queue(self):
|
||||
for update in self._queue.updates():
|
||||
update = None
|
||||
try:
|
||||
# this will block until an entry gets added to the queue
|
||||
update = self._queue.get()
|
||||
if update.remote_ips:
|
||||
for remote_ip in update.remote_ips:
|
||||
self._delete_conntrack_state(
|
||||
update.device_info_list, update.rule, remote_ip)
|
||||
else:
|
||||
self._delete_conntrack_state(update.device_info_list,
|
||||
update.rule)
|
||||
self._delete_conntrack_state(
|
||||
update.device_info_list, update.rule)
|
||||
except Exception:
|
||||
LOG.exception("Failed to process ip_conntrack queue entry: %s",
|
||||
update)
|
||||
|
||||
def _process(self, device_info_list, rule, remote_ips=None):
|
||||
# queue the update to allow the caller to resume its work
|
||||
update = IpConntrackUpdate(device_info_list, rule, remote_ips)
|
||||
self._queue.add(update)
|
||||
self._queue.put(update)
|
||||
|
||||
@staticmethod
|
||||
def _generate_conntrack_cmd_by_rule(rule, namespace):
|
||||
|
@ -1249,7 +1249,8 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase):
|
||||
self.assertFalse(self.utils_exec.called)
|
||||
return
|
||||
# process conntrack updates in the queue
|
||||
self.firewall.ipconntrack._process_queue()
|
||||
while not self.firewall.ipconntrack._queue.empty():
|
||||
self.firewall.ipconntrack._process_queue()
|
||||
cmd = ['conntrack', '-D']
|
||||
if protocol:
|
||||
cmd.extend(['-p', protocol])
|
||||
@ -1339,7 +1340,8 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase):
|
||||
self.assertFalse(self.utils_exec.called)
|
||||
return
|
||||
# process conntrack updates in the queue
|
||||
self.firewall.ipconntrack._process_queue()
|
||||
while not self.firewall.ipconntrack._queue.empty():
|
||||
self.firewall.ipconntrack._process_queue()
|
||||
calls = self._get_expected_conntrack_calls(
|
||||
[('ipv4', '10.0.0.1'), ('ipv6', 'fe80::1')], ct_zone)
|
||||
self.utils_exec.assert_has_calls(calls)
|
||||
@ -1404,7 +1406,8 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase):
|
||||
"ipv6": ['fe80::1', 'fe80::2']}
|
||||
calls = []
|
||||
# process conntrack updates in the queue
|
||||
self.firewall.ipconntrack._process_queue()
|
||||
while not self.firewall.ipconntrack._queue.empty():
|
||||
self.firewall.ipconntrack._process_queue()
|
||||
for direction in ['ingress', 'egress']:
|
||||
direction = '-d' if direction == 'ingress' else '-s'
|
||||
remote_ip_direction = '-s' if direction == '-d' else '-d'
|
||||
@ -1650,7 +1653,8 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase):
|
||||
self.assertFalse(self.utils_exec.called)
|
||||
return
|
||||
# process conntrack updates in the queue
|
||||
self.firewall.ipconntrack._process_queue()
|
||||
while not self.firewall.ipconntrack._queue.empty():
|
||||
self.firewall.ipconntrack._process_queue()
|
||||
calls = self._get_expected_conntrack_calls(
|
||||
[('ipv4', '10.0.0.1'), ('ipv6', 'fe80::1')], ct_zone)
|
||||
self.utils_exec.assert_has_calls(calls)
|
||||
|
Loading…
Reference in New Issue
Block a user