Minimizing L3 agent QoS extensions lock granularity
If agent is concurrently processing large set of resources, the bottleneck lock will multiply increase processing time of those resources which have been waiting for the lock for a long time. This patch moves the lock to the core cache resource, and leverage the coordination lock to the resource prcessing and notification thread functions. Closes-Bug: #1824911 Change-Id: Id43829b11631727f1a46362ffea5c22d2177fd79
This commit is contained in:
parent
771a2a191c
commit
ab57410ec8
@ -19,6 +19,7 @@ from neutron_lib import constants
|
||||
from neutron_lib.db import constants as db_consts
|
||||
from neutron_lib import rpc as n_rpc
|
||||
from neutron_lib.services.qos import constants as qos_consts
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_log import log as logging
|
||||
|
||||
from neutron.agent.linux import l3_tc_lib as tc_lib
|
||||
@ -51,23 +52,44 @@ IP_DEFAULT_BURST = 0
|
||||
|
||||
class RateLimitMaps(object):
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, lock_name):
|
||||
self.qos_policy_resources = collections.defaultdict(dict)
|
||||
self.known_policies = {}
|
||||
self.resource_policies = {}
|
||||
self.lock_name = lock_name
|
||||
|
||||
def update_policy(self, policy):
|
||||
self.known_policies[policy.id] = policy
|
||||
|
||||
@lockutils.synchronized(self.lock_name)
|
||||
def _update_policy():
|
||||
self.known_policies[policy.id] = policy
|
||||
|
||||
return _update_policy()
|
||||
|
||||
def get_policy(self, policy_id):
|
||||
return self.known_policies.get(policy_id)
|
||||
|
||||
@lockutils.synchronized(self.lock_name)
|
||||
def _get_policy():
|
||||
return self.known_policies.get(policy_id)
|
||||
|
||||
return _get_policy()
|
||||
|
||||
def get_resources(self, policy):
|
||||
return self.qos_policy_resources[policy.id].values()
|
||||
|
||||
@lockutils.synchronized(self.lock_name)
|
||||
def _get_resources():
|
||||
return self.qos_policy_resources[policy.id].values()
|
||||
|
||||
return _get_resources()
|
||||
|
||||
def get_resource_policy(self, resource):
|
||||
policy_id = self.resource_policies.get(resource)
|
||||
return self.get_policy(policy_id)
|
||||
|
||||
@lockutils.synchronized(self.lock_name)
|
||||
def _get_resource_policy():
|
||||
policy_id = self.resource_policies.get(resource)
|
||||
return self.known_policies.get(policy_id)
|
||||
|
||||
return _get_resource_policy()
|
||||
|
||||
def set_resource_policy(self, resource, policy):
|
||||
"""Attach a resource to policy
|
||||
@ -75,12 +97,17 @@ class RateLimitMaps(object):
|
||||
and return any previous policy on resource.
|
||||
"""
|
||||
|
||||
old_policy = self.get_resource_policy(resource)
|
||||
self.update_policy(policy)
|
||||
self.resource_policies[resource] = policy.id
|
||||
self.qos_policy_resources[policy.id][resource] = resource
|
||||
if old_policy and old_policy.id != policy.id:
|
||||
del self.qos_policy_resources[old_policy.id][resource]
|
||||
@lockutils.synchronized(self.lock_name)
|
||||
def _set_resource_policy():
|
||||
policy_id = self.resource_policies.get(resource)
|
||||
old_policy = self.known_policies.get(policy_id)
|
||||
self.known_policies[policy.id] = policy
|
||||
self.resource_policies[resource] = policy.id
|
||||
self.qos_policy_resources[policy.id][resource] = resource
|
||||
if old_policy and old_policy.id != policy.id:
|
||||
del self.qos_policy_resources[old_policy.id][resource]
|
||||
|
||||
_set_resource_policy()
|
||||
|
||||
def clean_by_resource(self, resource):
|
||||
"""Detach resource from policy
|
||||
@ -88,16 +115,21 @@ class RateLimitMaps(object):
|
||||
and cleanup data we don't need anymore.
|
||||
"""
|
||||
|
||||
if resource in self.resource_policies:
|
||||
del self.resource_policies[resource]
|
||||
for qos_policy_id, res_dict in self.qos_policy_resources.items():
|
||||
if resource in res_dict:
|
||||
del res_dict[resource]
|
||||
if not res_dict:
|
||||
self._clean_policy_info(qos_policy_id)
|
||||
return
|
||||
LOG.debug("L3 QoS extension did not have "
|
||||
"information on floating IP %s", resource)
|
||||
@lockutils.synchronized(self.lock_name)
|
||||
def _clean_by_resource():
|
||||
if resource in self.resource_policies:
|
||||
del self.resource_policies[resource]
|
||||
for (qos_policy_id,
|
||||
res_dict) in self.qos_policy_resources.items():
|
||||
if resource in res_dict:
|
||||
del res_dict[resource]
|
||||
if not res_dict:
|
||||
self._clean_policy_info(qos_policy_id)
|
||||
return
|
||||
LOG.debug("L3 QoS extension did not have "
|
||||
"information on floating IP %s", resource)
|
||||
|
||||
_clean_by_resource()
|
||||
|
||||
def _clean_policy_info(self, qos_policy_id):
|
||||
del self.qos_policy_resources[qos_policy_id]
|
||||
|
@ -24,11 +24,14 @@ from neutron.agent.linux import ip_lib
|
||||
from neutron.api.rpc.callbacks import events
|
||||
from neutron.api.rpc.callbacks import resources
|
||||
from neutron.api.rpc.handlers import resources_rpc
|
||||
from neutron.common import coordination
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RouterFipRateLimitMaps(qos_base.RateLimitMaps):
|
||||
LOCK_NAME = "fip-qos-cache"
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize RouterFipRateLimitMaps
|
||||
|
||||
@ -51,12 +54,58 @@ class RouterFipRateLimitMaps(qos_base.RateLimitMaps):
|
||||
"""
|
||||
self.ingress_ratelimits = {}
|
||||
self.egress_ratelimits = {}
|
||||
super(RouterFipRateLimitMaps, self).__init__()
|
||||
super(RouterFipRateLimitMaps, self).__init__(self.LOCK_NAME)
|
||||
|
||||
def find_fip_router_id(self, fip):
|
||||
for router_id, ips in self.router_floating_ips.items():
|
||||
if fip in ips:
|
||||
return router_id
|
||||
|
||||
@lockutils.synchronized(self.lock_name)
|
||||
def _find_fip_router_id():
|
||||
for router_id, ips in self.router_floating_ips.items():
|
||||
if fip in ips:
|
||||
return router_id
|
||||
|
||||
return _find_fip_router_id()
|
||||
|
||||
def get_router_floating_ips(self, router_id):
|
||||
|
||||
@lockutils.synchronized(self.lock_name)
|
||||
def _get_router_floating_ips():
|
||||
return self.router_floating_ips.pop(
|
||||
router_id, [])
|
||||
|
||||
return _get_router_floating_ips()
|
||||
|
||||
def remove_fip_ratelimit_cache(self, direction, fip):
|
||||
|
||||
@lockutils.synchronized(self.lock_name)
|
||||
def _remove_fip_ratelimit_cache():
|
||||
rate_limits_direction = direction + "_ratelimits"
|
||||
rate_limits = getattr(self, rate_limits_direction, {})
|
||||
rate_limits.pop(fip, None)
|
||||
|
||||
_remove_fip_ratelimit_cache()
|
||||
|
||||
def set_fip_ratelimit_cache(self, direction, fip, rate, burst):
|
||||
|
||||
@lockutils.synchronized(self.lock_name)
|
||||
def _set_fip_ratelimit_cache():
|
||||
rate_limits_direction = direction + "_ratelimits"
|
||||
rate_limits = getattr(self, rate_limits_direction, {})
|
||||
rate_limits[fip] = (rate, burst)
|
||||
|
||||
_set_fip_ratelimit_cache()
|
||||
|
||||
def get_fip_ratelimit_cache(self, direction, fip):
|
||||
|
||||
@lockutils.synchronized(self.lock_name)
|
||||
def _get_fip_ratelimit_cache():
|
||||
rate_limits_direction = direction + "_ratelimits"
|
||||
rate_limits = getattr(self, rate_limits_direction, {})
|
||||
rate, burst = rate_limits.get(fip, (qos_base.IP_DEFAULT_RATE,
|
||||
qos_base.IP_DEFAULT_BURST))
|
||||
return rate, burst
|
||||
|
||||
return _get_fip_ratelimit_cache()
|
||||
|
||||
|
||||
class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
|
||||
@ -68,7 +117,6 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
|
||||
self.fip_qos_map = RouterFipRateLimitMaps()
|
||||
self._register_rpc_consumers()
|
||||
|
||||
@lockutils.synchronized('qos-fip')
|
||||
def _handle_notification(self, context, resource_type,
|
||||
qos_policies, event_type):
|
||||
if event_type == events.UPDATED:
|
||||
@ -98,20 +146,16 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
|
||||
fip, dvr_fip_device, rates, with_cache=False)
|
||||
self.fip_qos_map.update_policy(qos_policy)
|
||||
|
||||
def _remove_fip_rate_limit_cache(self, fip):
|
||||
for direction in constants.VALID_DIRECTIONS:
|
||||
self.fip_qos_map.remove_fip_ratelimit_cache(direction, fip)
|
||||
|
||||
def _process_reset_fip(self, fip):
|
||||
self.fip_qos_map.clean_by_resource(fip)
|
||||
|
||||
def process_ip_rate_limit(self, ip, direction, device, rate, burst):
|
||||
rate_limits_direction = direction + "_ratelimits"
|
||||
rate_limits = getattr(self.fip_qos_map, rate_limits_direction, {})
|
||||
old_rate, old_burst = rate_limits.get(ip, (qos_base.IP_DEFAULT_RATE,
|
||||
qos_base.IP_DEFAULT_BURST))
|
||||
|
||||
if old_rate == rate and old_burst == burst:
|
||||
# Two possibilities here:
|
||||
# 1. Floating IP rate limit does not change.
|
||||
# 2. Floating IP bandwidth does not limit.
|
||||
return
|
||||
@coordination.synchronized('qos-floating-ip-{ip}')
|
||||
def process_ip_rate_limit(self, ip, direction,
|
||||
device, rate, burst):
|
||||
|
||||
tc_wrapper = self._get_tc_wrapper(device)
|
||||
|
||||
@ -121,12 +165,11 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
|
||||
# floating IP bandwidth was changed to default value (no limit).
|
||||
# NOTE: l3_tc_lib will ignore exception FilterIDForIPNotFound.
|
||||
tc_wrapper.clear_ip_rate_limit(direction, ip)
|
||||
rate_limits.pop(ip, None)
|
||||
self.fip_qos_map.remove_fip_ratelimit_cache(direction, ip)
|
||||
return
|
||||
|
||||
# Finally just set it, l3_tc_lib will clean the old rules if exists.
|
||||
tc_wrapper.set_ip_rate_limit(direction, ip, rate, burst)
|
||||
rate_limits[ip] = (rate, burst)
|
||||
|
||||
def _get_rate_limit_ip_device(self, router_info):
|
||||
ex_gw_port = router_info.get_ex_gw_port()
|
||||
@ -152,17 +195,12 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
|
||||
namespace = router_info.get_gw_ns_name()
|
||||
return ip_lib.IPDevice(name, namespace=namespace)
|
||||
|
||||
def _remove_ip_rate_limit_cache(self, ip, direction):
|
||||
rate_limits_direction = direction + "_ratelimits"
|
||||
rate_limits = getattr(self.fip_qos_map, rate_limits_direction, {})
|
||||
rate_limits.pop(ip, None)
|
||||
|
||||
def _remove_fip_rate_limit(self, device, fip_ip):
|
||||
tc_wrapper = self._get_tc_wrapper(device)
|
||||
for direction in constants.VALID_DIRECTIONS:
|
||||
if device.exists():
|
||||
tc_wrapper.clear_ip_rate_limit(direction, fip_ip)
|
||||
self._remove_ip_rate_limit_cache(fip_ip, direction)
|
||||
self.fip_qos_map.remove_fip_ratelimit_cache(direction, fip_ip)
|
||||
|
||||
def get_fip_qos_rates(self, context, fip, policy_id):
|
||||
if policy_id is None:
|
||||
@ -184,9 +222,21 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
|
||||
for direction in constants.VALID_DIRECTIONS:
|
||||
rate = rates.get(direction)
|
||||
if with_cache:
|
||||
|
||||
old_rate, old_burst = self.fip_qos_map.get_fip_ratelimit_cache(
|
||||
direction, fip)
|
||||
if old_rate == rate['rate'] and old_burst == rate['burst']:
|
||||
# Two possibilities here:
|
||||
# 1. Floating IP rate limit does not change.
|
||||
# 2. Floating IP bandwidth does not limit.
|
||||
continue
|
||||
|
||||
self.process_ip_rate_limit(
|
||||
fip, direction, device,
|
||||
rate['rate'], rate['burst'])
|
||||
|
||||
self.fip_qos_map.set_fip_ratelimit_cache(
|
||||
direction, fip, rate['rate'], rate['burst'])
|
||||
else:
|
||||
tc_wrapper = self._get_tc_wrapper(device)
|
||||
if (rate['rate'] == qos_base.IP_DEFAULT_RATE and
|
||||
@ -280,13 +330,11 @@ class FipQosAgentExtension(qos_base.L3QosAgentExtensionBase,
|
||||
self._remove_fip_rate_limit(dvr_fip_device, fip)
|
||||
self._process_reset_fip(fip)
|
||||
|
||||
@lockutils.synchronized('qos-fip')
|
||||
def add_router(self, context, data):
|
||||
router_info = self._get_router_info(data['id'])
|
||||
if router_info:
|
||||
self.process_floating_ip_addresses(context, router_info)
|
||||
|
||||
@lockutils.synchronized('qos-fip')
|
||||
def update_router(self, context, data):
|
||||
router_info = self._get_router_info(data['id'])
|
||||
if router_info:
|
||||
|
@ -17,7 +17,6 @@ import netaddr
|
||||
|
||||
from neutron_lib.agent import l3_extension
|
||||
from neutron_lib import constants
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_log import log as logging
|
||||
|
||||
|
||||
@ -26,6 +25,7 @@ from neutron.agent.linux import ip_lib
|
||||
from neutron.api.rpc.callbacks import events
|
||||
from neutron.api.rpc.callbacks import resources
|
||||
from neutron.api.rpc.handlers import resources_rpc
|
||||
from neutron.common import coordination
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -37,9 +37,9 @@ class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase,
|
||||
"""Initialize agent extension."""
|
||||
self.resource_rpc = resources_rpc.ResourcesPullRpcApi()
|
||||
self._register_rpc_consumers()
|
||||
self.gateway_ip_qos_map = qos_base.RateLimitMaps()
|
||||
self.gateway_ip_qos_map = qos_base.RateLimitMaps(
|
||||
"gateway-ip-qos-cache")
|
||||
|
||||
@lockutils.synchronized('qos-gateway-ip')
|
||||
def _handle_notification(self, context, resource_type,
|
||||
qos_policies, event_type):
|
||||
if event_type == events.UPDATED:
|
||||
@ -81,13 +81,11 @@ class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase,
|
||||
router_id, qos_policy)
|
||||
self.gateway_ip_qos_map.update_policy(qos_policy)
|
||||
|
||||
@lockutils.synchronized('qos-gateway-ip')
|
||||
def add_router(self, context, data):
|
||||
router_info = self._get_router_info(data['id'])
|
||||
if router_info:
|
||||
self.process_gateway_rate_limit(context, router_info)
|
||||
|
||||
@lockutils.synchronized('qos-gateway-ip')
|
||||
def update_router(self, context, data):
|
||||
router_info = self._get_router_info(data['id'])
|
||||
if router_info:
|
||||
@ -120,6 +118,7 @@ class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase,
|
||||
|
||||
self._handle_router_gateway_rate_limit(context, router_info)
|
||||
|
||||
@coordination.synchronized('qos-gateway-ip-{router_info.router_id}')
|
||||
def _empty_router_gateway_rate_limits(self, router_info, tc_wrapper):
|
||||
self.gateway_ip_qos_map.clean_by_resource(router_info.router_id)
|
||||
for ip in router_info.qos_gateway_ips:
|
||||
@ -172,6 +171,7 @@ class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase,
|
||||
router_info.router_id, policy)
|
||||
return self.get_policy_rates(policy)
|
||||
|
||||
@coordination.synchronized('qos-gateway-ip-{router_info.router_id}')
|
||||
def _set_gateway_tc_rules(self, router_info, tc_wrapper,
|
||||
ex_gw_port, rates):
|
||||
for ip_addr in ex_gw_port['fixed_ips']:
|
||||
|
@ -34,7 +34,7 @@ class RateLimitMapsTestCase(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(RateLimitMapsTestCase, self).setUp()
|
||||
self.policy_map = qos_base.RateLimitMaps()
|
||||
self.policy_map = qos_base.RateLimitMaps("cache-lock")
|
||||
|
||||
def test_update_policy(self):
|
||||
self.policy_map.update_policy(TEST_POLICY)
|
||||
|
@ -384,3 +384,37 @@ class RouterFipRateLimitMapsTestCase(base.BaseTestCase):
|
||||
self.assertIsNone(self.policy_map.find_fip_router_id("8.8.8.8"))
|
||||
self.assertEqual(router_id,
|
||||
self.policy_map.find_fip_router_id(TEST_FIP))
|
||||
|
||||
def test_get_router_floating_ips(self):
|
||||
router_id = _uuid()
|
||||
test_ips = [TEST_FIP, TEST_FIP2]
|
||||
self.policy_map.router_floating_ips[router_id] = set([TEST_FIP,
|
||||
TEST_FIP2])
|
||||
get_ips = self.policy_map.get_router_floating_ips(router_id)
|
||||
self.assertEqual(len(test_ips), len(get_ips))
|
||||
|
||||
def test_remove_fip_ratelimit_cache(self):
|
||||
fip = "1.1.1.1"
|
||||
self.policy_map.set_fip_ratelimit_cache(
|
||||
"ingress", fip, 100, 200)
|
||||
self.policy_map.set_fip_ratelimit_cache(
|
||||
"egress", fip, 100, 200)
|
||||
self.policy_map.remove_fip_ratelimit_cache("ingress", fip)
|
||||
self.assertIsNone(self.policy_map.ingress_ratelimits.get(fip))
|
||||
self.policy_map.remove_fip_ratelimit_cache("egress", fip)
|
||||
self.assertIsNone(self.policy_map.egress_ratelimits.get(fip))
|
||||
|
||||
def test_set_fip_ratelimit_cache(self):
|
||||
fip = "1.1.1.1"
|
||||
self.policy_map.set_fip_ratelimit_cache(
|
||||
"ingress", fip, 100, 200)
|
||||
self.policy_map.set_fip_ratelimit_cache(
|
||||
"egress", fip, 300, 400)
|
||||
in_rate, in_burst = self.policy_map.get_fip_ratelimit_cache(
|
||||
"ingress", fip)
|
||||
self.assertEqual(100, in_rate)
|
||||
self.assertEqual(200, in_burst)
|
||||
e_rate, e_burst = self.policy_map.get_fip_ratelimit_cache(
|
||||
"egress", fip)
|
||||
self.assertEqual(300, e_rate)
|
||||
self.assertEqual(400, e_burst)
|
||||
|
@ -0,0 +1,6 @@
|
||||
---
|
||||
fixes:
|
||||
- |
|
||||
Leverage the coordination lock to the resource processing
|
||||
and notification thread functions to minimize the lock
|
||||
granularity.
|
Loading…
Reference in New Issue
Block a user