Merge "Refactor the local engine to use an in process executor"
This commit is contained in:
commit
697f67d7e7
16
README.rst
16
README.rst
@ -17,24 +17,26 @@ This will install necessary virtual environments and run all the project tests.
|
||||
### Running Mistral API server
|
||||
To run Mistral API server perform the following command in a shell:
|
||||
|
||||
*tox -evenv -- python mistral/cmd/api.py --config-file path_to_config*
|
||||
*tox -evenv -- python mistral/cmd/launch.py --server api --config-file path_to_config*
|
||||
|
||||
Note that an example configuration file can be found in etc/mistral.conf.example.
|
||||
|
||||
### Running Mistral Task Executors
|
||||
To run Mistral Task Executor instance perform the following command in a shell::
|
||||
To run Mistral Task Executor instance perform the following command in a shell:
|
||||
|
||||
*tox -evenv -- python mistral/cmd/task_executor.py --config-file path_to_config*
|
||||
*tox -evenv -- python mistral/cmd/launch.py --server executor --config-file path_to_config*
|
||||
|
||||
Note that at least one Executor instance should be running so that workflow tasks are processed by Mistral.
|
||||
|
||||
### Debugging
|
||||
To debug the engine, create etc/mistral.conf with the settings::
|
||||
[engine]
|
||||
engine = mistral.engine.local.engine
|
||||
To debug using a local engine and executor without dependencies such as RabbitMQ, create etc/mistral.conf with the following settings::
|
||||
|
||||
[DEFAULT]
|
||||
rpc_backend = fake
|
||||
|
||||
[pecan]
|
||||
auth_enable = False
|
||||
|
||||
and run in pdb, PyDev or PyCharm::
|
||||
|
||||
mistral/cmd/api --config-file etc/mistral.conf --use-debugger
|
||||
mistral/cmd/launch.py --server all --config-file etc/mistral.conf --use-debugger
|
||||
|
@ -14,6 +14,12 @@ default_log_levels = mistral=INFO,mistral.cmd.api=INFO,mistral.api=DEBUG,wsme=DE
|
||||
# Uncomment this option to get more fine-grained control over logging configuration
|
||||
#log_config_append = etc/logging.conf
|
||||
|
||||
# Options for oslo.messaging
|
||||
#rpc_backend=rabbit
|
||||
|
||||
# Specifies which mistral server to start by the launch script. (string value)
|
||||
#server=all
|
||||
|
||||
[api]
|
||||
# Host and port to bind the API server to
|
||||
host = 0.0.0.0
|
||||
@ -46,4 +52,15 @@ auth_port=5000
|
||||
admin_user=admin
|
||||
admin_password=password
|
||||
auth_protocol=http
|
||||
admin_tenant_name=admin
|
||||
admin_tenant_name=admin
|
||||
|
||||
[executor]
|
||||
# Name of the executor node. This can be an opaque identifier.
|
||||
# It is not necessarily a hostname, FQDN, or IP address. (string value)
|
||||
#host=0.0.0.0
|
||||
|
||||
# The message topic that the executor listens on. (string value)
|
||||
#topic=executor
|
||||
|
||||
# The version of the executor. (string value)
|
||||
#version=1.0
|
||||
|
@ -41,14 +41,14 @@ def get_pecan_config():
|
||||
return pecan.configuration.conf_from_dict(cfg_dict)
|
||||
|
||||
|
||||
def setup_app(config=None):
|
||||
def setup_app(config=None, transport=None):
|
||||
if not config:
|
||||
config = get_pecan_config()
|
||||
|
||||
app_conf = dict(config.app)
|
||||
|
||||
db_api.setup_db()
|
||||
engine.load_engine()
|
||||
engine.load_engine(transport)
|
||||
##TODO(akuznetsov) move this to trigger scheduling to separate process
|
||||
periodic.setup()
|
||||
|
||||
|
@ -1,62 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright 2013 - Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Script to start Mistral API service."""
|
||||
|
||||
import eventlet
|
||||
|
||||
import os
|
||||
import sys
|
||||
from wsgiref import simple_server
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from mistral.api import app
|
||||
from mistral import config
|
||||
from mistral.openstack.common import log as logging
|
||||
|
||||
eventlet.monkey_patch(
|
||||
os=True,
|
||||
select=True,
|
||||
socket=True,
|
||||
thread=False if '--use-debugger' in sys.argv else True,
|
||||
time=True)
|
||||
|
||||
LOG = logging.getLogger('mistral.cmd.api')
|
||||
|
||||
|
||||
def main():
|
||||
try:
|
||||
config.parse_args()
|
||||
logging.setup('Mistral')
|
||||
|
||||
host = cfg.CONF.api.host
|
||||
port = cfg.CONF.api.port
|
||||
|
||||
server = simple_server.make_server(host, port, app.setup_app())
|
||||
|
||||
LOG.info("Mistral API is serving on http://%s:%s (PID=%s)" %
|
||||
(host, port, os.getpid()))
|
||||
|
||||
server.serve_forever()
|
||||
except RuntimeError, e:
|
||||
sys.stderr.write("ERROR: %s\n" % e)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -1,8 +1,6 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright 2013 - Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
@ -15,14 +13,17 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Script to start instance of Task Executor."""
|
||||
|
||||
import sys
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
|
||||
eventlet.monkey_patch(
|
||||
os=True,
|
||||
select=True,
|
||||
socket=True,
|
||||
thread=False if '--use-debugger' in sys.argv else True,
|
||||
time=True)
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
# If ../mistral/__init__.py exists, add ../ to Python search path, so that
|
||||
# it will override what happens to be installed in /usr/(local/)lib/python...
|
||||
@ -32,17 +33,51 @@ POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
|
||||
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'mistral', '__init__.py')):
|
||||
sys.path.insert(0, POSSIBLE_TOPDIR)
|
||||
|
||||
|
||||
from oslo import messaging
|
||||
from oslo.config import cfg
|
||||
|
||||
from mistral import config
|
||||
from mistral.engine import engine
|
||||
from mistral.engine.scalable.executor import server
|
||||
|
||||
from mistral.api import app
|
||||
from wsgiref import simple_server
|
||||
|
||||
from mistral.openstack.common import log as logging
|
||||
|
||||
|
||||
LOG = logging.getLogger('mistral.cmd.task_executor')
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def launch_executor(transport):
|
||||
# TODO(rakhmerov): This is a temporary hack.
|
||||
# We have to initialize engine in executor process because
|
||||
# executor now calls engine.convey_task_result() directly.
|
||||
engine.load_engine(transport)
|
||||
target = messaging.Target(topic=cfg.CONF.executor.topic,
|
||||
server=cfg.CONF.executor.host)
|
||||
endpoints = [server.Executor()]
|
||||
ex_server = messaging.get_rpc_server(transport, target, endpoints)
|
||||
ex_server.start()
|
||||
ex_server.wait()
|
||||
|
||||
|
||||
def launch_api(transport):
|
||||
host = cfg.CONF.api.host
|
||||
port = cfg.CONF.api.port
|
||||
server = simple_server.make_server(host, port,
|
||||
app.setup_app(transport=transport))
|
||||
LOG.info("Mistral API is serving on http://%s:%s (PID=%s)" %
|
||||
(host, port, os.getpid()))
|
||||
server.serve_forever()
|
||||
|
||||
|
||||
def launch_all(transport):
|
||||
# Launch the servers on different threads.
|
||||
t1 = eventlet.spawn(launch_executor, transport)
|
||||
t2 = eventlet.spawn(launch_api, transport)
|
||||
t1.wait()
|
||||
t2.wait()
|
||||
|
||||
|
||||
def main():
|
||||
@ -50,10 +85,13 @@ def main():
|
||||
config.parse_args()
|
||||
logging.setup('Mistral')
|
||||
|
||||
# TODO(rakhmerov): This is a temporary hack.
|
||||
# We have to initialize engine in executor process because
|
||||
# executor now calls engine.convey_task_result() directly.
|
||||
engine.load_engine()
|
||||
# Map cli options to appropriate functions. The cli options are
|
||||
# registered in mistral's config.py.
|
||||
launch_options = {
|
||||
'all': launch_all,
|
||||
'api': launch_api,
|
||||
'executor': launch_executor
|
||||
}
|
||||
|
||||
# Please refer to the oslo.messaging documentation for transport
|
||||
# configuration. The default transport for oslo.messaging is rabbitMQ.
|
||||
@ -68,13 +106,10 @@ def main():
|
||||
# that can be specified depending on the driver. Please refer to the
|
||||
# driver implementation for those additional options.
|
||||
transport = messaging.get_transport(cfg.CONF)
|
||||
target = messaging.Target(topic=cfg.CONF.executor.topic,
|
||||
server=cfg.CONF.executor.host)
|
||||
endpoints = [server.Executor()]
|
||||
|
||||
ex_server = messaging.get_rpc_server(transport, target, endpoints)
|
||||
ex_server.start()
|
||||
ex_server.wait()
|
||||
# Launch server(s).
|
||||
launch_options[cfg.CONF.server](transport)
|
||||
|
||||
except RuntimeError, e:
|
||||
sys.stderr.write("ERROR: %s\n" % e)
|
||||
sys.exit(1)
|
@ -78,6 +78,13 @@ executor_opts = [
|
||||
help='The version of the executor.')
|
||||
]
|
||||
|
||||
launch_opt = cfg.StrOpt(
|
||||
'server',
|
||||
default='all',
|
||||
choices=('all', 'api', 'executor'),
|
||||
help='Specifies which mistral server to start by the launch script.'
|
||||
)
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
CONF.register_opts(api_opts, group='api')
|
||||
@ -89,6 +96,7 @@ CONF.register_opts(rabbit_opts, group='rabbit')
|
||||
CONF.register_opts(executor_opts, group='executor')
|
||||
|
||||
CONF.register_cli_opt(use_debugger)
|
||||
CONF.register_cli_opt(launch_opt)
|
||||
|
||||
CONF.import_opt('verbose', 'mistral.openstack.common.log')
|
||||
CONF.import_opt('debug', 'mistral.openstack.common.log')
|
||||
|
@ -31,6 +31,8 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AbstractEngine(object):
|
||||
transport = None
|
||||
|
||||
@classmethod
|
||||
@abc.abstractmethod
|
||||
def _run_tasks(cls, tasks):
|
||||
|
@ -25,11 +25,12 @@ from oslo.config import cfg
|
||||
_engine = None
|
||||
|
||||
|
||||
def load_engine():
|
||||
def load_engine(transport):
|
||||
global _engine
|
||||
module_name = cfg.CONF.engine.engine
|
||||
module = importutils.import_module(module_name)
|
||||
_engine = module.get_engine()
|
||||
_engine.transport = transport
|
||||
|
||||
|
||||
def start_workflow_execution(workbook_name, task_name, context=None):
|
||||
|
@ -28,8 +28,9 @@ class ScalableEngine(abs_eng.AbstractEngine):
|
||||
@classmethod
|
||||
def _notify_task_executors(cls, tasks):
|
||||
# TODO(m4dcoder): Use a pool for transport and client
|
||||
transport = messaging.get_transport(cfg.CONF)
|
||||
ex_client = client.ExecutorClient(transport)
|
||||
if not cls.transport:
|
||||
cls.transport = messaging.get_transport(cfg.CONF)
|
||||
ex_client = client.ExecutorClient(cls.transport)
|
||||
for task in tasks:
|
||||
# TODO(m4dcoder): Fill request context argument with auth info
|
||||
context = {}
|
||||
|
@ -23,6 +23,11 @@ from mistral import version
|
||||
from mistral.db.sqlalchemy import api as db_api
|
||||
from mistral.openstack.common.db.sqlalchemy import session
|
||||
|
||||
from stevedore import driver
|
||||
from oslo.config import cfg
|
||||
from oslo import messaging
|
||||
from oslo.messaging import transport
|
||||
|
||||
|
||||
RESOURCES_PATH = 'tests/resources/'
|
||||
|
||||
@ -33,6 +38,23 @@ def get_resource(resource_name):
|
||||
RESOURCES_PATH + resource_name)).read()
|
||||
|
||||
|
||||
def get_fake_transport():
|
||||
# Get transport here to let oslo.messaging setup default config
|
||||
# before changing the rpc_backend to the fake driver; otherwise,
|
||||
# oslo.messaging will throw exception.
|
||||
messaging.get_transport(cfg.CONF)
|
||||
cfg.CONF.set_default('rpc_backend', 'fake')
|
||||
url = transport.TransportURL.parse(cfg.CONF, None, None)
|
||||
kwargs = dict(default_exchange=cfg.CONF.control_exchange,
|
||||
allowed_remote_exmods=[])
|
||||
mgr = driver.DriverManager('oslo.messaging.drivers',
|
||||
url.transport,
|
||||
invoke_on_load=True,
|
||||
invoke_args=[cfg.CONF, url],
|
||||
invoke_kwds=kwargs)
|
||||
return transport.Transport(mgr.driver)
|
||||
|
||||
|
||||
class BaseTest(unittest2.TestCase):
|
||||
def setUp(self):
|
||||
super(BaseTest, self).setUp()
|
||||
|
@ -20,15 +20,12 @@ import uuid
|
||||
import time
|
||||
import mock
|
||||
|
||||
from oslo import messaging
|
||||
from oslo.config import cfg
|
||||
|
||||
from mistral.tests import base
|
||||
from mistral.cmd import launch
|
||||
from mistral.engine import states
|
||||
from mistral.db import api as db_api
|
||||
from mistral.engine.actions import actions
|
||||
from mistral.engine.actions import action_types
|
||||
from mistral.engine.scalable.executor import server
|
||||
from mistral.engine.scalable.executor import client
|
||||
|
||||
|
||||
@ -86,51 +83,20 @@ SAMPLE_CONTEXT = {
|
||||
|
||||
class TestExecutor(base.DbTestCase):
|
||||
|
||||
def get_transport(self):
|
||||
# Get transport manually, oslo.messaging get_transport seems broken.
|
||||
from stevedore import driver
|
||||
from oslo.messaging import transport
|
||||
# Get transport here to let oslo.messaging setup default config before
|
||||
# changing the rpc_backend to the fake driver; otherwise,
|
||||
# oslo.messaging will throw exception.
|
||||
messaging.get_transport(cfg.CONF)
|
||||
cfg.CONF.set_default('rpc_backend', 'fake')
|
||||
url = transport.TransportURL.parse(cfg.CONF, None, None)
|
||||
kwargs = dict(default_exchange=cfg.CONF.control_exchange,
|
||||
allowed_remote_exmods=[])
|
||||
mgr = driver.DriverManager('oslo.messaging.drivers',
|
||||
url.transport,
|
||||
invoke_on_load=True,
|
||||
invoke_args=[cfg.CONF, url],
|
||||
invoke_kwds=kwargs)
|
||||
return transport.Transport(mgr.driver)
|
||||
|
||||
def mock_action_run(self):
|
||||
actions.RestAction.run = mock.MagicMock(return_value={})
|
||||
return actions.RestAction.run
|
||||
|
||||
def setUp(self):
|
||||
# Initialize configuration for the ExecutorClient.
|
||||
super(TestExecutor, self).setUp()
|
||||
if not 'executor' in cfg.CONF:
|
||||
cfg_grp = cfg.OptGroup(name='executor', title='Executor options')
|
||||
opts = [cfg.StrOpt('host', default='0.0.0.0'),
|
||||
cfg.StrOpt('topic', default='executor')]
|
||||
cfg.CONF.register_group(cfg_grp)
|
||||
cfg.CONF.register_opts(opts, group=cfg_grp)
|
||||
|
||||
# Start the Executor.
|
||||
transport = self.get_transport()
|
||||
target = messaging.Target(topic='executor', server='0.0.0.0')
|
||||
endpoints = [server.Executor()]
|
||||
self.server = messaging.get_rpc_server(transport, target,
|
||||
endpoints, executor='eventlet')
|
||||
self.server.start()
|
||||
# Run the Executor in the background.
|
||||
self.transport = base.get_fake_transport()
|
||||
self.ex_thread = eventlet.spawn(launch.launch_executor, self.transport)
|
||||
|
||||
def tearDown(self):
|
||||
# Stop the Executor.
|
||||
if self.server:
|
||||
self.server.stop()
|
||||
self.ex_thread.kill()
|
||||
|
||||
super(TestExecutor, self).tearDown()
|
||||
|
||||
@ -156,8 +122,7 @@ class TestExecutor(base.DbTestCase):
|
||||
self.assertIn('id', task)
|
||||
|
||||
# Send the task request to the Executor.
|
||||
transport = self.server.transport
|
||||
ex_client = client.ExecutorClient(transport)
|
||||
ex_client = client.ExecutorClient(self.transport)
|
||||
ex_client.handle_task(SAMPLE_CONTEXT, task=task)
|
||||
|
||||
# Check task execution state. There is no timeout mechanism in
|
||||
|
Loading…
x
Reference in New Issue
Block a user