New RPC layer implementation

* Implemented using kombu library

TODO (next commits):
 - Integrate it with Mistral
 - Make RPC implementation configurable
 - Wrap exceptions
 - Unit tests

Partially implements blueprint mistral-alternative-rpc

Co-Authored-By: Dawid Deja <dawid.deja@intel.com>
Change-Id: Ie7db5cce0c1fd6074d4f866b6c3ee1d008614355
This commit is contained in:
Nikolay Mahotkin 2015-06-17 13:17:09 +03:00 committed by Dawid Deja
parent 70944e5d82
commit f45482422d
12 changed files with 702 additions and 0 deletions

View File

View File

@ -0,0 +1,77 @@
# Copyright 2015 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
class RPCClient(object):
def __init__(self, conf):
"""Base class for RPCClient's drivers
RPC Client is responsible for sending requests to RPC Server.
All RPC client drivers have to inherit from this class.
:param conf: dict containing credentials such as
'user_id', 'password', and also exchange and connection info:
'exchange', 'topic', 'host', 'port', 'virtual_host'.
"""
self.conf = conf
@abc.abstractmethod
def sync_call(self, ctx, method, target=None, **kwargs):
"""Synchronous call of RPC method.
Blocks the thread and wait for method result.
"""
raise NotImplementedError
@abc.abstractmethod
def async_call(self, ctx, method, target=None, **kwargs):
"""Asynchronous call of RPC method.
Does not block the thread, just send invoking data to
the RPC server and immediately returns nothing.
"""
raise NotImplementedError
class RPCServer(object):
def __init__(self, conf):
"""Base class for RPCServer's drivers
RPC Server should listen for request coming from RPC Clients and
respond to them respectively to the registered endpoints.
All RPC server drivers have to inherit from this class.
:param conf: dict containing credentials such as
'user_id', 'password', and also exchange and connection info:
'exchange', 'topic', 'rabbit_host', 'rabbit_port', 'virtual_host'.
"""
self.conf = conf
@abc.abstractmethod
def register_endpoint(self, endpoint):
"""Registers a new RPC endpoint.
:param endpoint: an object containing methods which
will be used as RPC methods.
"""
raise NotImplementedError
@abc.abstractmethod
def run(self):
"""Runs the RPC server.
"""
raise NotImplementedError

View File

@ -0,0 +1,99 @@
# Copyright 2015 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import kombu
class Base(object):
"""Base class for Client and Server."""
@staticmethod
def _make_connection(amqp_host, amqp_port, amqp_user, amqp_password,
amqp_vhost):
"""Create connection.
This method creates object representing the connection to RabbitMQ.
:param amqp_host: Address of RabbitMQ server.
:param amqp_user: Username for connecting to RabbitMQ.
:param amqp_password: Password matching the given username.
:param amqp_vhost: Virtual host to connect to.
:param amqp_port: Port of RabbitMQ server.
:return: New connection to RabbitMQ.
"""
return kombu.BrokerConnection(
hostname=amqp_host,
userid=amqp_user,
password=amqp_password,
virtual_host=amqp_vhost,
port=amqp_port
)
@staticmethod
def _make_exchange(name, durable=False, auto_delete=True,
exchange_type='topic'):
"""Make named exchange.
This method creates object representing exchange on RabbitMQ. It would
create a new exchange if exchange with given name don't exists.
:param name: Name of the exchange.
:param durable: If set to True, messages on this exchange would be
store on disk - therefore can be retrieve after
failure.
:param auto_delete: If set to True, exchange would be automatically
deleted when none is connected.
:param exchange_type: Type of the exchange. Can be one of 'direct',
'topic', 'fanout', 'headers'. See Kombu docs for
further details.
:return: Kombu exchange object.
"""
return kombu.Exchange(
name=name,
type=exchange_type,
durable=durable,
auto_delete=auto_delete
)
@staticmethod
def _make_queue(name, exchange, routing_key='',
durable=False, auto_delete=True, **kwargs):
"""Make named queue for a given exchange.
This method creates object representing queue in RabbitMQ. It would
create a new queue if queue with given name don't exists.
:param name: Name of the queue
:param exchange: Kombu Exchange object (can be created using
_make_exchange).
:param routing_key: Routing key for queue. It behaves differently
depending the exchange type. See Kombu docs for
further details.
:param durable: If set to True, messages on this queue would be
store on disk - therefore can be retrieve after
failure.
:param auto_delete: If set to True, queue would be automatically
deleted when none is connected.
:param kwargs: See kombu documentation for all parameters than may be
may be passed to Queue.
:return: Kombu Queue object.
"""
return kombu.Queue(
name=name,
routing_key=routing_key,
exchange=exchange,
durable=durable,
auto_delete=auto_delete,
**kwargs
)

View File

@ -0,0 +1,42 @@
# Copyright 2015 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.engine.rpc_direct.kombu import kombu_client
# Example of using Kombu based RPC client.
def main():
conf = {
'user_id': 'guest',
'password': 'secret',
'exchange': 'my_exchange',
'topic': 'my_topic',
'server_id': 'host',
'host': 'localhost',
'port': 5672,
'virtual_host': '/'
}
kombu_rpc = kombu_client.KombuRPCClient(conf)
print(" [x] Requesting ...")
ctx = type('context', (object,), {'to_dict': lambda self: {}})()
response = kombu_rpc.sync_call(ctx, 'fib', n=44)
print(" [.] Got %r" % (response,))
if __name__ == '__main__':
main()

View File

@ -0,0 +1,51 @@
# Copyright 2015 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.engine.rpc_direct.kombu import kombu_server
# Simple example of endpoint of RPC server, which just
# calculates given fibonacci number.
class MyServer(object):
cache = {0: 0, 1: 1}
def fib(self, rpc_ctx, n):
if self.cache.get(n) is None:
self.cache[n] = (self.fib(rpc_ctx, n - 1)
+ self.fib(rpc_ctx, n - 2))
return self.cache[n]
def get_name(self, rpc_ctx):
return self.__class__.__name__
# Example of using Kombu based RPC server.
def main():
conf = {
'user_id': 'guest',
'password': 'secret',
'exchange': 'my_exchange',
'topic': 'my_topic',
'server_id': 'host',
'host': 'localhost',
'port': 5672,
'virtual_host': '/'
}
rpc_server = kombu_server.KombuRPCServer(conf)
rpc_server.register_endpoint(MyServer())
rpc_server.run()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,188 @@
# Copyright 2015 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import socket
import time
import kombu
from oslo_log import log as logging
from mistral.engine.rpc_direct import base as rpc_base
from mistral.engine.rpc_direct.kombu import base as kombu_base
from mistral import exceptions as exc
from mistral import utils
LOG = logging.getLogger(__name__)
IS_RECEIVED = 'kombu_rpc_is_received'
RESULT = 'kombu_rpc_result'
CORR_ID = 'kombu_rpc_correlation_id'
class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
def __init__(self, conf):
super(KombuRPCClient, self).__init__(conf)
self.exchange = conf.get('exchange', '')
self.user_id = conf.get('user_id', 'guest')
self.password = conf.get('password', 'guest')
self.topic = conf.get('topic', 'mistral')
self.server_id = conf.get('server_id', '')
self.host = conf.get('host', 'localhost')
self.port = conf.get('port', 5672)
self.virtual_host = conf.get('virtual_host', '/')
self.durable_queue = conf.get('durable_queues', False)
self.auto_delete = conf.get('auto_delete', False)
self._timeout = 180
self.conn = self._make_connection(
self.host,
self.port,
self.user_id,
self.password,
self.virtual_host
)
# Create exchange.
exchange = self._make_exchange(
self.exchange,
durable=self.durable_queue,
auto_delete=self.auto_delete
)
# Create queue.
queue_name = utils.generate_unicode_uuid()
self.callback_queue = kombu.Queue(
queue_name,
exchange=exchange,
routing_key=queue_name,
durable=False,
exclusive=True,
auto_delete=True
)
# Create consumer.
self.consumer = kombu.Consumer(
channel=self.conn.channel(),
queues=self.callback_queue,
callbacks=[self._on_response],
accept=['pickle', 'json']
)
self.consumer.qos(prefetch_count=1)
@staticmethod
def _on_response(response, message):
"""Callback on response.
This method is automatically called when a response is incoming and
decides if it is the message we are waiting for - the message with the
result.
:param response: the body of the amqp message already deserialized
by kombu
:param message: the plain amqp kombu.message with additional
information
"""
LOG.debug("Got response: {0}".format(response))
try:
message.ack()
except Exception as e:
LOG.exception("Failed to acknowledge AMQP message: %s" % e)
else:
LOG.debug("AMQP message acknowledged.")
# Process response.
if (utils.get_thread_local(CORR_ID) ==
message.properties['correlation_id']):
utils.set_thread_local(IS_RECEIVED, True)
if message.properties.get('type') == 'error':
raise response
utils.set_thread_local(RESULT, response)
def _wait_for_result(self):
"""Waits for the result from the server.
Waits for the result from the server, checks every second if
a timeout occurred. If a timeout occurred - the `RpcTimeout` exception
will be raised.
"""
start_time = time.time()
while not utils.get_thread_local(IS_RECEIVED):
try:
self.conn.drain_events()
except socket.timeout:
if self._timeout > 0:
if time.time() - start_time > self._timeout:
raise exc.MistralException("RPC Request timeout")
def _call(self, ctx, method, target, async=False, **kwargs):
"""Performs a remote call for the given method.
:param ctx: authentication context associated with mistral
:param method: name of the method that should be executed
:param kwargs: keyword parameters for the remote-method
:param target: Server name
:param async: bool value means whether the request is
asynchronous or not.
:return: result of the method or None if async.
"""
utils.set_thread_local(CORR_ID, utils.generate_unicode_uuid())
utils.set_thread_local(IS_RECEIVED, False)
self.consumer.consume()
body = {
'rpc_ctx': ctx.to_dict(),
'rpc_method': method,
'arguments': kwargs,
'async': async
}
LOG.debug("Publish request: {0}".format(body))
# Publish request.
with kombu.producers[self.conn].acquire(block=True) as producer:
producer.publish(
body=body,
exchange=self.exchange,
routing_key=self.topic,
reply_to=self.callback_queue.name,
correlation_id=utils.get_thread_local(CORR_ID),
delivery_mode=2
)
# Start waiting for response.
if async:
return
self._wait_for_result()
result = utils.get_thread_local(RESULT)
self._clear_thread_local()
return result
@staticmethod
def _clear_thread_local():
utils.set_thread_local(RESULT, None)
utils.set_thread_local(CORR_ID, None)
utils.set_thread_local(IS_RECEIVED, None)
def sync_call(self, ctx, method, target=None, **kwargs):
return self._call(ctx, method, async=False, target=target, **kwargs)
def async_call(self, ctx, method, target=None, **kwargs):
return self._call(ctx, method, async=True, target=target, **kwargs)

View File

@ -0,0 +1,170 @@
# Copyright 2015 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import socket
import threading
import kombu
from oslo_log import log as logging
from mistral import context as auth_context
from mistral.engine.rpc_direct import base as rpc_base
from mistral.engine.rpc_direct.kombu import base as kombu_base
from mistral import exceptions as exc
LOG = logging.getLogger(__name__)
class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
def __init__(self, conf):
super(KombuRPCServer, self).__init__(conf)
self.exchange = conf.get('exchange', '')
self.user_id = conf.get('user_id', 'guest')
self.password = conf.get('password', 'guest')
self.topic = conf.get('topic', 'mistral')
self.server_id = conf.get('server_id', '')
self.host = conf.get('host', 'localhost')
self.port = conf.get('port', 5672)
self.virtual_host = conf.get('virtual_host', '/')
self.durable_queue = conf.get('durable_queues', False)
self.auto_delete = conf.get('auto_delete', False)
self.routing_key = self.topic
self.channel = None
self.conn = None
self._running = threading.Event()
self.endpoints = []
@property
def is_running(self):
"""Return whether server is running."""
return self._running.is_set()
def run(self):
"""Start the server."""
self.conn = self._make_connection(
self.host,
self.port,
self.user_id,
self.password,
self.virtual_host,
)
LOG.info("Connected to AMQP at %s:%s" % (self.host, self.port))
try:
conn = kombu.connections[self.conn].acquire(block=True)
exchange = self._make_exchange(
self.exchange,
durable=self.durable_queue,
auto_delete=self.auto_delete
)
queue = self._make_queue(
self.topic,
exchange,
routing_key=self.routing_key,
durable=self.durable_queue,
auto_delete=self.auto_delete
)
with conn.Consumer(
queues=queue,
callbacks=[self._on_message_safe],
) as consumer:
consumer.qos(prefetch_count=1)
self._running.set()
while self.is_running:
try:
conn.drain_events(timeout=1)
except socket.timeout:
pass
except KeyboardInterrupt:
self.stop()
LOG.info("Server with id='{0}' stopped.".format(
self.server_id))
return
except socket.error as e:
raise exc.MistralException("Broker connection failed: %s" % e)
def stop(self):
"""Stop the server."""
self._running.clear()
def _get_rpc_method(self, method_name):
for endpoint in self.endpoints:
if hasattr(endpoint, method_name):
return getattr(endpoint, method_name)
return None
@staticmethod
def _set_auth_ctx(ctx):
if not isinstance(ctx, dict):
return
context = auth_context.MistralContext(**ctx)
auth_context.set_ctx(context)
def publish_message(self, body, reply_to, corr_id, type='response'):
with kombu.producers[self.conn].acquire(block=True) as producer:
producer.publish(
body=body,
exchange=self.exchange,
routing_key=reply_to,
correlation_id=corr_id,
type=type,
serializer='pickle' if type == 'error' else None
)
def _on_message_safe(self, request, message):
try:
return self._on_message(request, message)
except Exception as e:
self.publish_message(
e,
message.properties['reply_to'],
message.properties['correlation_id'],
type='error'
)
finally:
message.ack()
def _on_message(self, request, message):
LOG.debug('Received message %s',
request)
is_async = request.get('async', False)
rpc_context = request.get('rpc_ctx')
rpc_method_name = request.get('rpc_method')
arguments = request.get('arguments')
correlation_id = message.properties['correlation_id']
reply_to = message.properties['reply_to']
self._set_auth_ctx(rpc_context)
rpc_method = self._get_rpc_method(rpc_method_name)
if not rpc_method:
raise exc.MistralException("No such method: %s" % rpc_method_name)
response = rpc_method(rpc_ctx=rpc_context, **arguments)
if not is_async:
self.publish_message(
response,
reply_to,
correlation_id
)
def register_endpoint(self, endpoint):
self.endpoints.append(endpoint)

View File

@ -0,0 +1,41 @@
# Copyright 2015 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from mistral.tests.unit.engine import base
from mistral.utils import rpc_utils
class RPCTest(base.EngineTestCase):
def test_get_rabbit_config(self):
conf = cfg.CONF
rpc_info = rpc_utils.get_rabbit_info_from_oslo(conf.engine)
self.assertDictEqual(
{
'topic': 'mistral_engine',
'port': 5672,
'server_id': '0.0.0.0',
'user_id': 'guest',
'virtual_host': '/',
'host': 'localhost',
'exchange': 'openstack',
'password': 'guest',
'auto_delete': False,
'durable_queues': False,
},
rpc_info
)

View File

@ -0,0 +1,34 @@
# Copyright 2015 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
CONF = cfg.CONF
def get_rabbit_info_from_oslo(additional_conf):
return {
'user_id': CONF.oslo_messaging_rabbit.rabbit_userid,
'password': CONF.oslo_messaging_rabbit.rabbit_password,
'exchange': CONF.control_exchange,
'topic': additional_conf.topic,
'server_id': additional_conf.host,
'host': CONF.oslo_messaging_rabbit.rabbit_host,
'port': CONF.oslo_messaging_rabbit.rabbit_port,
'virtual_host': CONF.oslo_messaging_rabbit.rabbit_virtual_host,
'durable_queues': CONF.oslo_messaging_rabbit.amqp_durable_queues,
'auto_delete': CONF.oslo_messaging_rabbit.amqp_auto_delete
}