Add coordination feature to mistral service

Add mistrai-api, mistral-engine, mistral-executor services to their
service group respectively when they start up, so that we can retrieve
their information through service api which will be implemented in
subsequent patch.

Change-Id: Ibfcc17b35b9d2e408888b7d1cb64e63f2b32a90d
Partially-Implements: blueprint mistral-service-api
This commit is contained in:
LingxianKong 2015-06-17 11:19:28 +08:00 committed by LingxianKong
parent 223dee29e4
commit 021824341b
7 changed files with 107 additions and 5 deletions

View File

@ -19,6 +19,7 @@ import pecan
from mistral.api import access_control from mistral.api import access_control
from mistral import context as ctx from mistral import context as ctx
from mistral import coordination
from mistral.db.v2 import api as db_api_v2 from mistral.db.v2 import api as db_api_v2
from mistral.services import periodic from mistral.services import periodic
@ -49,6 +50,8 @@ def setup_app(config=None):
periodic.setup() periodic.setup()
coordination.Service('api_group').register_membership()
app = pecan.make_app( app = pecan.make_app(
app_conf.pop('root'), app_conf.pop('root'),
hooks=lambda: [ctx.ContextHook(), ctx.AuthHook()], hooks=lambda: [ctx.ContextHook(), ctx.AuthHook()],

View File

@ -51,6 +51,7 @@ from mistral import version
CONF = cfg.CONF CONF = cfg.CONF
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -72,6 +73,8 @@ def launch_executor(transport):
serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer()) serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer())
) )
executor_v2.register_membership()
server.start() server.start()
server.wait() server.wait()
@ -98,6 +101,8 @@ def launch_engine(transport):
serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer()) serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer())
) )
engine_v2.register_membership()
server.start() server.start()
server.wait() server.wait()

View File

@ -14,8 +14,10 @@
import six import six
from oslo_concurrency import lockutils
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
from oslo_service import threadgroup
from retrying import retry from retrying import retry
import tooz.coordination import tooz.coordination
@ -24,6 +26,8 @@ from mistral import utils
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
_SERVICE_COORDINATOR = None
class ServiceCoordinator(object): class ServiceCoordinator(object):
"""Service coordinator. """Service coordinator.
@ -143,3 +147,56 @@ class ServiceCoordinator(object):
LOG.warning('Group %s does not exist.', group_id) LOG.warning('Group %s does not exist.', group_id)
return [] return []
def cleanup_service_coordinator():
"""Intends to be used by tests to recreate service coordinator."""
global _SERVICE_COORDINATOR
_SERVICE_COORDINATOR = None
def get_service_coordinator(my_id=None):
global _SERVICE_COORDINATOR
if not _SERVICE_COORDINATOR:
_SERVICE_COORDINATOR = ServiceCoordinator(my_id=my_id)
_SERVICE_COORDINATOR.start()
return _SERVICE_COORDINATOR
class Service(object):
def __init__(self, group_type):
self.group_type = group_type
self._tg = None
@lockutils.synchronized('service_coordinator')
def register_membership(self):
"""Registers group membership.
Because this method will be invoked on each service startup almost at
the same time, so it must be synchronized, in case all the services
are started within same process.
"""
service_coordinator = get_service_coordinator()
if service_coordinator.is_active():
service_coordinator.join_group(self.group_type)
self._tg = threadgroup.ThreadGroup()
self._tg.add_timer(
cfg.CONF.coordination.heartbeat_interval,
service_coordinator.heartbeat
)
def stop(self):
service_coordinator = get_service_coordinator()
if service_coordinator.is_active():
self._tg.stop()
service_coordinator.stop()

View File

@ -18,6 +18,7 @@ import traceback
from oslo_log import log as logging from oslo_log import log as logging
from mistral import coordination
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models as db_models from mistral.db.v2.sqlalchemy import models as db_models
from mistral.engine import action_handler from mistral.engine import action_handler
@ -43,10 +44,12 @@ LOG = logging.getLogger(__name__)
# the submodules are referenced. # the submodules are referenced.
class DefaultEngine(base.Engine): class DefaultEngine(base.Engine, coordination.Service):
def __init__(self, engine_client): def __init__(self, engine_client):
self._engine_client = engine_client self._engine_client = engine_client
coordination.Service.__init__(self, 'engine_group')
@u.log_exec(LOG) @u.log_exec(LOG)
def start_workflow(self, wf_name, wf_input, description='', **params): def start_workflow(self, wf_name, wf_input, description='', **params):
wf_exec_id = None wf_exec_id = None

View File

@ -13,9 +13,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from oslo_log import log as logging from oslo_log import log as logging
from mistral.actions import action_factory as a_f from mistral.actions import action_factory as a_f
from mistral import coordination
from mistral.engine import base from mistral.engine import base
from mistral import exceptions as exc from mistral import exceptions as exc
from mistral.utils import inspect_utils as i_u from mistral.utils import inspect_utils as i_u
@ -25,10 +27,12 @@ from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class DefaultExecutor(base.Executor): class DefaultExecutor(base.Executor, coordination.Service):
def __init__(self, engine_client): def __init__(self, engine_client):
self._engine_client = engine_client self._engine_client = engine_client
coordination.Service.__init__(self, 'executor_group')
def run_action(self, action_ex_id, action_class_str, attributes, def run_action(self, action_ex_id, action_class_str, attributes,
action_params): action_params):
"""Runs action. """Runs action.

View File

@ -12,15 +12,16 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import mock
from oslo_config import cfg from oslo_config import cfg
from mistral import coordination
from mistral.tests import base from mistral.tests import base
from mistral.utils import coordination
class CoordinationTest(base.BaseTest): class ServiceCoordinatorTest(base.BaseTest):
def setUp(self): def setUp(self):
super(CoordinationTest, self).setUp() super(ServiceCoordinatorTest, self).setUp()
def test_start(self): def test_start(self):
cfg.CONF.set_default( cfg.CONF.set_default(
@ -116,3 +117,31 @@ class CoordinationTest(base.BaseTest):
self.assertEqual(0, len(members_after)) self.assertEqual(0, len(members_after))
self.assertEqual(set([]), members_after) self.assertEqual(set([]), members_after)
class ServiceTest(base.BaseTest):
def setUp(self):
super(ServiceTest, self).setUp()
# Re-intialize the global service coordinator object, in order to use
# new coordination configuration.
coordination.cleanup_service_coordinator()
@mock.patch('mistral.utils.get_process_identifier', return_value='fake_id')
def test_register_membership(self, mock_get_identifier):
cfg.CONF.set_default('backend_url', 'zake://', 'coordination')
srv = coordination.Service('fake_group')
srv.register_membership()
self.addCleanup(srv.stop)
srv_coordinator = coordination.get_service_coordinator()
self.assertIsNotNone(srv_coordinator)
self.assertTrue(srv_coordinator.is_active())
members = srv_coordinator.get_members('fake_group')
mock_get_identifier.assert_called_once_with()
self.assertEqual(set(['fake_id']), members)

View File

@ -12,6 +12,7 @@ keystonemiddleware>=1.5.0
kombu>=3.0.7 kombu>=3.0.7
mock>=1.0 mock>=1.0
networkx>=1.8 networkx>=1.8
oslo.concurrency>=2.3.0 # Apache-2.0
oslo.config>=1.11.0 # Apache-2.0 oslo.config>=1.11.0 # Apache-2.0
oslo.db>=1.10.0 # Apache-2.0 oslo.db>=1.10.0 # Apache-2.0
oslo.messaging!=1.12.0,>=1.8.0 # Apache-2.0 oslo.messaging!=1.12.0,>=1.8.0 # Apache-2.0