2566be199a
Using blocking executor is not recommended for application. But it looks like some use it just because it's the default and are not aware their should change it despite of documentation and logging message. Choosing the application threading model is an important step of an application. This change deprecates it, in the future we will just make executor mandatory. This will ensure that application make a choice. Also this will reduce headache of oslo.messaging developers to make the driver code working in a sync and async. And to finish test coverage of blocking executor is 0%... This rework some tests to remove logging.captureWarnings() that can catch unwanted warning of other tests. Tests mocks warning instead. Related-bug: #694728 Change-Id: Ic67164d12e7a9bed76d6e64ca2ced12e3984ff5f
258 lines
9.3 KiB
Python
258 lines
9.3 KiB
Python
|
|
# Copyright 2013 Red Hat, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""
|
|
An RPC server exposes a number of endpoints, each of which contain a set of
|
|
methods which may be invoked remotely by clients over a given transport.
|
|
|
|
To create an RPC server, you supply a transport, target and a list of
|
|
endpoints.
|
|
|
|
A transport can be obtained simply by calling the get_rpc_transport() method::
|
|
|
|
transport = messaging.get_rpc_transport(conf)
|
|
|
|
which will load the appropriate transport driver according to the user's
|
|
messaging configuration. See get_rpc_transport() for more details.
|
|
|
|
The target supplied when creating an RPC server expresses the topic, server
|
|
name and - optionally - the exchange to listen on. See Target for more details
|
|
on these attributes.
|
|
|
|
Multiple RPC Servers may listen to the same topic (and exchange)
|
|
simultaneously. See RPCClient for details regarding how RPC requests are
|
|
distributed to the Servers in this case.
|
|
|
|
Each endpoint object may have a target attribute which may have namespace and
|
|
version fields set. By default, we use the 'null namespace' and version 1.0.
|
|
Incoming method calls will be dispatched to the first endpoint with the
|
|
requested method, a matching namespace and a compatible version number.
|
|
|
|
The first parameter to method invocations is always the request context
|
|
supplied by the client. The remaining parameters are the arguments supplied to
|
|
the method by the client. Endpoint methods may return a value. If so the RPC
|
|
Server will send the returned value back to the requesting client via the
|
|
transport.
|
|
|
|
The executor parameter controls how incoming messages will be received and
|
|
dispatched. Refer to the Executor documentation for descriptions of the types
|
|
of executors.
|
|
|
|
*Note:* If the "eventlet" executor is used, the threading and time library need
|
|
to be monkeypatched.
|
|
|
|
The RPC reply operation is best-effort: the server will consider the message
|
|
containing the reply successfully sent once it is accepted by the messaging
|
|
transport. The server does not guarantee that the reply is processed by the
|
|
RPC client. If the send fails an error will be logged and the server will
|
|
continue to processing incoming RPC requests.
|
|
|
|
Parameters to the method invocation and values returned from the method are
|
|
python primitive types. However the actual encoding of the data in the message
|
|
may not be in primitive form (e.g. the message payload may be a dictionary
|
|
encoded as an ASCII string using JSON). A serializer object is used to convert
|
|
incoming encoded message data to primitive types. The serializer is also used
|
|
to convert the return value from primitive types to an encoding suitable for
|
|
the message payload.
|
|
|
|
RPC servers have start(), stop() and wait() methods to begin handling
|
|
requests, stop handling requests, and wait for all in-process requests to
|
|
complete after the Server has been stopped.
|
|
|
|
A simple example of an RPC server with multiple endpoints might be::
|
|
|
|
from oslo_config import cfg
|
|
import oslo_messaging
|
|
import time
|
|
|
|
class ServerControlEndpoint(object):
|
|
|
|
target = oslo_messaging.Target(namespace='control',
|
|
version='2.0')
|
|
|
|
def __init__(self, server):
|
|
self.server = server
|
|
|
|
def stop(self, ctx):
|
|
if self.server:
|
|
self.server.stop()
|
|
|
|
class TestEndpoint(object):
|
|
|
|
def test(self, ctx, arg):
|
|
return arg
|
|
|
|
transport = oslo_messaging.get_rpc_transport(cfg.CONF)
|
|
target = oslo_messaging.Target(topic='test', server='server1')
|
|
endpoints = [
|
|
ServerControlEndpoint(None),
|
|
TestEndpoint(),
|
|
]
|
|
server = oslo_messaging.get_rpc_server(transport, target, endpoints,
|
|
executor='eventlet')
|
|
try:
|
|
server.start()
|
|
while True:
|
|
time.sleep(1)
|
|
except KeyboardInterrupt:
|
|
print("Stopping server")
|
|
|
|
server.stop()
|
|
server.wait()
|
|
|
|
"""
|
|
|
|
__all__ = [
|
|
'get_rpc_server',
|
|
'expected_exceptions',
|
|
'expose'
|
|
]
|
|
|
|
import logging
|
|
import sys
|
|
|
|
from debtcollector.updating import updated_kwarg_default_value
|
|
|
|
from oslo_messaging._i18n import _LE
|
|
from oslo_messaging.rpc import dispatcher as rpc_dispatcher
|
|
from oslo_messaging import server as msg_server
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
class RPCServer(msg_server.MessageHandlingServer):
|
|
def __init__(self, transport, target, dispatcher, executor='blocking'):
|
|
super(RPCServer, self).__init__(transport, dispatcher, executor)
|
|
self._target = target
|
|
|
|
def _create_listener(self):
|
|
return self.transport._listen(self._target, 1, None)
|
|
|
|
def _process_incoming(self, incoming):
|
|
message = incoming[0]
|
|
try:
|
|
message.acknowledge()
|
|
except Exception:
|
|
LOG.exception(_LE("Can not acknowledge message. Skip processing"))
|
|
return
|
|
|
|
failure = None
|
|
try:
|
|
res = self.dispatcher.dispatch(message)
|
|
except rpc_dispatcher.ExpectedException as e:
|
|
failure = e.exc_info
|
|
LOG.debug(u'Expected exception during message handling (%s)', e)
|
|
except Exception:
|
|
# current sys.exc_info() content can be overridden
|
|
# by another exception raised by a log handler during
|
|
# LOG.exception(). So keep a copy and delete it later.
|
|
failure = sys.exc_info()
|
|
LOG.exception(_LE('Exception during message handling'))
|
|
|
|
try:
|
|
if failure is None:
|
|
message.reply(res)
|
|
else:
|
|
message.reply(failure=failure)
|
|
except Exception:
|
|
LOG.exception(_LE("Can not send reply for message"))
|
|
finally:
|
|
# NOTE(dhellmann): Remove circular object reference
|
|
# between the current stack frame and the traceback in
|
|
# exc_info.
|
|
del failure
|
|
|
|
|
|
@updated_kwarg_default_value('access_policy', None,
|
|
rpc_dispatcher.DefaultRPCAccessPolicy,
|
|
message='access_policy defaults to '
|
|
'LegacyRPCAccessPolicy which '
|
|
'exposes private methods. Explicitly '
|
|
'set access_policy to '
|
|
'DefaultRPCAccessPolicy or '
|
|
'ExplicitRPCAccessPolicy.',
|
|
version='?')
|
|
def get_rpc_server(transport, target, endpoints,
|
|
executor='blocking', serializer=None, access_policy=None):
|
|
"""Construct an RPC server.
|
|
|
|
:param transport: the messaging transport
|
|
:type transport: Transport
|
|
:param target: the exchange, topic and server to listen on
|
|
:type target: Target
|
|
:param endpoints: a list of endpoint objects
|
|
:type endpoints: list
|
|
:param executor: name of message executor - available values are
|
|
'eventlet' and 'threading'
|
|
:type executor: str
|
|
:param serializer: an optional entity serializer
|
|
:type serializer: Serializer
|
|
:param access_policy: an optional access policy.
|
|
Defaults to LegacyRPCAccessPolicy
|
|
:type access_policy: RPCAccessPolicyBase
|
|
"""
|
|
dispatcher = rpc_dispatcher.RPCDispatcher(endpoints, serializer,
|
|
access_policy)
|
|
return RPCServer(transport, target, dispatcher, executor)
|
|
|
|
|
|
def expected_exceptions(*exceptions):
|
|
"""Decorator for RPC endpoint methods that raise expected exceptions.
|
|
|
|
Marking an endpoint method with this decorator allows the declaration
|
|
of expected exceptions that the RPC server should not consider fatal,
|
|
and not log as if they were generated in a real error scenario.
|
|
|
|
Note that this will cause listed exceptions to be wrapped in an
|
|
ExpectedException, which is used internally by the RPC sever. The RPC
|
|
client will see the original exception type.
|
|
"""
|
|
def outer(func):
|
|
def inner(*args, **kwargs):
|
|
try:
|
|
return func(*args, **kwargs)
|
|
# Take advantage of the fact that we can catch
|
|
# multiple exception types using a tuple of
|
|
# exception classes, with subclass detection
|
|
# for free. Any exception that is not in or
|
|
# derived from the args passed to us will be
|
|
# ignored and thrown as normal.
|
|
except exceptions:
|
|
raise rpc_dispatcher.ExpectedException()
|
|
return inner
|
|
return outer
|
|
|
|
|
|
def expose(func):
|
|
"""Decorator for RPC endpoint methods that are exposed to the RPC client.
|
|
|
|
If the dispatcher's access_policy is set to ExplicitRPCAccessPolicy then
|
|
endpoint methods need to be explicitly exposed.::
|
|
|
|
# foo() cannot be invoked by an RPC client
|
|
def foo(self):
|
|
pass
|
|
|
|
# bar() can be invoked by an RPC client
|
|
@rpc.expose
|
|
def bar(self):
|
|
pass
|
|
|
|
"""
|
|
|
|
func.exposed = True
|
|
|
|
return func
|