diff --git a/mistral/engine/rpc_direct/__init__.py b/mistral/engine/rpc_direct/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/engine/rpc_direct/base.py b/mistral/engine/rpc_direct/base.py new file mode 100644 index 000000000..e8ab0f174 --- /dev/null +++ b/mistral/engine/rpc_direct/base.py @@ -0,0 +1,77 @@ +# Copyright 2015 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc + + +class RPCClient(object): + def __init__(self, conf): + """Base class for RPCClient's drivers + + RPC Client is responsible for sending requests to RPC Server. + All RPC client drivers have to inherit from this class. + + :param conf: dict containing credentials such as + 'user_id', 'password', and also exchange and connection info: + 'exchange', 'topic', 'host', 'port', 'virtual_host'. + """ + self.conf = conf + + @abc.abstractmethod + def sync_call(self, ctx, method, target=None, **kwargs): + """Synchronous call of RPC method. + + Blocks the thread and wait for method result. + """ + raise NotImplementedError + + @abc.abstractmethod + def async_call(self, ctx, method, target=None, **kwargs): + """Asynchronous call of RPC method. + + Does not block the thread, just send invoking data to + the RPC server and immediately returns nothing. + """ + raise NotImplementedError + + +class RPCServer(object): + def __init__(self, conf): + """Base class for RPCServer's drivers + + RPC Server should listen for request coming from RPC Clients and + respond to them respectively to the registered endpoints. + All RPC server drivers have to inherit from this class. + + :param conf: dict containing credentials such as + 'user_id', 'password', and also exchange and connection info: + 'exchange', 'topic', 'rabbit_host', 'rabbit_port', 'virtual_host'. + """ + self.conf = conf + + @abc.abstractmethod + def register_endpoint(self, endpoint): + """Registers a new RPC endpoint. + + :param endpoint: an object containing methods which + will be used as RPC methods. + """ + raise NotImplementedError + + @abc.abstractmethod + def run(self): + """Runs the RPC server. + + """ + raise NotImplementedError diff --git a/mistral/engine/rpc_direct/kombu/__init__.py b/mistral/engine/rpc_direct/kombu/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/engine/rpc_direct/kombu/base.py b/mistral/engine/rpc_direct/kombu/base.py new file mode 100644 index 000000000..83327fb08 --- /dev/null +++ b/mistral/engine/rpc_direct/kombu/base.py @@ -0,0 +1,99 @@ +# Copyright 2015 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import kombu + + +class Base(object): + """Base class for Client and Server.""" + + @staticmethod + def _make_connection(amqp_host, amqp_port, amqp_user, amqp_password, + amqp_vhost): + """Create connection. + + This method creates object representing the connection to RabbitMQ. + + :param amqp_host: Address of RabbitMQ server. + :param amqp_user: Username for connecting to RabbitMQ. + :param amqp_password: Password matching the given username. + :param amqp_vhost: Virtual host to connect to. + :param amqp_port: Port of RabbitMQ server. + :return: New connection to RabbitMQ. + """ + return kombu.BrokerConnection( + hostname=amqp_host, + userid=amqp_user, + password=amqp_password, + virtual_host=amqp_vhost, + port=amqp_port + ) + + @staticmethod + def _make_exchange(name, durable=False, auto_delete=True, + exchange_type='topic'): + """Make named exchange. + + This method creates object representing exchange on RabbitMQ. It would + create a new exchange if exchange with given name don't exists. + + :param name: Name of the exchange. + :param durable: If set to True, messages on this exchange would be + store on disk - therefore can be retrieve after + failure. + :param auto_delete: If set to True, exchange would be automatically + deleted when none is connected. + :param exchange_type: Type of the exchange. Can be one of 'direct', + 'topic', 'fanout', 'headers'. See Kombu docs for + further details. + :return: Kombu exchange object. + """ + return kombu.Exchange( + name=name, + type=exchange_type, + durable=durable, + auto_delete=auto_delete + ) + + @staticmethod + def _make_queue(name, exchange, routing_key='', + durable=False, auto_delete=True, **kwargs): + """Make named queue for a given exchange. + + This method creates object representing queue in RabbitMQ. It would + create a new queue if queue with given name don't exists. + + :param name: Name of the queue + :param exchange: Kombu Exchange object (can be created using + _make_exchange). + :param routing_key: Routing key for queue. It behaves differently + depending the exchange type. See Kombu docs for + further details. + :param durable: If set to True, messages on this queue would be + store on disk - therefore can be retrieve after + failure. + :param auto_delete: If set to True, queue would be automatically + deleted when none is connected. + :param kwargs: See kombu documentation for all parameters than may be + may be passed to Queue. + :return: Kombu Queue object. + """ + return kombu.Queue( + name=name, + routing_key=routing_key, + exchange=exchange, + durable=durable, + auto_delete=auto_delete, + **kwargs + ) diff --git a/mistral/engine/rpc_direct/kombu/examples/__init__.py b/mistral/engine/rpc_direct/kombu/examples/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/engine/rpc_direct/kombu/examples/client.py b/mistral/engine/rpc_direct/kombu/examples/client.py new file mode 100644 index 000000000..2304f2240 --- /dev/null +++ b/mistral/engine/rpc_direct/kombu/examples/client.py @@ -0,0 +1,42 @@ +# Copyright 2015 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.engine.rpc_direct.kombu import kombu_client + + +# Example of using Kombu based RPC client. +def main(): + conf = { + 'user_id': 'guest', + 'password': 'secret', + 'exchange': 'my_exchange', + 'topic': 'my_topic', + 'server_id': 'host', + 'host': 'localhost', + 'port': 5672, + 'virtual_host': '/' + } + kombu_rpc = kombu_client.KombuRPCClient(conf) + + print(" [x] Requesting ...") + + ctx = type('context', (object,), {'to_dict': lambda self: {}})() + + response = kombu_rpc.sync_call(ctx, 'fib', n=44) + + print(" [.] Got %r" % (response,)) + + +if __name__ == '__main__': + main() diff --git a/mistral/engine/rpc_direct/kombu/examples/server.py b/mistral/engine/rpc_direct/kombu/examples/server.py new file mode 100644 index 000000000..93f24de87 --- /dev/null +++ b/mistral/engine/rpc_direct/kombu/examples/server.py @@ -0,0 +1,51 @@ +# Copyright 2015 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.engine.rpc_direct.kombu import kombu_server + + +# Simple example of endpoint of RPC server, which just +# calculates given fibonacci number. +class MyServer(object): + cache = {0: 0, 1: 1} + + def fib(self, rpc_ctx, n): + if self.cache.get(n) is None: + self.cache[n] = (self.fib(rpc_ctx, n - 1) + + self.fib(rpc_ctx, n - 2)) + return self.cache[n] + + def get_name(self, rpc_ctx): + return self.__class__.__name__ + + +# Example of using Kombu based RPC server. +def main(): + conf = { + 'user_id': 'guest', + 'password': 'secret', + 'exchange': 'my_exchange', + 'topic': 'my_topic', + 'server_id': 'host', + 'host': 'localhost', + 'port': 5672, + 'virtual_host': '/' + } + rpc_server = kombu_server.KombuRPCServer(conf) + rpc_server.register_endpoint(MyServer()) + rpc_server.run() + + +if __name__ == '__main__': + main() diff --git a/mistral/engine/rpc_direct/kombu/kombu_client.py b/mistral/engine/rpc_direct/kombu/kombu_client.py new file mode 100644 index 000000000..21366ba04 --- /dev/null +++ b/mistral/engine/rpc_direct/kombu/kombu_client.py @@ -0,0 +1,188 @@ +# Copyright 2015 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import socket +import time + +import kombu +from oslo_log import log as logging + +from mistral.engine.rpc_direct import base as rpc_base +from mistral.engine.rpc_direct.kombu import base as kombu_base +from mistral import exceptions as exc +from mistral import utils + + +LOG = logging.getLogger(__name__) +IS_RECEIVED = 'kombu_rpc_is_received' +RESULT = 'kombu_rpc_result' +CORR_ID = 'kombu_rpc_correlation_id' + + +class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base): + def __init__(self, conf): + super(KombuRPCClient, self).__init__(conf) + + self.exchange = conf.get('exchange', '') + self.user_id = conf.get('user_id', 'guest') + self.password = conf.get('password', 'guest') + self.topic = conf.get('topic', 'mistral') + self.server_id = conf.get('server_id', '') + self.host = conf.get('host', 'localhost') + self.port = conf.get('port', 5672) + self.virtual_host = conf.get('virtual_host', '/') + self.durable_queue = conf.get('durable_queues', False) + self.auto_delete = conf.get('auto_delete', False) + self._timeout = 180 + self.conn = self._make_connection( + self.host, + self.port, + self.user_id, + self.password, + self.virtual_host + ) + + # Create exchange. + exchange = self._make_exchange( + self.exchange, + durable=self.durable_queue, + auto_delete=self.auto_delete + ) + + # Create queue. + queue_name = utils.generate_unicode_uuid() + self.callback_queue = kombu.Queue( + queue_name, + exchange=exchange, + routing_key=queue_name, + durable=False, + exclusive=True, + auto_delete=True + ) + + # Create consumer. + self.consumer = kombu.Consumer( + channel=self.conn.channel(), + queues=self.callback_queue, + callbacks=[self._on_response], + accept=['pickle', 'json'] + ) + self.consumer.qos(prefetch_count=1) + + @staticmethod + def _on_response(response, message): + """Callback on response. + + This method is automatically called when a response is incoming and + decides if it is the message we are waiting for - the message with the + result. + + :param response: the body of the amqp message already deserialized + by kombu + :param message: the plain amqp kombu.message with additional + information + """ + LOG.debug("Got response: {0}".format(response)) + + try: + message.ack() + except Exception as e: + LOG.exception("Failed to acknowledge AMQP message: %s" % e) + else: + LOG.debug("AMQP message acknowledged.") + + # Process response. + if (utils.get_thread_local(CORR_ID) == + message.properties['correlation_id']): + utils.set_thread_local(IS_RECEIVED, True) + + if message.properties.get('type') == 'error': + raise response + utils.set_thread_local(RESULT, response) + + def _wait_for_result(self): + """Waits for the result from the server. + + Waits for the result from the server, checks every second if + a timeout occurred. If a timeout occurred - the `RpcTimeout` exception + will be raised. + """ + start_time = time.time() + + while not utils.get_thread_local(IS_RECEIVED): + try: + self.conn.drain_events() + except socket.timeout: + if self._timeout > 0: + if time.time() - start_time > self._timeout: + raise exc.MistralException("RPC Request timeout") + + def _call(self, ctx, method, target, async=False, **kwargs): + """Performs a remote call for the given method. + + :param ctx: authentication context associated with mistral + :param method: name of the method that should be executed + :param kwargs: keyword parameters for the remote-method + :param target: Server name + :param async: bool value means whether the request is + asynchronous or not. + :return: result of the method or None if async. + """ + utils.set_thread_local(CORR_ID, utils.generate_unicode_uuid()) + utils.set_thread_local(IS_RECEIVED, False) + + self.consumer.consume() + + body = { + 'rpc_ctx': ctx.to_dict(), + 'rpc_method': method, + 'arguments': kwargs, + 'async': async + } + + LOG.debug("Publish request: {0}".format(body)) + + # Publish request. + with kombu.producers[self.conn].acquire(block=True) as producer: + producer.publish( + body=body, + exchange=self.exchange, + routing_key=self.topic, + reply_to=self.callback_queue.name, + correlation_id=utils.get_thread_local(CORR_ID), + delivery_mode=2 + ) + + # Start waiting for response. + if async: + return + + self._wait_for_result() + result = utils.get_thread_local(RESULT) + + self._clear_thread_local() + + return result + + @staticmethod + def _clear_thread_local(): + utils.set_thread_local(RESULT, None) + utils.set_thread_local(CORR_ID, None) + utils.set_thread_local(IS_RECEIVED, None) + + def sync_call(self, ctx, method, target=None, **kwargs): + return self._call(ctx, method, async=False, target=target, **kwargs) + + def async_call(self, ctx, method, target=None, **kwargs): + return self._call(ctx, method, async=True, target=target, **kwargs) diff --git a/mistral/engine/rpc_direct/kombu/kombu_server.py b/mistral/engine/rpc_direct/kombu/kombu_server.py new file mode 100644 index 000000000..090f5add7 --- /dev/null +++ b/mistral/engine/rpc_direct/kombu/kombu_server.py @@ -0,0 +1,170 @@ +# Copyright 2015 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import socket +import threading + +import kombu +from oslo_log import log as logging + +from mistral import context as auth_context +from mistral.engine.rpc_direct import base as rpc_base +from mistral.engine.rpc_direct.kombu import base as kombu_base +from mistral import exceptions as exc + + +LOG = logging.getLogger(__name__) + + +class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): + def __init__(self, conf): + super(KombuRPCServer, self).__init__(conf) + + self.exchange = conf.get('exchange', '') + self.user_id = conf.get('user_id', 'guest') + self.password = conf.get('password', 'guest') + self.topic = conf.get('topic', 'mistral') + self.server_id = conf.get('server_id', '') + self.host = conf.get('host', 'localhost') + self.port = conf.get('port', 5672) + self.virtual_host = conf.get('virtual_host', '/') + self.durable_queue = conf.get('durable_queues', False) + self.auto_delete = conf.get('auto_delete', False) + self.routing_key = self.topic + self.channel = None + self.conn = None + self._running = threading.Event() + self.endpoints = [] + + @property + def is_running(self): + """Return whether server is running.""" + return self._running.is_set() + + def run(self): + """Start the server.""" + self.conn = self._make_connection( + self.host, + self.port, + self.user_id, + self.password, + self.virtual_host, + ) + LOG.info("Connected to AMQP at %s:%s" % (self.host, self.port)) + + try: + conn = kombu.connections[self.conn].acquire(block=True) + exchange = self._make_exchange( + self.exchange, + durable=self.durable_queue, + auto_delete=self.auto_delete + ) + queue = self._make_queue( + self.topic, + exchange, + routing_key=self.routing_key, + durable=self.durable_queue, + auto_delete=self.auto_delete + ) + with conn.Consumer( + queues=queue, + callbacks=[self._on_message_safe], + ) as consumer: + consumer.qos(prefetch_count=1) + self._running.set() + while self.is_running: + try: + conn.drain_events(timeout=1) + except socket.timeout: + pass + except KeyboardInterrupt: + self.stop() + LOG.info("Server with id='{0}' stopped.".format( + self.server_id)) + return + except socket.error as e: + raise exc.MistralException("Broker connection failed: %s" % e) + + def stop(self): + """Stop the server.""" + self._running.clear() + + def _get_rpc_method(self, method_name): + for endpoint in self.endpoints: + if hasattr(endpoint, method_name): + return getattr(endpoint, method_name) + + return None + + @staticmethod + def _set_auth_ctx(ctx): + if not isinstance(ctx, dict): + return + + context = auth_context.MistralContext(**ctx) + auth_context.set_ctx(context) + + def publish_message(self, body, reply_to, corr_id, type='response'): + with kombu.producers[self.conn].acquire(block=True) as producer: + producer.publish( + body=body, + exchange=self.exchange, + routing_key=reply_to, + correlation_id=corr_id, + type=type, + serializer='pickle' if type == 'error' else None + ) + + def _on_message_safe(self, request, message): + try: + return self._on_message(request, message) + except Exception as e: + self.publish_message( + e, + message.properties['reply_to'], + message.properties['correlation_id'], + type='error' + ) + finally: + message.ack() + + def _on_message(self, request, message): + LOG.debug('Received message %s', + request) + + is_async = request.get('async', False) + rpc_context = request.get('rpc_ctx') + rpc_method_name = request.get('rpc_method') + arguments = request.get('arguments') + correlation_id = message.properties['correlation_id'] + reply_to = message.properties['reply_to'] + + self._set_auth_ctx(rpc_context) + + rpc_method = self._get_rpc_method(rpc_method_name) + + if not rpc_method: + raise exc.MistralException("No such method: %s" % rpc_method_name) + + response = rpc_method(rpc_ctx=rpc_context, **arguments) + + if not is_async: + self.publish_message( + response, + reply_to, + correlation_id + ) + + def register_endpoint(self, endpoint): + self.endpoints.append(endpoint) diff --git a/mistral/tests/unit/engine/rpc_direct/__init__.py b/mistral/tests/unit/engine/rpc_direct/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/tests/unit/engine/rpc_direct/test_rpc.py b/mistral/tests/unit/engine/rpc_direct/test_rpc.py new file mode 100644 index 000000000..e5824303f --- /dev/null +++ b/mistral/tests/unit/engine/rpc_direct/test_rpc.py @@ -0,0 +1,41 @@ +# Copyright 2015 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg + +from mistral.tests.unit.engine import base +from mistral.utils import rpc_utils + + +class RPCTest(base.EngineTestCase): + def test_get_rabbit_config(self): + conf = cfg.CONF + + rpc_info = rpc_utils.get_rabbit_info_from_oslo(conf.engine) + + self.assertDictEqual( + { + 'topic': 'mistral_engine', + 'port': 5672, + 'server_id': '0.0.0.0', + 'user_id': 'guest', + 'virtual_host': '/', + 'host': 'localhost', + 'exchange': 'openstack', + 'password': 'guest', + 'auto_delete': False, + 'durable_queues': False, + }, + rpc_info + ) diff --git a/mistral/utils/rpc_utils.py b/mistral/utils/rpc_utils.py new file mode 100644 index 000000000..ad006a765 --- /dev/null +++ b/mistral/utils/rpc_utils.py @@ -0,0 +1,34 @@ + +# Copyright 2015 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg + + +CONF = cfg.CONF + + +def get_rabbit_info_from_oslo(additional_conf): + return { + 'user_id': CONF.oslo_messaging_rabbit.rabbit_userid, + 'password': CONF.oslo_messaging_rabbit.rabbit_password, + 'exchange': CONF.control_exchange, + 'topic': additional_conf.topic, + 'server_id': additional_conf.host, + 'host': CONF.oslo_messaging_rabbit.rabbit_host, + 'port': CONF.oslo_messaging_rabbit.rabbit_port, + 'virtual_host': CONF.oslo_messaging_rabbit.rabbit_virtual_host, + 'durable_queues': CONF.oslo_messaging_rabbit.amqp_durable_queues, + 'auto_delete': CONF.oslo_messaging_rabbit.amqp_auto_delete + }