Port to oslo.messaging

Now that all preparations are done, actually port the code to use
oslo.messaging. This patch does as little as possible. Follow up patches
that refactor and cleanup the code and configuration files, will be
merged later. The reason for this is to make the patch as slim as
possible, to make review process more smooth and concentrated.

Details:
* neutron/common/rpc.py:
  - added init() and cleanup() to set global RPC layer state.
  - added utility functions: get_server(), get_client(), get_notifier()
    that wrap up oslo.messaging API a bit, enforcing eventlet executor
    and setting serializer, among other things.
  - removed PluginRpcDispatcher, instead introduced PluginRpcSerializer
    to use as a default serializer for API callbacks.

* neutron/common/rpc_compat.py:
  - emulated incubator RPC layer behaviour thru previously introduced
    stub classes (RpcCallback, RpcProxy, ...) using new oslo.messaging
    API.
  - switched to using new oslo.messaging exception types.

* neutron/service.py:
  - expect multiple RPC listeners that are of MessageHandlingServer
    type, not GreenThread.

* neutron/common/config.py:
  - initialize RPC layer in init()

* setup.cfg:
  - added entry points for old notifier drivers to retain backward
    compatibility.

* neutron/tests/...:
  - introduced fake_notifier to replace impl_fake.
  - faked out consume_in_thread() to avoid starting RPC listeners when
    running unit tests.
  - used 'fake' transport driver.
  - made sure neutron.test.* exceptions are caught.
  - initialize and clean up RPC layer for each test case.

* Ported all affected code from using neutron.openstack.common.notifier
  API to oslo.messaging.Notifier.

* rpc.set_defaults() was renamed to rpc.set_transport_defaults()

* other changes not worth mentioning here.

blueprint oslo-messaging

DocImpact

Change-Id: I5a91c34df6e300f2dc46217b1b16352fcc3039fc
This commit is contained in:
Ihar Hrachyshka 2014-06-02 17:40:38 +02:00
parent c4181a370f
commit 3aca3f7745
56 changed files with 439 additions and 282 deletions

View File

@ -27,8 +27,8 @@ from neutron.api.v2 import attributes
from neutron.api.v2 import resource as wsgi_resource
from neutron.common import constants as const
from neutron.common import exceptions
from neutron.common import rpc as n_rpc
from neutron.openstack.common import log as logging
from neutron.openstack.common.notifier import api as notifier_api
from neutron import policy
from neutron import quota
@ -69,7 +69,7 @@ class Controller(object):
self._native_sorting = self._is_native_sorting_supported()
self._policy_attrs = [name for (name, info) in self._attr_info.items()
if info.get('required_by_policy')]
self._publisher_id = notifier_api.publisher_id('network')
self._notifier = n_rpc.get_notifier('network')
# use plugin's dhcp notifier, if this is already instantiated
agent_notifiers = getattr(plugin, 'agent_notifiers', {})
self._dhcp_agent_notifier = (
@ -372,10 +372,8 @@ class Controller(object):
def create(self, request, body=None, **kwargs):
"""Creates a new instance of the requested entity."""
parent_id = kwargs.get(self._parent_id_name)
notifier_api.notify(request.context,
self._publisher_id,
self._notifier.info(request.context,
self._resource + '.create.start',
notifier_api.CONF.default_notification_level,
body)
body = Controller.prepare_request_body(request.context, body, True,
self._resource, self._attr_info,
@ -419,10 +417,8 @@ class Controller(object):
def notify(create_result):
notifier_method = self._resource + '.create.end'
notifier_api.notify(request.context,
self._publisher_id,
self._notifier.info(request.context,
notifier_method,
notifier_api.CONF.default_notification_level,
create_result)
self._send_dhcp_notification(request.context,
create_result,
@ -458,10 +454,8 @@ class Controller(object):
def delete(self, request, id, **kwargs):
"""Deletes the specified entity."""
notifier_api.notify(request.context,
self._publisher_id,
self._notifier.info(request.context,
self._resource + '.delete.start',
notifier_api.CONF.default_notification_level,
{self._resource + '_id': id})
action = self._plugin_handlers[self.DELETE]
@ -482,10 +476,8 @@ class Controller(object):
obj_deleter = getattr(self._plugin, action)
obj_deleter(request.context, id, **kwargs)
notifier_method = self._resource + '.delete.end'
notifier_api.notify(request.context,
self._publisher_id,
self._notifier.info(request.context,
notifier_method,
notifier_api.CONF.default_notification_level,
{self._resource + '_id': id})
result = {self._resource: self._view(request.context, obj)}
self._send_nova_notification(action, {}, result)
@ -502,10 +494,8 @@ class Controller(object):
msg = _("Invalid format: %s") % request.body
raise exceptions.BadRequest(resource='body', msg=msg)
payload['id'] = id
notifier_api.notify(request.context,
self._publisher_id,
self._notifier.info(request.context,
self._resource + '.update.start',
notifier_api.CONF.default_notification_level,
payload)
body = Controller.prepare_request_body(request.context, body, False,
self._resource, self._attr_info,
@ -541,11 +531,7 @@ class Controller(object):
obj = obj_updater(request.context, id, **kwargs)
result = {self._resource: self._view(request.context, obj)}
notifier_method = self._resource + '.update.end'
notifier_api.notify(request.context,
self._publisher_id,
notifier_method,
notifier_api.CONF.default_notification_level,
result)
self._notifier.info(request.context, notifier_method, result)
self._send_dhcp_notification(request.context,
result,
notifier_method)

View File

@ -26,9 +26,9 @@ import sys
from oslo.config import cfg
from neutron.common import config
from neutron.common import rpc as n_rpc
from neutron import context
from neutron import manager
from neutron.openstack.common.notifier import api as notifier_api
def main():
@ -37,33 +37,14 @@ def main():
cxt = context.get_admin_context()
plugin = manager.NeutronManager.get_plugin()
notifier = n_rpc.get_notifier('network')
for network in plugin.get_networks(cxt):
notifier_api.notify(cxt,
notifier_api.publisher_id('network'),
'network.exists',
notifier_api.INFO,
{'network': network})
notifier.info(cxt, 'network.exists', {'network': network})
for subnet in plugin.get_subnets(cxt):
notifier_api.notify(cxt,
notifier_api.publisher_id('network'),
'subnet.exists',
notifier_api.INFO,
{'subnet': subnet})
notifier.info(cxt, 'subnet.exists', {'subnet': subnet})
for port in plugin.get_ports(cxt):
notifier_api.notify(cxt,
notifier_api.publisher_id('network'),
'port.exists',
notifier_api.INFO,
{'port': port})
notifier.info(cxt, 'port.exists', {'port': port})
for router in plugin.get_routers(cxt):
notifier_api.notify(cxt,
notifier_api.publisher_id('network'),
'router.exists',
notifier_api.INFO,
{'router': router})
notifier.info(cxt, 'router.exists', {'router': router})
for floatingip in plugin.get_floatingips(cxt):
notifier_api.notify(cxt,
notifier_api.publisher_id('network'),
'floatingip.exists',
notifier_api.INFO,
{'floatingip': floatingip})
notifier.info(cxt, 'floatingip.exists', {'floatingip': floatingip})

View File

@ -20,13 +20,13 @@ Routines for configuring Neutron
import os
from oslo.config import cfg
from oslo import messaging
from paste import deploy
from neutron.api.v2 import attributes
from neutron.common import utils
from neutron.openstack.common.db import options as db_options
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron import version
@ -125,7 +125,7 @@ cfg.CONF.register_opts(core_opts)
cfg.CONF.register_cli_opts(core_cli_opts)
# Ensure that the control exchange is set correctly
rpc.set_defaults(control_exchange='neutron')
messaging.set_transport_defaults(control_exchange='neutron')
_SQL_CONNECTION_DEFAULT = 'sqlite://'
# Update the default QueuePool parameters. These can be tweaked by the
# configuration variables - max_pool_size, max_overflow and pool_timeout
@ -139,6 +139,11 @@ def init(args, **kwargs):
version='%%prog %s' % version.version_info.release_string(),
**kwargs)
# FIXME(ihrachys): if import is put in global, circular import
# failure occurs
from neutron.common import rpc as n_rpc
n_rpc.init(cfg.CONF)
# Validate that the base_mac is of the correct format
msg = attributes._validate_regex(cfg.CONF.base_mac,
attributes.MAC_PATTERN)

View File

@ -15,31 +15,122 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from oslo import messaging
from oslo.messaging import serializer as om_serializer
from neutron.common import exceptions
from neutron import context
from neutron.openstack.common import log as logging
from neutron.openstack.common.rpc import dispatcher
LOG = logging.getLogger(__name__)
class PluginRpcDispatcher(dispatcher.RpcDispatcher):
"""This class is used to convert RPC common context into
TRANSPORT = None
NOTIFIER = None
ALLOWED_EXMODS = [
exceptions.__name__,
]
EXTRA_EXMODS = []
TRANSPORT_ALIASES = {
'neutron.openstack.common.rpc.impl_fake': 'fake',
'neutron.openstack.common.rpc.impl_qpid': 'qpid',
'neutron.openstack.common.rpc.impl_kombu': 'rabbit',
'neutron.openstack.common.rpc.impl_zmq': 'zmq',
'neutron.rpc.impl_fake': 'fake',
'neutron.rpc.impl_qpid': 'qpid',
'neutron.rpc.impl_kombu': 'rabbit',
'neutron.rpc.impl_zmq': 'zmq',
}
def init(conf):
global TRANSPORT, NOTIFIER
exmods = get_allowed_exmods()
TRANSPORT = messaging.get_transport(conf,
allowed_remote_exmods=exmods,
aliases=TRANSPORT_ALIASES)
NOTIFIER = messaging.Notifier(TRANSPORT)
def cleanup():
global TRANSPORT, NOTIFIER
assert TRANSPORT is not None
assert NOTIFIER is not None
TRANSPORT.cleanup()
TRANSPORT = NOTIFIER = None
def add_extra_exmods(*args):
EXTRA_EXMODS.extend(args)
def clear_extra_exmods():
del EXTRA_EXMODS[:]
def get_allowed_exmods():
return ALLOWED_EXMODS + EXTRA_EXMODS
def get_client(target, version_cap=None, serializer=None):
assert TRANSPORT is not None
serializer = PluginRpcSerializer(serializer)
return messaging.RPCClient(TRANSPORT,
target,
version_cap=version_cap,
serializer=serializer)
def get_server(target, endpoints, serializer=None):
assert TRANSPORT is not None
serializer = PluginRpcSerializer(serializer)
return messaging.get_rpc_server(TRANSPORT,
target,
endpoints,
executor='eventlet',
serializer=serializer)
def get_notifier(service=None, host=None, publisher_id=None):
assert NOTIFIER is not None
if not publisher_id:
publisher_id = "%s.%s" % (service, host or cfg.CONF.host)
return NOTIFIER.prepare(publisher_id=publisher_id)
class PluginRpcSerializer(om_serializer.Serializer):
"""This serializer is used to convert RPC common context into
Neutron Context.
"""
def __init__(self, base):
super(PluginRpcSerializer, self).__init__()
self._base = base
def __init__(self, callbacks):
super(PluginRpcDispatcher, self).__init__(callbacks)
def serialize_entity(self, ctxt, entity):
if not self._base:
return entity
return self._base.serialize_entity(ctxt, entity)
def dispatch(self, rpc_ctxt, version, method, namespace, **kwargs):
rpc_ctxt_dict = rpc_ctxt.to_dict()
def deserialize_entity(self, ctxt, entity):
if not self._base:
return entity
return self._base.deserialize_entity(ctxt, entity)
def serialize_context(self, ctxt):
return ctxt.to_dict()
def deserialize_context(self, ctxt):
rpc_ctxt_dict = ctxt.copy()
user_id = rpc_ctxt_dict.pop('user_id', None)
if not user_id:
user_id = rpc_ctxt_dict.pop('user', None)
tenant_id = rpc_ctxt_dict.pop('tenant_id', None)
if not tenant_id:
tenant_id = rpc_ctxt_dict.pop('project_id', None)
neutron_ctxt = context.Context(user_id, tenant_id,
load_admin_roles=False, **rpc_ctxt_dict)
return super(PluginRpcDispatcher, self).dispatch(
neutron_ctxt, version, method, namespace, **kwargs)
return context.Context(user_id, tenant_id,
load_admin_roles=False, **rpc_ctxt_dict)

View File

@ -13,24 +13,63 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from oslo import messaging
from neutron.common import rpc as n_rpc
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.openstack.common.rpc import common as rpc_common
from neutron.openstack.common.rpc import dispatcher as rpc_dispatcher
from neutron.openstack.common.rpc import proxy
from neutron.openstack.common import service
LOG = logging.getLogger(__name__)
class RpcProxy(proxy.RpcProxy):
class RpcProxy(object):
'''
This class is created to facilitate migration from oslo-incubator
RPC layer implementation to oslo.messaging and is intended to
emulate RpcProxy class behaviour using oslo.messaging API once the
migration is applied.
'''
RPC_API_NAMESPACE = None
def __init__(self, topic, default_version, version_cap=None):
self.topic = topic
target = messaging.Target(topic=topic, version=default_version)
self._client = n_rpc.get_client(target, version_cap=version_cap)
def make_msg(self, method, **kwargs):
return {'method': method,
'namespace': self.RPC_API_NAMESPACE,
'args': kwargs}
def call(self, context, msg, **kwargs):
return self.__call_rpc_method(
context, msg, rpc_method='call', **kwargs)
def cast(self, context, msg, **kwargs):
self.__call_rpc_method(context, msg, rpc_method='cast', **kwargs)
def fanout_cast(self, context, msg, **kwargs):
kwargs['fanout'] = True
self.__call_rpc_method(context, msg, rpc_method='cast', **kwargs)
def __call_rpc_method(self, context, msg, **kwargs):
options = dict(
((opt, kwargs[opt])
for opt in ('fanout', 'timeout', 'topic', 'version')
if kwargs.get(opt))
)
if msg['namespace']:
options['namespace'] = msg['namespace']
if options:
callee = self._client.prepare(**options)
else:
callee = self._client
func = getattr(callee, kwargs['rpc_method'])
return func(context, msg['method'], **msg['args'])
class RpcCallback(object):
@ -40,6 +79,11 @@ class RpcCallback(object):
callback version using oslo.messaging API once the migration is
applied.
'''
RPC_API_VERSION = '1.0'
def __init__(self):
super(RpcCallback, self).__init__()
self.target = messaging.Target(version=self.RPC_API_VERSION)
class Service(service.Service):
@ -64,8 +108,7 @@ class Service(service.Service):
LOG.debug("Creating Consumer connection for Service %s" %
self.topic)
dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],
self.serializer)
dispatcher = [self.manager]
# Share this same connection for these Consumers
self.conn.create_consumer(self.topic, dispatcher, fanout=False)
@ -93,11 +136,30 @@ class Service(service.Service):
super(Service, self).stop()
class Connection(object):
def __init__(self):
super(Connection, self).__init__()
self.servers = []
def create_consumer(self, topic, proxy, fanout=False):
target = messaging.Target(
topic=topic, server=cfg.CONF.host, fanout=fanout)
server = n_rpc.get_server(target, proxy)
self.servers.append(server)
def consume_in_thread(self):
for server in self.servers:
server.start()
return self.servers
# functions
create_connection = rpc.create_connection
def create_connection(new=True):
return Connection()
# exceptions
RPCException = rpc_common.RPCException
RemoteError = rpc_common.RemoteError
MessagingTimeout = rpc_common.Timeout
RPCException = messaging.MessagingException
RemoteError = messaging.RemoteError
MessagingTimeout = messaging.MessagingTimeout

View File

@ -21,6 +21,7 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.v2 import attributes
from neutron.common import constants as l3_constants
from neutron.common import exceptions as n_exc
from neutron.common import rpc as n_rpc
from neutron.common import utils
from neutron.db import model_base
from neutron.db import models_v2
@ -28,7 +29,6 @@ from neutron.extensions import external_net
from neutron.extensions import l3
from neutron import manager
from neutron.openstack.common import log as logging
from neutron.openstack.common.notifier import api as notifier_api
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants
@ -481,11 +481,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
'tenant_id': port['tenant_id'],
'port_id': port['id'],
'subnet_id': port['fixed_ips'][0]['subnet_id']}
notifier_api.notify(context,
notifier_api.publisher_id('network'),
'router.interface.create',
notifier_api.CONF.default_notification_level,
{'router_interface': info})
notifier = n_rpc.get_notifier('network')
notifier.info(
context, 'router.interface.create', {'router_interface': info})
return info
def _confirm_router_interface_not_in_use(self, context, router_id,
@ -560,11 +558,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
'tenant_id': port['tenant_id'],
'port_id': port['id'],
'subnet_id': subnet['id']}
notifier_api.notify(context,
notifier_api.publisher_id('network'),
'router.interface.delete',
notifier_api.CONF.default_notification_level,
{'router_interface': info})
notifier = n_rpc.get_notifier('network')
notifier.info(
context, 'router.interface.delete', {'router_interface': info})
return info
def _get_floatingip(self, context, id):

View File

@ -15,7 +15,6 @@
# under the License.
from neutron.common import constants as consts
from neutron.common import rpc as p_rpc
from neutron.common import utils
from neutron import manager
from neutron.openstack.common import log as logging
@ -32,7 +31,7 @@ class MeteringRpcCallbacks(object):
self.meter_plugin = meter_plugin
def create_rpc_dispatcher(self):
return p_rpc.PluginRpcDispatcher([self])
return [self]
def get_sync_data_metering(self, context, **kwargs):
l3_plugin = manager.NeutronManager.get_service_plugins().get(

View File

@ -45,7 +45,9 @@ from neutron.openstack.common import systemd
from neutron.openstack.common import threadgroup
rpc = importutils.try_import('neutron.openstack.common.rpc')
#rpc = importutils.try_import('neutron.openstack.common.rpc')
# TODO(ihrachys): restore once oslo-rpc code is removed from the tree
rpc = None
CONF = cfg.CONF
LOG = logging.getLogger(__name__)

View File

@ -36,7 +36,6 @@ from neutron import context as q_context
from neutron.extensions import securitygroup as ext_sg
from neutron.openstack.common import excutils
from neutron.openstack.common import log
from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.bigswitch import config as pl_config
LOG = log.getLogger(__name__)
@ -106,7 +105,7 @@ class RestProxyAgent(rpc_compat.RpcCallback,
self.topic = topics.AGENT
self.plugin_rpc = PluginApi(topics.PLUGIN)
self.context = q_context.get_admin_context_without_session()
self.dispatcher = dispatcher.RpcDispatcher([self])
self.dispatcher = [self]
consumers = [[topics.PORT, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.dispatcher,

View File

@ -57,7 +57,6 @@ from neutron.api import extensions as neutron_extensions
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.common import constants as const
from neutron.common import exceptions
from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.common import utils
@ -121,8 +120,7 @@ class RestProxyCallbacks(rpc_compat.RpcCallback,
RPC_API_VERSION = '1.1'
def create_rpc_dispatcher(self):
return q_rpc.PluginRpcDispatcher([self,
agents_db.AgentExtRpcCallback()])
return [self, agents_db.AgentExtRpcCallback()]
def get_port_from_device(self, device):
port_id = re.sub(r"^tap", "", device)

View File

@ -31,7 +31,6 @@ from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.common import constants as q_const
from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.common import utils
@ -98,8 +97,7 @@ class BridgeRpcCallbacks(rpc_compat.RpcCallback,
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
"""
return q_rpc.PluginRpcDispatcher([self,
agents_db.AgentExtRpcCallback()])
return [self, agents_db.AgentExtRpcCallback()]
@classmethod
def get_port_from_device(cls, device):

View File

@ -28,7 +28,6 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.common import utils
@ -75,8 +74,7 @@ class N1kvRpcCallbacks(rpc_compat.RpcCallback,
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
"""
return q_rpc.PluginRpcDispatcher([self,
agents_db.AgentExtRpcCallback()])
return [self, agents_db.AgentExtRpcCallback()]
class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,

View File

@ -38,7 +38,6 @@ from neutron.common import topics
from neutron import context
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.common import constants as p_const
from neutron.plugins.hyperv.agent import utils
from neutron.plugins.hyperv.agent import utilsfactory
@ -106,8 +105,7 @@ class HyperVSecurityAgent(rpc_compat.RpcCallback,
consumers)
def _create_rpc_dispatcher(self):
rpc_callback = HyperVSecurityCallbackMixin(self)
return dispatcher.RpcDispatcher([rpc_callback])
return [HyperVSecurityCallbackMixin(self)]
class HyperVSecurityCallbackMixin(rpc_compat.RpcCallback,
@ -236,7 +234,7 @@ class HyperVNeutronAgent(rpc_compat.RpcCallback):
segmentation_id, port['admin_state_up'])
def _create_rpc_dispatcher(self):
return dispatcher.RpcDispatcher([self])
return [self]
def _get_vswitch_name(self, network_type, physical_network):
if network_type != p_const.TYPE_LOCAL:

View File

@ -17,7 +17,6 @@
# @author: Alessandro Pilotti, Cloudbase Solutions Srl
from neutron.common import constants as q_const
from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.db import agents_db
from neutron.db import dhcp_rpc_base
@ -48,8 +47,7 @@ class HyperVRpcCallbacks(
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return q_rpc.PluginRpcDispatcher([self,
agents_db.AgentExtRpcCallback()])
return [self, agents_db.AgentExtRpcCallback()]
def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details."""

View File

@ -37,7 +37,6 @@ from neutron.common import utils as n_utils
from neutron import context
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.ibm.common import config # noqa
from neutron.plugins.ibm.common import constants
@ -156,7 +155,7 @@ class SdnveNeutronAgent(rpc_compat.RpcCallback):
"out-of-band")
def create_rpc_dispatcher(self):
return dispatcher.RpcDispatcher([self])
return [self]
def setup_integration_br(self, bridge_name, reset_br, out_of_band,
controller_ip=None):

View File

@ -23,7 +23,6 @@ from oslo.config import cfg
from neutron.common import constants as n_const
from neutron.common import exceptions as n_exc
from neutron.common import rpc as n_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import agents_db
@ -54,8 +53,7 @@ class SdnveRpcCallbacks():
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return n_rpc.PluginRpcDispatcher([self,
agents_db.AgentExtRpcCallback()])
return [self, agents_db.AgentExtRpcCallback()]
def sdnve_info(self, rpc_context, **kwargs):
'''Update new information.'''

View File

@ -45,7 +45,6 @@ from neutron.common import utils as q_utils
from neutron import context
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.common import constants as p_const
from neutron.plugins.linuxbridge.common import config # noqa
from neutron.plugins.linuxbridge.common import constants as lconst
@ -816,7 +815,7 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback,
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return dispatcher.RpcDispatcher([self])
return [self]
class LinuxBridgePluginApi(agent_rpc.PluginApi,

View File

@ -23,7 +23,6 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.v2 import attributes
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.common import utils
@ -72,8 +71,7 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback,
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return q_rpc.PluginRpcDispatcher([self,
agents_db.AgentExtRpcCallback()])
return [self, agents_db.AgentExtRpcCallback()]
@classmethod
def get_port_from_device(cls, device):

View File

@ -29,7 +29,6 @@ from sqlalchemy.orm import exc as sa_exc
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron.common import rpc as n_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import agents_db
@ -189,8 +188,7 @@ class MidoRpcCallbacks(rpc_compat.RpcCallback,
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
"""
return n_rpc.PluginRpcDispatcher([self,
agents_db.AgentExtRpcCallback()])
return [self, agents_db.AgentExtRpcCallback()]
class MidonetPluginException(n_exc.NeutronException):

View File

@ -13,9 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo import messaging
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import constants as q_const
from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import agents_db
@ -46,13 +47,15 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
# 1.0 Initial version (from openvswitch/linuxbridge)
# 1.1 Support Security Group RPC
# FIXME(ihrachys): we can't use rpc_compat.RpcCallback here due to
# inheritance problems
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, notifier, type_manager):
# REVISIT(kmestery): This depends on the first three super classes
# not having their own __init__ functions. If an __init__() is added
# to one, this could break. Fix this and add a unit test to cover this
# test in H3.
# FIXME(ihrachys): we can't use rpc_compat.RpcCallback here due
# to inheritance problems
super(RpcCallbacks, self).__init__(notifier, type_manager)
def create_rpc_dispatcher(self):
@ -61,8 +64,7 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return q_rpc.PluginRpcDispatcher([self,
agents_db.AgentExtRpcCallback()])
return [self, agents_db.AgentExtRpcCallback()]
@classmethod
def _device_to_port_id(cls, device):

View File

@ -35,7 +35,6 @@ from neutron.common import utils as q_utils
from neutron import context
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.common import constants as p_const
from neutron.plugins.mlnx.agent import utils
from neutron.plugins.mlnx.common import config # noqa
@ -218,7 +217,7 @@ class MlnxEswitchRpcCallbacks(rpc_compat.RpcCallback,
or support more than one class as the target of rpc messages,
override this method.
"""
return dispatcher.RpcDispatcher([self])
return [self]
class MlnxEswitchPluginApi(agent_rpc.PluginApi,

View File

@ -17,7 +17,6 @@
from oslo.config import cfg
from neutron.common import constants as q_const
from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.db import agents_db
from neutron.db import api as db_api
@ -48,8 +47,7 @@ class MlnxRpcCallbacks(rpc_compat.RpcCallback,
or support more than one class as the target of RPC messages,
override this method.
"""
return q_rpc.PluginRpcDispatcher([self,
agents_db.AgentExtRpcCallback()])
return [self, agents_db.AgentExtRpcCallback()]
@classmethod
def get_port_from_device(cls, device):

View File

@ -38,7 +38,6 @@ from neutron import context as q_context
from neutron.extensions import securitygroup as ext_sg
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.nec.common import config
@ -157,8 +156,7 @@ class NECNeutronAgent(object):
self, self.sg_agent)
self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
self.sg_agent)
self.dispatcher = dispatcher.RpcDispatcher([self.callback_nec,
self.callback_sg])
self.dispatcher = [self.callback_nec, self.callback_sg]
# Define the listening consumer for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]

View File

@ -18,7 +18,6 @@
from oslo.config import cfg
from neutron.agent.common import config
from neutron.openstack.common import rpc # noqa
from neutron.plugins.nec.common import constants as nconst

View File

@ -22,7 +22,6 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.v2 import attributes as attrs
from neutron.common import constants as const
from neutron.common import exceptions as n_exc
from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import agents_db
@ -147,12 +146,12 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
# NOTE: callback_sg is referred to from the sg unit test.
self.callback_sg = SecurityGroupServerRpcCallback()
callbacks = [NECPluginV2RPCCallbacks(self.safe_reference),
DhcpRpcCallback(),
L3RpcCallback(),
self.callback_sg,
agents_db.AgentExtRpcCallback()]
self.dispatcher = q_rpc.PluginRpcDispatcher(callbacks)
self.dispatcher = [
NECPluginV2RPCCallbacks(self.safe_reference),
DhcpRpcCallback(),
L3RpcCallback(),
self.callback_sg,
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
# Consume from all consumers in a thread
@ -722,7 +721,7 @@ class NECPluginV2RPCCallbacks(rpc_compat.RpcCallback):
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return q_rpc.PluginRpcDispatcher([self])
return [self]
def update_ports(self, rpc_context, **kwargs):
"""Update ports' information and activate/deavtivate them.

View File

@ -39,7 +39,6 @@ from neutron.common import utils as n_utils
from neutron import context
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.common import constants as p_const
from neutron.plugins.ofagent.common import config # noqa
from neutron.plugins.openvswitch.common import constants
@ -351,7 +350,7 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
"""
return dispatcher.RpcDispatcher([self])
return [self]
def _provision_local_vlan_outbound_for_tunnel(self, lvid,
segmentation_id, ofports):

View File

@ -32,7 +32,6 @@ from neutron.common import topics
from neutron import context as n_context
from neutron.extensions import securitygroup as ext_sg
from neutron.openstack.common import log as logging
from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.oneconvergence.lib import config
LOG = logging.getLogger(__name__)
@ -120,8 +119,7 @@ class NVSDNeutronAgent(rpc_compat.RpcCallback):
self, self.sg_agent)
self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
self.sg_agent)
self.dispatcher = dispatcher.RpcDispatcher([self.callback_oc,
self.callback_sg])
self.dispatcher = [self.callback_oc, self.callback_sg]
# Define the listening consumer for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]

View File

@ -23,7 +23,6 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.common import constants as q_const
from neutron.common import exceptions as nexception
from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import agents_db
@ -61,8 +60,7 @@ class NVSDPluginRpcCallbacks(rpc_compat.RpcCallback,
def create_rpc_dispatcher(self):
"""Get the rpc dispatcher for this manager."""
return q_rpc.PluginRpcDispatcher([self,
agents_db.AgentExtRpcCallback()])
return [self, agents_db.AgentExtRpcCallback()]
@staticmethod
def get_port_from_device(device):

View File

@ -41,7 +41,6 @@ from neutron.common import utils as q_utils
from neutron import context
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.common import constants as p_const
from neutron.plugins.openvswitch.common import config # noqa
from neutron.plugins.openvswitch.common import constants
@ -500,7 +499,7 @@ class OVSNeutronAgent(rpc_compat.RpcCallback,
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return dispatcher.RpcDispatcher([self])
return [self]
def provision_local_vlan(self, net_uuid, network_type, physical_network,
segmentation_id):

View File

@ -23,7 +23,6 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.v2 import attributes
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.common import utils
@ -80,8 +79,7 @@ class OVSRpcCallbacks(rpc_compat.RpcCallback,
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return q_rpc.PluginRpcDispatcher([self,
agents_db.AgentExtRpcCallback()])
return [self, agents_db.AgentExtRpcCallback()]
@classmethod
def get_port_from_device(cls, device):

View File

@ -42,7 +42,6 @@ from neutron.common import topics
from neutron import context as q_context
from neutron.extensions import securitygroup as ext_sg
from neutron.openstack.common import log
from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.ryu.common import config # noqa
@ -209,7 +208,7 @@ class OVSNeutronOFPRyuAgent(rpc_compat.RpcCallback,
consumers)
def _create_rpc_dispatcher(self):
return dispatcher.RpcDispatcher([self])
return [self]
def _setup_integration_br(self, root_helper, integ_br,
tunnel_ip, ovsdb_port, ovsdb_ip):

View File

@ -23,7 +23,6 @@ from ryu.app import rest_nw_id
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import api as db
@ -59,7 +58,7 @@ class RyuRpcCallbacks(rpc_compat.RpcCallback,
self.ofp_rest_api_addr = ofp_rest_api_addr
def create_rpc_dispatcher(self):
return q_rpc.PluginRpcDispatcher([self])
return [self]
def get_ofp_rest_api(self, context, **kwargs):
LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr)

View File

@ -24,7 +24,6 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.v2 import attributes
from neutron.common import constants as const
from neutron.common import exceptions as ntn_exc
from neutron.common import rpc as n_rpc
from neutron.common import rpc_compat
from neutron.db import agents_db
from neutron.db import db_base_plugin_v2
@ -55,8 +54,7 @@ class NSXRpcCallbacks(rpc_compat.RpcCallback,
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return n_rpc.PluginRpcDispatcher([self,
agents_db.AgentExtRpcCallback()])
return [self, agents_db.AgentExtRpcCallback()]
def handle_network_dhcp_access(plugin, context, network, action):

View File

@ -26,7 +26,6 @@ from oslo.config import cfg
from neutron.api.v2 import attributes
from neutron.common import exceptions
import neutron.common.utils as utils
from neutron import manager
from neutron.openstack.common import excutils
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
@ -263,6 +262,9 @@ class OwnerCheck(policy.Check):
# resource is handled by the core plugin. It might be worth
# having a way to map resources to plugins so to make this
# check more general
# FIXME(ihrachys): if import is put in global, circular
# import failure occurs
from neutron import manager
f = getattr(manager.NeutronManager.get_instance().plugin,
'get_%s' % parent_res)
# f *must* exist, if not found it is better to let neutron

View File

@ -13,13 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import inspect
import logging as std_logging
import os
import random
from oslo.config import cfg
from oslo.messaging import server as rpc_server
from neutron.common import config
from neutron.common import rpc_compat
@ -112,23 +112,25 @@ class RpcWorker(object):
"""Wraps a worker to be handled by ProcessLauncher"""
def __init__(self, plugin):
self._plugin = plugin
self._server = None
self._servers = []
def start(self):
# We may have just forked from parent process. A quick disposal of the
# existing sql connections avoids producing errors later when they are
# discovered to be broken.
session.get_engine().pool.dispose()
self._server = self._plugin.start_rpc_listener()
self._servers = self._plugin.start_rpc_listener()
def wait(self):
if isinstance(self._server, eventlet.greenthread.GreenThread):
self._server.wait()
for server in self._servers:
if isinstance(server, rpc_server.MessageHandlingServer):
server.wait()
def stop(self):
if isinstance(self._server, eventlet.greenthread.GreenThread):
self._server.kill()
self._server = None
for server in self._servers:
if isinstance(server, rpc_server.MessageHandlingServer):
server.kill()
self._servers = []
def serve_rpc():

View File

@ -20,7 +20,6 @@
from oslo.config import cfg
from neutron.common import exceptions as n_exception
from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron import context as neutron_context
@ -42,7 +41,7 @@ class FirewallCallbacks(rpc_compat.RpcCallback):
self.plugin = plugin
def create_rpc_dispatcher(self):
return q_rpc.PluginRpcDispatcher([self])
return [self]
def set_firewall_status(self, context, firewall_id, status, **kwargs):
"""Agent uses this to set a firewall's status."""

View File

@ -21,7 +21,6 @@ from oslo.config import cfg
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.common import constants as q_const
from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import api as qdbapi
@ -46,7 +45,7 @@ class L3RouterPluginRpcCallbacks(rpc_compat.RpcCallback,
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
"""
return q_rpc.PluginRpcDispatcher([self])
return [self]
class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin,

View File

@ -22,7 +22,6 @@ from oslo.config import cfg
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import agents_db
@ -66,8 +65,7 @@ class LoadBalancerCallbacks(rpc_compat.RpcCallback):
self.plugin = plugin
def create_rpc_dispatcher(self):
return q_rpc.PluginRpcDispatcher(
[self, agents_db.AgentExtRpcCallback(self.plugin)])
return [self, agents_db.AgentExtRpcCallback(self.plugin)]
def get_ready_devices(self, context, host=None):
with context.session.begin(subtransactions=True):

View File

@ -26,6 +26,7 @@ from neutron.agent.common import config
from neutron.agent import rpc as agent_rpc
from neutron.common import config as common_config
from neutron.common import constants as constants
from neutron.common import rpc as n_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.common import utils
@ -34,7 +35,6 @@ from neutron import manager
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common.notifier import api as notifier_api
from neutron.openstack.common import periodic_task
from neutron.openstack.common import service
from neutron import service as neutron_service
@ -114,11 +114,8 @@ class MeteringAgent(MeteringPluginRpc, manager.Manager):
'host': self.host}
LOG.debug(_("Send metering report: %s"), data)
notifier_api.notify(self.context,
notifier_api.publisher_id('metering'),
'l3.meter',
notifier_api.CONF.default_notification_level,
data)
notifier = n_rpc.get_notifier('metering')
notifier.info(self.context, 'l3.meter', data)
info['pkts'] = 0
info['bytes'] = 0
info['time'] = 0

View File

@ -20,10 +20,10 @@ import requests
import netaddr
from oslo.config import cfg
from oslo import messaging
import six
from neutron.common import exceptions
from neutron.common import rpc as n_rpc
from neutron.common import rpc_compat
from neutron import context as ctx
from neutron.openstack.common import lockutils
@ -184,12 +184,13 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver):
# history
# 1.0 Initial version
RPC_API_VERSION = '1.0'
# TODO(ihrachys): we can't use RpcCallback here due to inheritance
# issues
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, agent, host):
# TODO(ihrachys): we can't use RpcCallback here due to
# inheritance issues
self.host = host
self.conn = rpc_compat.create_connection(new=True)
context = ctx.get_admin_context_without_session()
@ -225,7 +226,7 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver):
for k, v in csrs_found.items()])
def create_rpc_dispatcher(self):
return n_rpc.PluginRpcDispatcher([self])
return [self]
def vpnservice_updated(self, context, **kwargs):
"""Handle VPNaaS service driver change notifications."""

View File

@ -23,11 +23,11 @@ import shutil
import jinja2
import netaddr
from oslo.config import cfg
from oslo import messaging
import six
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron import context
from neutron.openstack.common import lockutils
@ -487,9 +487,11 @@ class IPsecDriver(device_drivers.DeviceDriver):
RPC_API_VERSION = '1.0'
# TODO(ihrachys): we can't use RpcCallback here due to inheritance
# issues
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, agent, host):
# TODO(ihrachys): we can't use RpcCallback here due to
# inheritance issues
self.agent = agent
self.conf = self.agent.conf
self.root_helper = self.agent.root_helper
@ -514,7 +516,7 @@ class IPsecDriver(device_drivers.DeviceDriver):
interval=self.conf.ipsec.ipsec_status_check_interval)
def create_rpc_dispatcher(self):
return q_rpc.PluginRpcDispatcher([self])
return [self]
def _update_nat(self, vpnservice, func):
"""Setting up nat rule in iptables.

View File

@ -16,7 +16,6 @@ import netaddr
from netaddr import core as net_exc
from neutron.common import exceptions
from neutron.common import rpc as n_rpc
from neutron.common import rpc_compat
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
@ -55,7 +54,7 @@ class CiscoCsrIPsecVpnDriverCallBack(rpc_compat.RpcCallback):
self.driver = driver
def create_rpc_dispatcher(self):
return n_rpc.PluginRpcDispatcher([self])
return [self]
def get_vpn_services_on_host(self, context, host=None):
"""Retuns info on the vpnservices on the host."""

View File

@ -16,7 +16,6 @@
# under the License.
import netaddr
from neutron.common import rpc as n_rpc
from neutron.common import rpc_compat
from neutron.openstack.common import log as logging
from neutron.services.vpn.common import topics
@ -42,7 +41,7 @@ class IPsecVpnDriverCallBack(rpc_compat.RpcCallback):
self.driver = driver
def create_rpc_dispatcher(self):
return n_rpc.PluginRpcDispatcher([self])
return [self]
def get_vpn_services_on_host(self, context, host=None):
"""Returns the vpnservices on the host."""

View File

@ -29,15 +29,14 @@ import eventlet.timeout
import fixtures
import mock
from oslo.config import cfg
from oslo.messaging import conffixture as messaging_conffixture
import testtools
from neutron.common import config
from neutron.common import rpc as n_rpc
from neutron.db import agentschedulers_db
from neutron import manager
from neutron.openstack.common.notifier import api as notifier_api
from neutron.openstack.common.notifier import test_notifier
from neutron.openstack.common import rpc
from neutron.openstack.common.rpc import impl_fake
from neutron.tests import fake_notifier
from neutron.tests import post_mortem_debug
@ -58,6 +57,10 @@ def fake_use_fatal_exceptions(*args):
return True
def fake_consume_in_threads(self):
return []
class BaseTestCase(testtools.TestCase):
def cleanup_core_plugin(self):
@ -90,16 +93,10 @@ class BaseTestCase(testtools.TestCase):
if core_plugin is not None:
cfg.CONF.set_override('core_plugin', core_plugin)
def _cleanup_test_notifier(self):
test_notifier.NOTIFICATIONS = []
def setup_notification_driver(self, notification_driver=None):
# to reload the drivers
self.addCleanup(notifier_api._reset_drivers)
self.addCleanup(self._cleanup_test_notifier)
notifier_api._reset_drivers()
self.addCleanup(fake_notifier.reset)
if notification_driver is None:
notification_driver = [test_notifier.__name__]
notification_driver = [fake_notifier.__name__]
cfg.CONF.set_override("notification_driver", notification_driver)
@staticmethod
@ -113,10 +110,6 @@ class BaseTestCase(testtools.TestCase):
else:
conf(args)
def _cleanup_rpc_backend(self):
rpc._RPCIMPL = None
impl_fake.CONSUMERS.clear()
def setUp(self):
super(BaseTestCase, self).setUp()
@ -124,8 +117,6 @@ class BaseTestCase(testtools.TestCase):
# test-specific cleanup has a chance to release references.
self.addCleanup(self.cleanup_core_plugin)
self.addCleanup(self._cleanup_rpc_backend)
# Configure this first to ensure pm debugging support for setUp()
if os.environ.get('OS_POST_MORTEM_DEBUG') in TRUE_STRING:
self.addOnException(post_mortem_debug.exception_handler)
@ -179,6 +170,25 @@ class BaseTestCase(testtools.TestCase):
'neutron.common.exceptions.NeutronException.use_fatal_exceptions',
fake_use_fatal_exceptions))
# don't actually start RPC listeners when testing
self.useFixture(fixtures.MonkeyPatch(
'neutron.common.rpc_compat.Connection.consume_in_thread',
fake_consume_in_threads))
self.useFixture(fixtures.MonkeyPatch(
'oslo.messaging.Notifier', fake_notifier.FakeNotifier))
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
self.messaging_conf.transport_driver = 'fake'
self.messaging_conf.response_timeout = 15
self.useFixture(self.messaging_conf)
self.addCleanup(n_rpc.clear_extra_exmods)
n_rpc.add_extra_exmods('neutron.test')
self.addCleanup(n_rpc.cleanup)
n_rpc.init(CONF)
if sys.version_info < (2, 7) and getattr(self, 'fmt', '') == 'xml':
raise self.skipException('XML Testing Skipped in Py26')

View File

@ -0,0 +1,50 @@
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import functools
NOTIFICATIONS = []
def reset():
del NOTIFICATIONS[:]
FakeMessage = collections.namedtuple('Message',
['publisher_id', 'priority',
'event_type', 'payload'])
class FakeNotifier(object):
def __init__(self, transport, publisher_id=None):
self.transport = transport
self.publisher_id = publisher_id
for priority in ('debug', 'info', 'warn', 'error', 'critical'):
setattr(self, priority,
functools.partial(self._notify, priority=priority.upper()))
def prepare(self, publisher_id=None):
if publisher_id is None:
publisher_id = self.publisher_id
return self.__class__(self.transport, publisher_id)
def _notify(self, ctxt, event_type, payload, priority):
msg = dict(publisher_id=self.publisher_id,
priority=priority,
event_type=event_type,
payload=payload)
NOTIFICATIONS.append(msg)

View File

@ -23,9 +23,9 @@ Unit Tests for hyperv neutron rpc
import mock
from neutron.agent import rpc as agent_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.openstack.common import context
from neutron.openstack.common import rpc
from neutron.plugins.hyperv import agent_notifier_api as ana
from neutron.plugins.hyperv.common import constants
from neutron.tests import base
@ -38,19 +38,19 @@ class rpcHyperVApiTestCase(base.BaseTestCase):
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if method == 'call' else None
expected_msg = rpcapi.make_msg(method, **kwargs)
expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
if rpc_method == 'cast' and method == 'run_instance':
kwargs['call'] = False
with mock.patch.object(rpc, rpc_method) as rpc_method_mock:
proxy = rpc_compat.RpcProxy
with mock.patch.object(proxy, rpc_method) as rpc_method_mock:
rpc_method_mock.return_value = expected_retval
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
expected_args = [ctxt, topic, expected_msg]
for arg, expected_arg in zip(rpc_method_mock.call_args[0],
expected_args):
self.assertEqual(arg, expected_arg)
expected = [
mock.call(ctxt, expected_msg, topic=topic)
]
rpc_method_mock.assert_has_calls(expected)
def test_delete_network(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT)

View File

@ -35,7 +35,6 @@ class rpcApiTestCase(base.BaseTestCase):
expected_retval = 'foo' if method == 'call' else None
if not expected_msg:
expected_msg = rpcapi.make_msg(method, **kwargs)
expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
if rpc_method == 'cast' and method == 'run_instance':
kwargs['call'] = False
@ -49,15 +48,19 @@ class rpcApiTestCase(base.BaseTestCase):
return expected_retval
self.useFixture(fixtures.MonkeyPatch(
'neutron.openstack.common.rpc.' + rpc_method, _fake_rpc_method))
'neutron.common.rpc_compat.RpcProxy.' + rpc_method,
_fake_rpc_method))
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(expected_retval, retval)
expected_args = [ctxt, topic, expected_msg]
expected_args = [ctxt, expected_msg]
expected_kwargs = {'topic': topic}
for arg, expected_arg in zip(self.fake_args, expected_args):
# skip the first argument which is 'self'
for arg, expected_arg in zip(self.fake_args[1:], expected_args):
self.assertEqual(expected_arg, arg)
self.assertEqual(expected_kwargs, self.fake_kwargs)
def test_delete_network(self):
rpcapi = plb.AgentNotifierApi(topics.AGENT)

View File

@ -20,9 +20,9 @@ Unit Tests for ml2 rpc
import mock
from neutron.agent import rpc as agent_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.openstack.common import context
from neutron.openstack.common import rpc
from neutron.plugins.ml2.drivers import type_tunnel
from neutron.plugins.ml2 import rpc as plugin_rpc
from neutron.tests import base
@ -34,20 +34,19 @@ class RpcApiTestCase(base.BaseTestCase):
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if method == 'call' else None
expected_msg = rpcapi.make_msg(method, **kwargs)
expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
if rpc_method == 'cast' and method == 'run_instance':
kwargs['call'] = False
rpc = rpc_compat.RpcProxy
with mock.patch.object(rpc, rpc_method) as rpc_method_mock:
rpc_method_mock.return_value = expected_retval
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
expected_args = [ctxt, topic, expected_msg]
for arg, expected_arg in zip(rpc_method_mock.call_args[0],
expected_args):
self.assertEqual(arg, expected_arg)
expected = [
mock.call(ctxt, expected_msg, topic=topic)
]
rpc_method_mock.assert_has_calls(expected)
def test_delete_network(self):
rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)

View File

@ -37,7 +37,6 @@ class rpcApiTestCase(base.BaseTestCase):
expected_retval = 'foo' if method == 'call' else None
if not expected_msg:
expected_msg = rpcapi.make_msg(method, **kwargs)
expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
if rpc_method == 'cast' and method == 'run_instance':
kwargs['call'] = False
@ -51,15 +50,19 @@ class rpcApiTestCase(base.BaseTestCase):
return expected_retval
self.useFixture(fixtures.MonkeyPatch(
'neutron.openstack.common.rpc.' + rpc_method, _fake_rpc_method))
'neutron.common.rpc_compat.RpcProxy.' + rpc_method,
_fake_rpc_method))
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(expected_retval, retval)
expected_args = [ctxt, topic, expected_msg]
expected_args = [ctxt, expected_msg]
expected_kwargs = {'topic': topic}
for arg, expected_arg in zip(self.fake_args, expected_args):
# skip the first argument which is 'self'
for arg, expected_arg in zip(self.fake_args[1:], expected_args):
self.assertEqual(expected_arg, arg)
self.assertEqual(expected_kwargs, self.fake_kwargs)
def test_delete_network(self):
rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT)

View File

@ -34,7 +34,6 @@ class rpcApiTestCase(base.BaseTestCase):
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if method == 'call' else None
expected_msg = rpcapi.make_msg(method, **kwargs)
expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
if rpc_method == 'cast' and method == 'run_instance':
kwargs['call'] = False
@ -48,15 +47,19 @@ class rpcApiTestCase(base.BaseTestCase):
return expected_retval
self.useFixture(fixtures.MonkeyPatch(
'neutron.openstack.common.rpc.' + rpc_method, _fake_rpc_method))
'neutron.common.rpc_compat.RpcProxy.' + rpc_method,
_fake_rpc_method))
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
expected_args = [ctxt, topic, expected_msg]
expected_args = [ctxt, expected_msg]
expected_kwargs = {'topic': topic}
for arg, expected_arg in zip(self.fake_args, expected_args):
# skip the first argument which is 'self'
for arg, expected_arg in zip(self.fake_args[1:], expected_args):
self.assertEqual(arg, expected_arg)
self.assertEqual(expected_kwargs, self.fake_kwargs)
def test_delete_network(self):
rpcapi = povs.AgentNotifierApi(topics.AGENT)

View File

@ -18,10 +18,10 @@ import mock
from oslo.config import cfg
from neutron.agent.common import config
from neutron.openstack.common.notifier import test_notifier
from neutron.openstack.common import uuidutils
from neutron.services.metering.agents import metering_agent
from neutron.tests import base
from neutron.tests import fake_notifier
_uuid = uuidutils.generate_uuid
@ -96,8 +96,8 @@ class TestMeteringOperations(base.BaseTestCase):
'bytes': 444}}
self.agent._metering_loop()
self.assertNotEqual(len(test_notifier.NOTIFICATIONS), 0)
for n in test_notifier.NOTIFICATIONS:
self.assertNotEqual(len(fake_notifier.NOTIFICATIONS), 0)
for n in fake_notifier.NOTIFICATIONS:
if n['event_type'] == 'l3.meter':
break

View File

@ -27,7 +27,7 @@ class AgentRPCPluginApi(base.BaseTestCase):
agent = rpc.PluginApi('fake_topic')
ctxt = context.RequestContext('fake_user', 'fake_project')
expect_val = 'foo'
with mock.patch('neutron.openstack.common.rpc.call') as rpc_call:
with mock.patch('neutron.common.rpc_compat.RpcProxy.call') as rpc_call:
rpc_call.return_value = expect_val
func_obj = getattr(agent, method)
if method == 'tunnel_sync':

View File

@ -33,12 +33,12 @@ from neutron.api.v2 import router
from neutron.common import exceptions as n_exc
from neutron import context
from neutron import manager
from neutron.openstack.common.notifier import api as notifer_api
from neutron.openstack.common import policy as common_policy
from neutron.openstack.common import uuidutils
from neutron import policy
from neutron import quota
from neutron.tests import base
from neutron.tests import fake_notifier
from neutron.tests.unit import testlib_api
@ -1242,41 +1242,42 @@ class V2Views(base.BaseTestCase):
class NotificationTest(APIv2TestBase):
def _resource_op_notifier(self, opname, resource, expected_errors=False,
notification_level='INFO'):
def setUp(self):
super(NotificationTest, self).setUp()
fake_notifier.reset()
def _resource_op_notifier(self, opname, resource, expected_errors=False):
initial_input = {resource: {'name': 'myname'}}
instance = self.plugin.return_value
instance.get_networks.return_value = initial_input
instance.get_networks_count.return_value = 0
expected_code = exc.HTTPCreated.code
with mock.patch.object(notifer_api, 'notify') as mynotifier:
if opname == 'create':
initial_input[resource]['tenant_id'] = _uuid()
res = self.api.post_json(
_get_path('networks'),
initial_input, expect_errors=expected_errors)
if opname == 'update':
res = self.api.put_json(
_get_path('networks', id=_uuid()),
initial_input, expect_errors=expected_errors)
expected_code = exc.HTTPOk.code
if opname == 'delete':
initial_input[resource]['tenant_id'] = _uuid()
res = self.api.delete(
_get_path('networks', id=_uuid()),
expect_errors=expected_errors)
expected_code = exc.HTTPNoContent.code
expected = [mock.call(mock.ANY,
'network.' + cfg.CONF.host,
resource + "." + opname + ".start",
notification_level,
mock.ANY),
mock.call(mock.ANY,
'network.' + cfg.CONF.host,
resource + "." + opname + ".end",
notification_level,
mock.ANY)]
self.assertEqual(expected, mynotifier.call_args_list)
if opname == 'create':
initial_input[resource]['tenant_id'] = _uuid()
res = self.api.post_json(
_get_path('networks'),
initial_input, expect_errors=expected_errors)
if opname == 'update':
res = self.api.put_json(
_get_path('networks', id=_uuid()),
initial_input, expect_errors=expected_errors)
expected_code = exc.HTTPOk.code
if opname == 'delete':
initial_input[resource]['tenant_id'] = _uuid()
res = self.api.delete(
_get_path('networks', id=_uuid()),
expect_errors=expected_errors)
expected_code = exc.HTTPNoContent.code
expected_events = ('.'.join([resource, opname, "start"]),
'.'.join([resource, opname, "end"]))
self.assertEqual(len(fake_notifier.NOTIFICATIONS),
len(expected_events))
for msg, event in zip(fake_notifier.NOTIFICATIONS, expected_events):
self.assertEqual('INFO', msg['priority'])
self.assertEqual(event, msg['event_type'])
self.assertEqual(res.status_int, expected_code)
def test_network_create_notifer(self):
@ -1288,11 +1289,6 @@ class NotificationTest(APIv2TestBase):
def test_network_update_notifer(self):
self._resource_op_notifier('update', 'network')
def test_network_create_notifer_with_log_level(self):
cfg.CONF.set_override('default_notification_level', 'DEBUG')
self._resource_op_notifier('create', 'network',
notification_level='DEBUG')
class DHCPNotificationTest(APIv2TestBase):
def _test_dhcp_notifier(self, opname, resource, initial_input=None):

View File

@ -38,9 +38,9 @@ from neutron.extensions import l3
from neutron import manager
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common.notifier import test_notifier
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants as service_constants
from neutron.tests import fake_notifier
from neutron.tests.unit import test_agent_ext_plugin
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import test_api_v2_extension
@ -660,7 +660,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
'subnet.create.end',
'router.interface.create',
'router.interface.delete']
test_notifier.NOTIFICATIONS = []
fake_notifier.reset()
with self.router() as r:
with self.subnet() as s:
body = self._router_interface_action('add',
@ -683,9 +683,9 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
self.assertEqual(
set(exp_notifications),
set(n['event_type'] for n in test_notifier.NOTIFICATIONS))
set(n['event_type'] for n in fake_notifier.NOTIFICATIONS))
for n in test_notifier.NOTIFICATIONS:
for n in fake_notifier.NOTIFICATIONS:
if n['event_type'].startswith('router.interface.'):
payload = n['payload']['router_interface']
self.assertIn('id', payload)

View File

@ -23,6 +23,7 @@ alembic>=0.4.1
six>=1.7.0
stevedore>=0.14
oslo.config>=1.2.1
oslo.messaging>=1.3.0
oslo.rootwrap
python-novaclient>=2.17.0

View File

@ -169,6 +169,14 @@ neutron.ml2.mechanism_drivers =
fslsdn = neutron.plugins.ml2.drivers.mechanism_fslsdn:FslsdnMechanismDriver
neutron.openstack.common.cache.backends =
memory = neutron.openstack.common.cache._backends.memory:MemoryBackend
# These are for backwards compat with Icehouse notification_driver configuration values
oslo.messaging.notify.drivers =
neutron.openstack.common.notifier.log_notifier = oslo.messaging.notify._impl_log:LogDriver
neutron.openstack.common.notifier.no_op_notifier = oslo.messaging.notify._impl_noop:NoOpDriver
neutron.openstack.common.notifier.rpc_notifier2 = oslo.messaging.notify._impl_messaging:MessagingV2Driver
neutron.openstack.common.notifier.rpc_notifier = oslo.messaging.notify._impl_messaging:MessagingDriver
neutron.openstack.common.notifier.test_notifier = oslo.messaging.notify._impl_test:TestDriver
[build_sphinx]
all_files = 1