From 4960da949682186ba9a2e2b799b0b4efe4277048 Mon Sep 17 00:00:00 2001 From: LingxianKong Date: Wed, 17 Jun 2015 11:12:59 +0800 Subject: [PATCH] Add coordination util for service management Tooz is a Python library that provides a coordination API. Its primary goal is to handle groups and membership of these groups in distributed systems. This patch adds coordination util to Mistral, which makes use of Tooz library. Change-Id: Icbf3086d01649af813727f0972d6d5b0631d6afb Partially-Implements: blueprint mistral-service-api --- mistral/config.py | 12 ++ mistral/exceptions.py | 4 + mistral/tests/unit/utils/test_coordination.py | 118 ++++++++++++++ mistral/utils/__init__.py | 7 + mistral/utils/coordination.py | 145 ++++++++++++++++++ requirements.txt | 2 + 6 files changed, 288 insertions(+) create mode 100644 mistral/tests/unit/utils/test_coordination.py create mode 100644 mistral/utils/coordination.py diff --git a/mistral/config.py b/mistral/config.py index b18904fe6..aa69b950e 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -96,18 +96,29 @@ wf_trace_log_name_opt = cfg.StrOpt( 'workflow trace output.' 
) +coordination_opts = [ + cfg.StrOpt('backend_url', + default=None, + help='The backend URL to be used for coordination'), + cfg.FloatOpt('heartbeat_interval', + default=5.0, + help='Number of seconds between heartbeats for coordination.') +] + CONF = cfg.CONF API_GROUP = 'api' ENGINE_GROUP = 'engine' EXECUTOR_GROUP = 'executor' PECAN_GROUP = 'pecan' +COORDINATION_GROUP = 'coordination' CONF.register_opts(api_opts, group=API_GROUP) CONF.register_opts(engine_opts, group=ENGINE_GROUP) CONF.register_opts(pecan_opts, group=PECAN_GROUP) CONF.register_opts(executor_opts, group=EXECUTOR_GROUP) CONF.register_opt(wf_trace_log_name_opt) +CONF.register_opts(coordination_opts, group=COORDINATION_GROUP) CLI_OPTS = [ use_debugger, @@ -134,6 +145,7 @@ def list_opts(): (ENGINE_GROUP, engine_opts), (EXECUTOR_GROUP, executor_opts), (PECAN_GROUP, pecan_opts), + (COORDINATION_GROUP, coordination_opts), (None, itertools.chain( CLI_OPTS, [wf_trace_log_name_opt] diff --git a/mistral/exceptions.py b/mistral/exceptions.py index fdbcab172..f2c583b49 100644 --- a/mistral/exceptions.py +++ b/mistral/exceptions.py @@ -119,3 +119,7 @@ class SizeLimitExceededException(MistralException): super(SizeLimitExceededException, self).__init__( "Size of '%s' is %dKB which exceeds the limit of %dKB" % (field_name, size_kb, size_limit_kb)) + + +class CoordinationException(MistralException): + http_code = 500 diff --git a/mistral/tests/unit/utils/test_coordination.py b/mistral/tests/unit/utils/test_coordination.py new file mode 100644 index 000000000..e11edcc76 --- /dev/null +++ b/mistral/tests/unit/utils/test_coordination.py @@ -0,0 +1,118 @@ +# Copyright 2015 Huawei Technologies Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg + +from mistral.tests import base +from mistral.utils import coordination + + +class CoordinationTest(base.BaseTest): + def setUp(self): + super(CoordinationTest, self).setUp() + + def test_start(self): + cfg.CONF.set_default( + 'backend_url', + 'zake://', + 'coordination' + ) + + coordinator = coordination.ServiceCoordinator('fake_id') + coordinator.start() + + self.assertTrue(coordinator.is_active()) + + def test_start_without_backend(self): + cfg.CONF.set_default('backend_url', None, 'coordination') + + coordinator = coordination.ServiceCoordinator() + coordinator.start() + + self.assertFalse(coordinator.is_active()) + + def test_stop_not_active(self): + cfg.CONF.set_default('backend_url', None, 'coordination') + + coordinator = coordination.ServiceCoordinator() + coordinator.start() + + coordinator.stop() + + self.assertFalse(coordinator.is_active()) + + def test_stop(self): + cfg.CONF.set_default( + 'backend_url', + 'zake://', + 'coordination' + ) + + coordinator = coordination.ServiceCoordinator() + coordinator.start() + + coordinator.stop() + + self.assertFalse(coordinator.is_active()) + + def test_join_group_not_active(self): + cfg.CONF.set_default('backend_url', None, 'coordination') + + coordinator = coordination.ServiceCoordinator() + coordinator.start() + + coordinator.join_group('fake_group') + members = coordinator.get_members('fake_group') + + self.assertFalse(coordinator.is_active()) + + self.assertEqual(0, len(members)) + + def test_join_group_and_get_members(self): + cfg.CONF.set_default( + 'backend_url', + 
'zake://', + 'coordination' + ) + + coordinator = coordination.ServiceCoordinator(my_id='fake_id') + coordinator.start() + + coordinator.join_group('fake_group') + members = coordinator.get_members('fake_group') + + self.assertEqual(1, len(members)) + self.assertItemsEqual(('fake_id',), members) + + def test_join_group_and_leave_group(self): + cfg.CONF.set_default( + 'backend_url', + 'zake://', + 'coordination' + ) + + coordinator = coordination.ServiceCoordinator(my_id='fake_id') + coordinator.start() + + coordinator.join_group('fake_group') + members_before = coordinator.get_members('fake_group') + + coordinator.leave_group('fake_group') + members_after = coordinator.get_members('fake_group') + + self.assertEqual(1, len(members_before)) + self.assertEqual(set(['fake_id']), members_before) + + self.assertEqual(0, len(members_after)) + self.assertEqual(set([]), members_after) diff --git a/mistral/utils/__init__.py b/mistral/utils/__init__.py index 454a816b3..fb2f3a14a 100644 --- a/mistral/utils/__init__.py +++ b/mistral/utils/__init__.py @@ -19,6 +19,7 @@ import json import logging import os from os import path +import socket import threading import uuid @@ -241,3 +242,9 @@ def get_input_dict(inputs): input_dict[x] = NotDefined return input_dict + + +def get_process_identifier(): + """Gets current running process identifier.""" + + return "%s_%s" % (socket.gethostname(), os.getpid()) diff --git a/mistral/utils/coordination.py b/mistral/utils/coordination.py new file mode 100644 index 000000000..c2ade8007 --- /dev/null +++ b/mistral/utils/coordination.py @@ -0,0 +1,145 @@ +# Copyright 2015 Huawei Technologies Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import six + +from oslo_config import cfg +from oslo_log import log +from retrying import retry +import tooz.coordination + +from mistral import utils + + +LOG = log.getLogger(__name__) + + +class ServiceCoordinator(object): + """Service coordinator. + + This class uses the `tooz` library to manage group membership. + + To ensure that the other agents know this agent is still alive, + the `heartbeat` method should be called periodically. + """ + + def __init__(self, my_id=None): + self._coordinator = None + self._my_id = my_id or utils.get_process_identifier() + self._started = False + + def start(self): + backend_url = cfg.CONF.coordination.backend_url + + if backend_url: + try: + self._coordinator = tooz.coordination.get_coordinator( + backend_url, + self._my_id + ) + + self._coordinator.start() + self._started = True + + LOG.info('Coordination backend started successfully.') + except tooz.coordination.ToozError as e: + self._started = False + + LOG.exception('Error connecting to coordination backend. ' + '%s', six.text_type(e)) + + def stop(self): + if not self.is_active(): + return + + try: + self._coordinator.stop() + except tooz.coordination.ToozError: + LOG.warning('Error connecting to coordination backend.') + finally: + self._coordinator = None + self._started = False + + def is_active(self): + return self._coordinator and self._started + + def heartbeat(self): + if not self.is_active(): + # Re-connect. 
+ self.start() + + if not self.is_active(): + LOG.debug("Coordination backend didn't start.") + return + + try: + self._coordinator.heartbeat() + except tooz.coordination.ToozError as e: + LOG.exception('Error sending a heartbeat to coordination ' + 'backend. %s', six.text_type(e)) + + @retry(stop_max_attempt_number=5) + def join_group(self, group_id): + if not self.is_active() or not group_id: + return + + try: + join_req = self._coordinator.join_group(group_id) + join_req.get() + + LOG.info( + 'Joined service group:%s, member:%s', + group_id, + self._my_id + ) + + return + except tooz.coordination.MemberAlreadyExist: + return + except tooz.coordination.GroupNotCreated as e: + create_grp_req = self._coordinator.create_group(group_id) + + try: + create_grp_req.get() + except tooz.coordination.GroupAlreadyExist: + pass + + # Re-raise exception to join group again. + raise e + + def leave_group(self, group_id): + if self.is_active(): + self._coordinator.leave_group(group_id) + + LOG.info( + 'Left service group:%s, member:%s', + group_id, + self._my_id + ) + + def get_members(self, group_id): + if not self.is_active(): + return [] + + get_members_req = self._coordinator.get_members(group_id) + try: + members = get_members_req.get() + + LOG.debug('Members of group %s: %s', group_id, members) + + return members + except tooz.coordination.GroupNotCreated: + LOG.warning('Group %s does not exist.', group_id) + + return [] diff --git a/requirements.txt b/requirements.txt index 1ed335203..a5bcfe877 100644 --- a/requirements.txt +++ b/requirements.txt @@ -31,8 +31,10 @@ python-neutronclient<3,>=2.3.11 python-novaclient>=2.22.0 PyYAML>=3.1.0 requests>=2.5.2 +retrying>=1.2.3,!=1.3.0 # Apache-2.0 six>=1.9.0 SQLAlchemy<1.1.0,>=0.9.7 stevedore>=1.5.0 # Apache-2.0 WSME>=0.7 yaql>=0.2.7,!=0.3.0 # Apache 2.0 License +tooz>=0.16.0 # Apache-2.0