Enable rpc notifications only when rpc_workers >= 1
rpc_workers can be set < 1 with 'ovn' backend when no other agent is running apart from ovn agents to consume these rpc notifications. Add and apply disable_notifications decorator on methods which do rpc cast calls to agents, the decorator makes the caller method execute only when rpc_workers >=1. This patch not changing default behavior and utilizes the rpc_workers config option to enable rpc notification on resources updates only when rpc_workers >= 1. Also set rpc_workers=0 in ovn jobs to cover this scenario. Closes-Bug: #1889737 Closes-Bug: #1992352 Change-Id: I700fe2cd422bc1eb8b5144ec116e7f0a60238419
This commit is contained in:
parent
1374b01cfb
commit
3e1e2d63b3
@ -28,6 +28,8 @@ from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
|
||||
from neutron.common import utils
|
||||
|
||||
|
||||
# Priorities - lower value is higher priority
|
||||
PRIORITY_NETWORK_CREATE = 0
|
||||
@ -87,6 +89,8 @@ class DhcpAgentNotifyAPI(object):
|
||||
self._plugin = plugin
|
||||
target = oslo_messaging.Target(topic=topic, version='1.0')
|
||||
self.client = n_rpc.get_client(target)
|
||||
if not cfg.CONF.dhcp_agent_notification:
|
||||
return
|
||||
# register callbacks for router interface changes
|
||||
registry.subscribe(self._after_router_interface_created,
|
||||
resources.ROUTER_INTERFACE, events.AFTER_CREATE)
|
||||
@ -101,8 +105,6 @@ class DhcpAgentNotifyAPI(object):
|
||||
resources.SUBNET,
|
||||
resources.SUBNETS,
|
||||
)
|
||||
if not cfg.CONF.dhcp_agent_notification:
|
||||
return
|
||||
for resource in callback_resources:
|
||||
registry.subscribe(self._send_dhcp_notification,
|
||||
resource, events.BEFORE_RESPONSE)
|
||||
@ -182,6 +184,7 @@ class DhcpAgentNotifyAPI(object):
|
||||
def _is_reserved_dhcp_port(self, port):
|
||||
return port.get('device_id') == constants.DEVICE_ID_RESERVED_DHCP_PORT
|
||||
|
||||
@utils.disable_notifications
|
||||
def _notify_agents(
|
||||
self, context, method, payload, network_id, network=None):
|
||||
"""Notify all the agents that are hosting the network."""
|
||||
@ -242,12 +245,14 @@ class DhcpAgentNotifyAPI(object):
|
||||
self._cast_message(context, "port_create_end",
|
||||
payload, agent.host, agent.topic)
|
||||
|
||||
@utils.disable_notifications
|
||||
def _cast_message(self, context, method, payload, host,
|
||||
topic=topics.DHCP_AGENT):
|
||||
"""Cast the payload to the dhcp agent running on the host."""
|
||||
cctxt = self.client.prepare(topic=topic, server=host)
|
||||
cctxt.cast(context, method, payload=payload)
|
||||
|
||||
@utils.disable_notifications
|
||||
def _fanout_message(self, context, method, payload):
|
||||
"""Fanout the payload to all dhcp agents."""
|
||||
cctxt = self.client.prepare(fanout=True)
|
||||
|
@ -25,6 +25,7 @@ from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
|
||||
from neutron.api.rpc.agentnotifiers import utils as ag_utils
|
||||
from neutron.common import utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -41,6 +42,7 @@ class L3AgentNotifyAPI(object):
|
||||
target = oslo_messaging.Target(topic=topic, version='1.0')
|
||||
self.client = n_rpc.get_client(target)
|
||||
|
||||
@utils.disable_notifications
|
||||
def _notification_host(self, context, method, host, use_call=False,
|
||||
**kwargs):
|
||||
"""Notify the agent that is hosting the router."""
|
||||
@ -52,6 +54,7 @@ class L3AgentNotifyAPI(object):
|
||||
if use_call else cctxt.cast)
|
||||
rpc_method(context, method, **kwargs)
|
||||
|
||||
@utils.disable_notifications
|
||||
def _agent_notification(self, context, method, router_ids, operation,
|
||||
shuffle_agents):
|
||||
"""Notify changed routers to hosting l3 agents."""
|
||||
@ -72,6 +75,7 @@ class L3AgentNotifyAPI(object):
|
||||
version='1.1')
|
||||
cctxt.cast(context, method, routers=[router_id])
|
||||
|
||||
@utils.disable_notifications
|
||||
def _agent_notification_arp(self, context, method, router_id,
|
||||
operation, data):
|
||||
"""Notify arp details to l3 agents hosting router."""
|
||||
@ -82,6 +86,7 @@ class L3AgentNotifyAPI(object):
|
||||
cctxt = self.client.prepare(fanout=True, version='1.2')
|
||||
cctxt.cast(context, method, payload=dvr_arptable)
|
||||
|
||||
@utils.disable_notifications
|
||||
def _notification(self, context, method, router_ids, operation,
|
||||
shuffle_agents, schedule_routers=True):
|
||||
"""Notify all the agents that are hosting the routers."""
|
||||
@ -102,6 +107,7 @@ class L3AgentNotifyAPI(object):
|
||||
cctxt = self.client.prepare(fanout=True)
|
||||
cctxt.cast(context, method, routers=router_ids)
|
||||
|
||||
@utils.disable_notifications
|
||||
def _notification_fanout(self, context, method, router_id=None, **kwargs):
|
||||
"""Fanout the information to all L3 agents.
|
||||
|
||||
|
@ -27,6 +27,7 @@ from oslo_utils import versionutils
|
||||
|
||||
from neutron.api.rpc.callbacks import resources
|
||||
from neutron.api.rpc.handlers import resources_rpc
|
||||
from neutron.common import utils
|
||||
from neutron.db import securitygroups_rpc_base as sg_rpc_base
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -169,6 +170,7 @@ class SecurityGroupAgentRpcApiMixin(object):
|
||||
topics.SECURITY_GROUP,
|
||||
topics.UPDATE)
|
||||
|
||||
@utils.disable_notifications
|
||||
def security_groups_rule_updated(self, context, security_groups):
|
||||
"""Notify rule updated security groups."""
|
||||
if not security_groups:
|
||||
@ -179,6 +181,7 @@ class SecurityGroupAgentRpcApiMixin(object):
|
||||
cctxt.cast(context, 'security_groups_rule_updated',
|
||||
security_groups=security_groups)
|
||||
|
||||
@utils.disable_notifications
|
||||
def security_groups_member_updated(self, context, security_groups):
|
||||
"""Notify member updated security groups."""
|
||||
if not security_groups:
|
||||
|
@ -1031,6 +1031,16 @@ def get_az_hints(resource):
|
||||
cfg.CONF.default_availability_zones)
|
||||
|
||||
|
||||
def disable_notifications(function):
|
||||
"""Decorator to disable notifications"""
|
||||
|
||||
@functools.wraps(function)
|
||||
def wrapper(*args, **kwargs):
|
||||
if cfg.CONF.rpc_workers is None or cfg.CONF.rpc_workers >= 1:
|
||||
return function(*args, **kwargs)
|
||||
return wrapper
|
||||
|
||||
|
||||
def skip_exceptions(exceptions):
|
||||
"""Decorator to catch and hide any provided exception in the argument"""
|
||||
|
||||
|
@ -400,6 +400,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
@log_helpers.log_method_call
|
||||
def _start_rpc_notifiers(self):
|
||||
"""Initialize RPC notifiers for agents."""
|
||||
self.ovo_notifier = None
|
||||
if cfg.CONF.rpc_workers is None or cfg.CONF.rpc_workers >= 1:
|
||||
self.ovo_notifier = ovo_rpc.OVOServerRpcInterface()
|
||||
self.notifier = rpc.AgentNotifierApi(topics.AGENT)
|
||||
if cfg.CONF.enable_traditional_dhcp:
|
||||
|
@ -31,6 +31,7 @@ from sqlalchemy.orm import exc
|
||||
|
||||
from neutron.api.rpc.handlers import dvr_rpc
|
||||
from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc
|
||||
from neutron.common import utils
|
||||
from neutron.db import l3_hamode_db
|
||||
from neutron.db import provisioning_blocks
|
||||
from neutron.plugins.ml2 import db as ml2_db
|
||||
@ -475,11 +476,13 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
|
||||
target = oslo_messaging.Target(topic=topic, version='1.0')
|
||||
self.client = n_rpc.get_client(target)
|
||||
|
||||
@utils.disable_notifications
|
||||
def network_delete(self, context, network_id):
|
||||
cctxt = self.client.prepare(topic=self.topic_network_delete,
|
||||
fanout=True)
|
||||
cctxt.cast(context, 'network_delete', network_id=network_id)
|
||||
|
||||
@utils.disable_notifications
|
||||
def port_update(self, context, port, network_type, segmentation_id,
|
||||
physical_network):
|
||||
cctxt = self.client.prepare(topic=self.topic_port_update,
|
||||
@ -488,22 +491,26 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
|
||||
network_type=network_type, segmentation_id=segmentation_id,
|
||||
physical_network=physical_network)
|
||||
|
||||
@utils.disable_notifications
|
||||
def port_delete(self, context, port_id):
|
||||
cctxt = self.client.prepare(topic=self.topic_port_delete,
|
||||
fanout=True)
|
||||
cctxt.cast(context, 'port_delete', port_id=port_id)
|
||||
|
||||
@utils.disable_notifications
|
||||
def network_update(self, context, network):
|
||||
cctxt = self.client.prepare(topic=self.topic_network_update,
|
||||
fanout=True, version='1.4')
|
||||
cctxt.cast(context, 'network_update', network=network)
|
||||
|
||||
@utils.disable_notifications
|
||||
def binding_deactivate(self, context, port_id, host, network_id):
|
||||
cctxt = self.client.prepare(topic=self.topic_port_binding_deactivate,
|
||||
fanout=True, version='1.5')
|
||||
cctxt.cast(context, 'binding_deactivate', port_id=port_id, host=host,
|
||||
network_id=network_id)
|
||||
|
||||
@utils.disable_notifications
|
||||
def binding_activate(self, context, port_id, host):
|
||||
cctxt = self.client.prepare(topic=self.topic_port_binding_activate,
|
||||
fanout=True, version='1.5')
|
||||
|
@ -23,6 +23,7 @@ import eventlet
|
||||
from eventlet import queue
|
||||
import netaddr
|
||||
from neutron_lib import constants
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from osprofiler import profiler
|
||||
import testscenarios
|
||||
@ -627,6 +628,25 @@ class SkipDecoratorTestCase(base.BaseTestCase):
|
||||
self.assertRaises(AttributeError, raise_attribute_error)
|
||||
|
||||
|
||||
class DisableNotificationTestCase(base.BaseTestCase):
|
||||
|
||||
@utils.disable_notifications
|
||||
def sample_method(self):
|
||||
raise AttributeError()
|
||||
|
||||
def test_notification_rpc_workers_lt_one(self):
|
||||
cfg.CONF.set_override('rpc_workers', 0)
|
||||
self.assertIsNone(self.sample_method())
|
||||
|
||||
def test_notification_rpc_workers_none(self):
|
||||
cfg.CONF.set_override('rpc_workers', None)
|
||||
self.assertRaises(AttributeError, self.sample_method)
|
||||
|
||||
def test_notification_rpc_workers_one(self):
|
||||
cfg.CONF.set_override('rpc_workers', 1)
|
||||
self.assertRaises(AttributeError, self.sample_method)
|
||||
|
||||
|
||||
class SignatureTestCase(base.BaseTestCase):
|
||||
|
||||
def test_sign_instance_id(self):
|
||||
|
@ -477,6 +477,10 @@
|
||||
$TEMPEST_CONFIG:
|
||||
neutron_plugin_options:
|
||||
is_igmp_snooping_enabled: True
|
||||
post-config:
|
||||
$NEUTRON_CONF:
|
||||
DEFAULT:
|
||||
rpc_workers: 0
|
||||
devstack_localrc:
|
||||
CIRROS_VERSION: 0.5.1
|
||||
DEFAULT_IMAGE_NAME: cirros-0.5.1-x86_64-uec
|
||||
@ -723,6 +727,11 @@
|
||||
s-container: false
|
||||
s-object: false
|
||||
s-proxy: false
|
||||
devstack_local_conf:
|
||||
post-config:
|
||||
$NEUTRON_CONF:
|
||||
DEFAULT:
|
||||
rpc_workers: 0
|
||||
zuul_copy_output:
|
||||
'/var/log/ovn': 'logs'
|
||||
'/var/log/openvswitch': 'logs'
|
||||
|
Loading…
Reference in New Issue
Block a user