Add coordination support for devstack

This way we can test the service api later on gate

In order to use etcd in gate few changes were made:
* All identifiers must be byte type (group type, member id)
* Tooz has a built-in mechanizm for heartbeat no need to implement it
* Need to use eventlet monkey patch since etcd client uses blocking
  methods
* Services name must be identical to LAUNCH_OPTIONS used in cli
* Gate coordination url should be define with a schema of etcd+http
  which is the etcd gateway and works better then just etcd

Change-Id: I772651e33eada4a5c2149bfa867095c277eddeed
This commit is contained in:
Eyal 2019-12-24 13:40:37 +02:00
parent 541aabbfe8
commit d838607b2f
4 changed files with 16 additions and 35 deletions

View File

@ -105,6 +105,12 @@ function configure_mistral {
if [ "$MISTRAL_USE_MOD_WSGI" == "True" ]; then if [ "$MISTRAL_USE_MOD_WSGI" == "True" ]; then
_config_mistral_apache_wsgi _config_mistral_apache_wsgi
fi fi
if [[ ! -z "$MISTRAL_COORDINATION_URL" ]]; then
iniset $MISTRAL_CONF_FILE coordination backend_url "$MISTRAL_COORDINATION_URL"
elif is_service_enabled etcd3; then
iniset $MISTRAL_CONF_FILE coordination backend_url "etcd3+http://${SERVICE_HOST}:$ETCD_PORT"
fi
} }

View File

@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import eventlet
eventlet.monkey_patch()
from oslo_config import cfg from oslo_config import cfg
import oslo_middleware.cors as cors_middleware import oslo_middleware.cors as cors_middleware
import oslo_middleware.http_proxy_to_wsgi as http_proxy_to_wsgi_middleware import oslo_middleware.http_proxy_to_wsgi as http_proxy_to_wsgi_middleware

View File

@ -35,7 +35,7 @@ class EventEngineServer(service_base.MistralService):
""" """
def __init__(self, event_engine): def __init__(self, event_engine):
super(EventEngineServer, self).__init__('event_engine_group') super(EventEngineServer, self).__init__('event-engine_group')
self._event_engine = event_engine self._event_engine = event_engine
self._rpc_server = None self._rpc_server = None

View File

@ -17,7 +17,6 @@ import six
from oslo_concurrency import lockutils from oslo_concurrency import lockutils
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
from oslo_service import threadgroup
import tenacity import tenacity
import tooz.coordination import tooz.coordination
@ -40,7 +39,7 @@ class ServiceCoordinator(object):
def __init__(self, my_id=None): def __init__(self, my_id=None):
self._coordinator = None self._coordinator = None
self._my_id = my_id or utils.get_process_identifier() self._my_id = six.b(my_id or utils.get_process_identifier())
self._started = False self._started = False
def start(self): def start(self):
@ -53,7 +52,7 @@ class ServiceCoordinator(object):
self._my_id self._my_id
) )
self._coordinator.start() self._coordinator.start(start_heart=True)
self._started = True self._started = True
LOG.info('Coordination backend started successfully.') LOG.info('Coordination backend started successfully.')
@ -78,30 +77,13 @@ class ServiceCoordinator(object):
def is_active(self): def is_active(self):
return self._coordinator and self._started return self._coordinator and self._started
def heartbeat(self):
if not self.is_active():
# Re-connect.
self.start()
if not self.is_active():
LOG.debug("Coordination backend didn't start.")
return
try:
self._coordinator.heartbeat()
except tooz.coordination.ToozError as e:
LOG.exception('Error sending a heartbeat to coordination '
'backend. %s', six.text_type(e))
self._started = False
@tenacity.retry(stop=tenacity.stop_after_attempt(5)) @tenacity.retry(stop=tenacity.stop_after_attempt(5))
def join_group(self, group_id): def join_group(self, group_id):
if not self.is_active() or not group_id: if not self.is_active() or not group_id:
return return
try: try:
join_req = self._coordinator.join_group(group_id) join_req = self._coordinator.join_group(six.b(group_id))
join_req.get() join_req.get()
LOG.info( LOG.info(
@ -114,7 +96,7 @@ class ServiceCoordinator(object):
except tooz.coordination.MemberAlreadyExist: except tooz.coordination.MemberAlreadyExist:
return return
except tooz.coordination.GroupNotCreated as e: except tooz.coordination.GroupNotCreated as e:
create_grp_req = self._coordinator.create_group(group_id) create_grp_req = self._coordinator.create_group(six.b(group_id))
try: try:
create_grp_req.get() create_grp_req.get()
@ -126,7 +108,7 @@ class ServiceCoordinator(object):
def leave_group(self, group_id): def leave_group(self, group_id):
if self.is_active(): if self.is_active():
self._coordinator.leave_group(group_id) self._coordinator.leave_group(six.b(group_id))
LOG.info( LOG.info(
'Left service group:%s, member:%s', 'Left service group:%s, member:%s',
@ -143,7 +125,7 @@ class ServiceCoordinator(object):
if not self.is_active(): if not self.is_active():
return [] return []
get_members_req = self._coordinator.get_members(group_id) get_members_req = self._coordinator.get_members(six.b(group_id))
try: try:
members = get_members_req.get() members = get_members_req.get()
@ -178,7 +160,6 @@ def get_service_coordinator(my_id=None):
class Service(object): class Service(object):
def __init__(self, group_type): def __init__(self, group_type):
self.group_type = group_type self.group_type = group_type
self._tg = None
@lockutils.synchronized('service_coordinator') @lockutils.synchronized('service_coordinator')
def register_membership(self): def register_membership(self):
@ -194,17 +175,8 @@ class Service(object):
if service_coordinator.is_active(): if service_coordinator.is_active():
service_coordinator.join_group(self.group_type) service_coordinator.join_group(self.group_type)
self._tg = threadgroup.ThreadGroup()
self._tg.add_timer(
cfg.CONF.coordination.heartbeat_interval,
service_coordinator.heartbeat
)
def stop(self): def stop(self):
service_coordinator = get_service_coordinator() service_coordinator = get_service_coordinator()
if service_coordinator.is_active(): if service_coordinator.is_active():
self._tg.stop()
service_coordinator.stop() service_coordinator.stop()