distcloud/distributedcloud/dcorch/engine/generic_sync_manager.py
Hugo Brito c2c7ab93ef Update endpoint caches post network reconfig
After switching the management network to the admin network, the
service endpoint caches of all dcmanager, dcorch and cert-mon
workers must be updated with the new admin values.

The fanout parameter is added to the cast calls of the RPC clients
(dcorch and audit); it makes the cast go to all servers listening
on a topic rather than to just one of them.
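
For reference, a minimal sketch of a fanout cast with oslo.messaging
(the topic, method name and arguments below are illustrative, not the
exact dcorch/audit client code):

    from oslo_config import cfg
    import oslo_messaging

    transport = oslo_messaging.get_rpc_transport(cfg.CONF)
    # fanout=True delivers the cast to every server listening on the
    # topic instead of round-robining it to a single one
    target = oslo_messaging.Target(topic='dcorch-engine-worker',
                                   fanout=True)
    client = oslo_messaging.RPCClient(transport, target)
    client.cast({}, 'update_subcloud_endpoints',
                subcloud_name='subcloud1',
                endpoints={'identity': 'https://<admin-ip>:5001/v3'})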

Test Plan:
PASS: dcmanager subcloud update (using admin network parameters)
1. Endpoints for the subcloud updated with admin ip value
2. subcloud availability = online
PASS: Verify that the subcloud is online shortly after successful
completion of subcloud_update playbook
PASS: Verify that the service endpoints are updated in all workers'
endpoint caches for the subcloud
PASS: Manage the subcloud and verify that both dcmanager and dcorch
audits are working as expected
PASS: Perform an Identity sync:
1. openstack --os-region-name SystemController user create <new_user>
--domain <domain> --project <project> --password <password>
2. Log in to the subcloud and verify the new user: openstack user list
PASS: Verify that the master token is refreshed successfully after an hour

Story: 2010319
Task: 47556

Depends-On: https://review.opendev.org/c/starlingx/config/+/877323

Signed-off-by: Hugo Brito <hugo.brito@windriver.com>
Change-Id: I149c864382b7c63d424f736bdb4eaac2a787b709
2023-03-14 11:44:15 -03:00


# Copyright 2017 Ericsson AB.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Copyright (c) 2020-2023 Wind River Systems, Inc.
#
import eventlet

import collections  # noqa: H306
import random

from keystoneauth1 import exceptions as keystone_exceptions
from oslo_log import log as logging
from oslo_utils import timeutils

from dccommon import consts as dccommon_consts
from dcorch.common import consts as dco_consts
from dcorch.common import context
from dcorch.common import exceptions
from dcorch.db import api as db_api
from dcorch.drivers.openstack import sdk
from dcorch.engine import scheduler
from dcorch.engine import subcloud_lock
from dcorch.engine.sync_services.identity import IdentitySyncThread
from dcorch.engine.sync_services.sysinv import SysinvSyncThread
from dcorch.objects import subcloud

LOG = logging.getLogger(__name__)

CHECK_AUDIT_INTERVAL = 300  # frequency to check for audit work
SYNC_TIMEOUT = 600  # Timeout for subcloud sync
AUDIT_INTERVAL = 1200  # Default audit interval

# sync object endpoint type and subclass mappings
sync_object_class_map = {
    dccommon_consts.ENDPOINT_TYPE_PLATFORM: SysinvSyncThread,
    dccommon_consts.ENDPOINT_TYPE_IDENTITY: IdentitySyncThread,
    dccommon_consts.ENDPOINT_TYPE_IDENTITY_OS: IdentitySyncThread
}


class GenericSyncManager(object):
    """Manages tasks related to resource management."""

    def __init__(self, engine_id, *args, **kwargs):
        super(GenericSyncManager, self).__init__()
        self.context = context.get_admin_context()
        self.engine_id = engine_id
        # Keeps track of greenthreads we create to do the sync work.
        self.thread_group_manager = scheduler.ThreadGroupManager(
            thread_pool_size=100)
        # Keeps track of greenthreads we create to do the audit work.
        self.audit_thread_group_manager = scheduler.ThreadGroupManager(
            thread_pool_size=100)
        # Maps a subcloud name to a dictionary that stores the sync
        # object per endpoint type.
        self.sync_objs = collections.defaultdict(dict)
        # Track greenthreads created for each subcloud.
        self.subcloud_threads = list()
        self.subcloud_audit_threads = list()

    def init_from_db(self, context):
        subclouds = subcloud.SubcloudList.get_all(context)
        for sc in subclouds:
            self.create_sync_objects(sc.region_name, sc.capabilities)
            LOG.info('Engine id:(%s) create_sync_objects for '
                     'subcloud:%s.' % (self.engine_id, sc.region_name))
            eventlet.sleep(0)  # cooperative yield

    def create_sync_objects(self, subcloud_name, capabilities):
        """Create the sync objects for the subcloud

        The objects handle the syncing of the subcloud's endpoint_types
        """
        endpoint_type_list = capabilities.get('endpoint_types', None)
        if endpoint_type_list:
            self.sync_objs[subcloud_name] = {}
            for endpoint_type in endpoint_type_list:
                LOG.info("Engine id:(%s) create %s/%s sync obj" %
                         (self.engine_id, subcloud_name, endpoint_type))
                sync_obj = sync_object_class_map[endpoint_type](
                    subcloud_name, endpoint_type)
                self.sync_objs[subcloud_name].update({
                    endpoint_type: sync_obj})
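
    # The two methods below are the engine's long-lived loops, each run
    # in its own greenthread: sync_job_thread polls for requested syncs
    # every 5 seconds, while sync_audit_thread kicks off audit checks
    # every CHECK_AUDIT_INTERVAL seconds.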
    def sync_job_thread(self, engine_id):
        """Perform sync requests for subclouds as required."""
        while True:
            try:
                self.sync_subclouds(engine_id)
                eventlet.greenthread.sleep(5)
            except eventlet.greenlet.GreenletExit:
                # We have been told to exit
                return
            except Exception as e:
                LOG.exception(e)

    def sync_audit_thread(self, engine_id):
        """Perform sync audits for subclouds as required."""
        while True:
            try:
                self.run_sync_audit(engine_id)
                eventlet.greenthread.sleep(CHECK_AUDIT_INTERVAL)
            except eventlet.greenlet.GreenletExit:
                # We have been told to exit
                return
            except Exception as e:
                LOG.exception(e)

    def sync_subclouds(self, engine_id):
        # Get the list of subclouds that are online, managed and have
        # completed the initial sync, then check whether each one is in
        # self.sync_objs. A subcloud that becomes managed shows up in the
        # list on the next cycle; an unmanaged one drops out of it. For
        # each subcloud/endpoint pair, sync if sync_request is set to
        # requested.
        subclouds = db_api.subcloud_get_all(
            self.context,
            management_state=dccommon_consts.MANAGEMENT_MANAGED,
            availability_status=dccommon_consts.AVAILABILITY_ONLINE,
            initial_sync_state=dco_consts.INITIAL_SYNC_STATE_COMPLETED)

        # randomize to reduce likelihood of sync_lock contention
        random.shuffle(subclouds)

        sc_names = []
        for sc in subclouds:
            if sc.region_name in self.sync_objs:
                sc_names.append(sc.region_name)
                for ept in self.sync_objs[sc.region_name].keys():
                    try:
                        self.sync_subcloud(self.context, engine_id,
                                           sc.region_name, ept, 'sync')
                    except exceptions.SubcloudSyncNotFound:
                        # The endpoint in subcloud_sync has been removed
                        LOG.info("Engine id:(%s/%s) SubcloudSyncNotFound "
                                 "remove from sync_obj endpoint_type %s" %
                                 (engine_id, sc.region_name, ept))
                        self.sync_objs[sc.region_name].pop(ept, None)

        LOG.debug('Engine id:(%s) Waiting for sync_subclouds %s to complete.'
                  % (engine_id, sc_names))
        for thread in self.subcloud_threads:
            thread.wait()

        # Clear the list of threads before next interval
        self.subcloud_threads = list()
        LOG.debug('Engine id:(%s): All subcloud syncs have completed.'
                  % engine_id)
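
    # Locking pattern for syncs: sync_subcloud() prechecks sync_request
    # without holding the per-subcloud/endpoint lock, then
    # mutex_start_thread() (decorated with @subcloud_lock.sync_subcloud)
    # re-reads the state under the lock before spawning the worker
    # greenthread, so two engines cannot start the same sync.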
    @subcloud_lock.sync_subcloud
    def mutex_start_thread(self, context, engine_id, subcloud_name,
                           endpoint_type, action):
        # Double check whether the sync is still needed while holding
        # the lock this time
        subcloud_sync = db_api.subcloud_sync_get(context, subcloud_name,
                                                 endpoint_type)
        if subcloud_sync.sync_request in [dco_consts.SYNC_STATUS_REQUESTED,
                                          dco_consts.SYNC_STATUS_FAILED]:
            thread = self.thread_group_manager.start(
                self._sync_subcloud, context, engine_id, subcloud_name,
                endpoint_type)
            self.subcloud_threads.append(thread)
        else:
            LOG.debug("mutex_start_thread Engine id: %s/%s sync not required"
                      % (engine_id, subcloud_name))

    def sync_subcloud(self, context, engine_id, subcloud_name, endpoint_type,
                      action):
        # precheck if the sync_state is still started
        subcloud_sync = db_api.subcloud_sync_get(context, subcloud_name,
                                                 endpoint_type)
        if subcloud_sync.sync_request in [dco_consts.SYNC_STATUS_REQUESTED,
                                          dco_consts.SYNC_STATUS_FAILED]:
            self.mutex_start_thread(
                context, engine_id, subcloud_name, endpoint_type, action)
        else:
            LOG.debug("Engine id: %s/%s sync not required" %
                      (engine_id, subcloud_name))

    def _sync_subcloud(self, context, engine_id, subcloud_name,
                       endpoint_type):
        db_api.subcloud_sync_update(
            context, subcloud_name, endpoint_type,
            values={'sync_request': dco_consts.SYNC_STATUS_IN_PROGRESS})
        obj = self.sync_objs[subcloud_name][endpoint_type]
        new_state = dco_consts.SYNC_STATUS_COMPLETED
        timeout = eventlet.timeout.Timeout(SYNC_TIMEOUT)
        try:
            obj.sync(engine_id)
        except eventlet.timeout.Timeout as t:
            if t is not timeout:
                raise  # not my timeout
            new_state = dco_consts.SYNC_STATUS_FAILED
        except Exception as e:
            LOG.exception('Sync failed for %s/%s: %s',
                          subcloud_name, endpoint_type, e)
            new_state = dco_consts.SYNC_STATUS_FAILED
        finally:
            timeout.cancel()
        db_api.subcloud_sync_update(
            context, subcloud_name, endpoint_type,
            values={'sync_request': new_state})

    def add_subcloud(self, context, name, version):
        # create subcloud in DB and create the sync objects
        LOG.info('adding subcloud %(sc)s' % {'sc': name})
        capabilities = {}
        endpoint_type_list = dco_consts.SYNC_ENDPOINT_TYPES_LIST[:]
        capabilities.update({'endpoint_types': endpoint_type_list})
        sc = subcloud.Subcloud(
            context, region_name=name, software_version=version,
            capabilities=capabilities)
        sc = sc.create()
        for endpoint_type in endpoint_type_list:
            db_api.subcloud_sync_create(
                context, name, endpoint_type,
                values={'subcloud_id': sc.id})  # pylint: disable=E1101
        # Create the sync object for this engine
        self.create_sync_objects(name, capabilities)

    def del_subcloud(self, context, subcloud_name):
        # first update the state of the subcloud
        self.update_subcloud_state(
            subcloud_name,
            management_state=dccommon_consts.MANAGEMENT_UNMANAGED,
            availability_status=dccommon_consts.AVAILABILITY_OFFLINE)
        # shutdown, optionally deleting queued work
        if subcloud_name not in self.sync_objs:
            LOG.error("Subcloud %s sync_objs do not exist" % subcloud_name)
        else:
            del self.sync_objs[subcloud_name]
        try:
            # delete this subcloud
            subcloud.Subcloud.delete_subcloud_by_name(context, subcloud_name)
        except Exception:
            raise exceptions.SubcloudNotFound(region_name=subcloud_name)

    def sync_request(self, ctxt, endpoint_type):
        # Someone has enqueued a sync job. set the endpoint sync_request to
        # requested
        subclouds = db_api.subcloud_get_all(
            ctxt, management_state=dccommon_consts.MANAGEMENT_MANAGED)
        for sc in subclouds:
            GenericSyncManager.set_sync_request(ctxt, sc.region_name,
                                                endpoint_type)

    @classmethod
    def set_sync_request(cls, ctxt, subcloud_name, endpoint_type):
        db_api.subcloud_sync_update(
            ctxt, subcloud_name, endpoint_type,
            values={'sync_request': dco_consts.SYNC_STATUS_REQUESTED})

    def subcloud_state_matches(self, subcloud_name,
                               management_state=None,
                               availability_status=None,
                               initial_sync_state=None):
        # compare subcloud states
        match = True
        sc = subcloud.Subcloud.get_by_name(self.context, subcloud_name)
        if management_state is not None:
            if sc.management_state != management_state:
                match = False
        if match and availability_status is not None:
            if sc.availability_status != availability_status:
                match = False
        if match and initial_sync_state is not None:
            if sc.initial_sync_state != initial_sync_state:
                match = False
        return match

    def update_subcloud_state(self, subcloud_name,
                              management_state=None,
                              availability_status=None,
                              initial_sync_state=None):
        LOG.info('updating state for subcloud %(sc)s - '
                 'management_state: %(mgmt)s '
                 'availability_status: %(avail)s '
                 'initial_sync_state: %(iss)s ' %
                 {'sc': subcloud_name, 'mgmt': management_state,
                  'avail': availability_status, 'iss': initial_sync_state})
        sc = subcloud.Subcloud.get_by_name(self.context, subcloud_name)
        if management_state is not None:
            sc.management_state = management_state
        if availability_status is not None:
            sc.availability_status = availability_status
        if initial_sync_state is not None:
            sc.initial_sync_state = initial_sync_state
        sc.save()

    def init_subcloud_sync_audit(self, subcloud_name):
        LOG.info('Initialize subcloud sync audit for '
                 'subcloud %(sc)s' %
                 {'sc': subcloud_name})
        endpoint_type_list = dco_consts.SYNC_ENDPOINT_TYPES_LIST[:]
        for endpoint_type in endpoint_type_list:
            db_api.subcloud_sync_update(
                self.context, subcloud_name, endpoint_type,
                values={'audit_status': dco_consts.AUDIT_STATUS_NONE,
                        'sync_status_reported': dco_consts.SYNC_STATUS_NONE,
                        'sync_status_report_time': None,
                        'last_audit_time': None})

    def enable_subcloud(self, context, subcloud_name):
        LOG.info('enabling subcloud %(sc)s' % {'sc': subcloud_name})
        if subcloud_name in self.sync_objs:
            for sync_obj in self.sync_objs[subcloud_name].values():
                LOG.info('Engine id: %(id)s enabling sync '
                         'thread subcloud %(sc)s' %
                         {'sc': subcloud_name, 'id': self.engine_id})
                sync_obj.enable()
        else:
            LOG.error("enable_subcloud No sync objects for subcloud:%s" %
                      subcloud_name)

    def disable_subcloud(self, context, subcloud_name):
        LOG.info('disabling subcloud %(sc)s' % {'sc': subcloud_name})
        # nothing to do here at the moment
        pass

    def is_subcloud_managed(self, subcloud_name):
        # is this subcloud managed
        sc = subcloud.Subcloud.get_by_name(self.context, subcloud_name)
        return sc.management_state == dccommon_consts.MANAGEMENT_MANAGED

    def is_subcloud_enabled(self, subcloud_name):
        # is this subcloud enabled
        sc = subcloud.Subcloud.get_by_name(self.context, subcloud_name)
        # We only enable syncing if the subcloud is online and the initial
        # sync has completed.
        if (sc.availability_status == dccommon_consts.AVAILABILITY_ONLINE and
                sc.initial_sync_state ==
                dco_consts.INITIAL_SYNC_STATE_COMPLETED):
            return True
        else:
            return False

    def is_subcloud_ready(self, subcloud_name):
        # is this subcloud ready for synchronization
        return self.is_subcloud_managed(subcloud_name) and \
            self.is_subcloud_enabled(subcloud_name)

    def add_subcloud_sync_endpoint_type(self, context, subcloud_name,
                                        endpoint_type_list=None):
        # TODO(jkung): This method is currently only required by
        # stx-openstack and is to be integrated with stx-openstack when
        # that feature is enabled.
        LOG.info("add_subcloud_sync_endpoint_type subcloud_name=%s "
                 "endpoint_type_list=%s" %
                 (subcloud_name, endpoint_type_list))

        sc = subcloud.Subcloud.get_by_name(context, subcloud_name)
        capabilities = sc.capabilities
        c_endpoint_type_list = capabilities.get('endpoint_types', [])

        # Update the DB first
        if endpoint_type_list:
            for endpoint_type in endpoint_type_list:
                if endpoint_type not in c_endpoint_type_list:
                    c_endpoint_type_list.append(endpoint_type)
            if capabilities.get('endpoint_types') is None:
                # assign back if 'endpoint_types' is not in capabilities
                capabilities['endpoint_types'] = c_endpoint_type_list
            sc.capabilities = capabilities
            sc.save()

        # Create objects for the endpoint types
        if endpoint_type_list:
            for endpoint_type in endpoint_type_list:
                # Check whether the sync endpoint already exists
                try:
                    subcloud_sync = db_api.subcloud_sync_get(
                        context, subcloud_name,
                        endpoint_type)
                    if subcloud_sync:
                        LOG.info("subcloud_sync subcloud=%s "
                                 "endpoint_type=%s already exists" %
                                 (subcloud_name, endpoint_type))
                        continue
                except exceptions.SubcloudSyncNotFound:
                    pass

                # skip creation if a sync_obj of this endpoint type already
                # exists
                sync_obj = self.sync_objs[subcloud_name].get(endpoint_type)
                if not sync_obj:
                    LOG.info("add_subcloud_sync_endpoint_type "
                             "subcloud_name=%s, sync_obj add=%s" %
                             (subcloud_name, endpoint_type))
                    sync_obj = sync_object_class_map[endpoint_type](
                        subcloud_name, endpoint_type=endpoint_type)
                    self.sync_objs[subcloud_name].update(
                        {endpoint_type: sync_obj})

                # create the subcloud_sync entry
                db_api.subcloud_sync_create(
                    context, subcloud_name, endpoint_type,
                    values={'subcloud_id': sc.id})  # pylint: disable=E1101

                if self.is_subcloud_ready(subcloud_name):
                    sync_obj.enable()
                    sync_obj.initial_sync()

    def remove_subcloud_sync_endpoint_type(self, context, subcloud_name,
                                           endpoint_type_list=None):
        # TODO(jkung): This method is currently only required by
        # stx-openstack and is to be integrated with stx-openstack when
        # that feature is enabled and remove action performed.
        # The subcloud_sync delete can be more graceful by ensuring the
        # sync object is updated for each engine on delete.
        LOG.info("remove_subcloud_sync_endpoint_type subcloud_name=%s "
                 "endpoint_type_list=%s" %
                 (subcloud_name, endpoint_type_list))

        # Remove sync_objs and subcloud_sync for endpoint types to be removed
        if endpoint_type_list:
            for endpoint_type in endpoint_type_list:
                self.sync_objs[subcloud_name].pop(endpoint_type, None)
                try:
                    db_api.subcloud_sync_delete(
                        context, subcloud_name, endpoint_type)
                except exceptions.SubcloudSyncNotFound:
                    pass

        # remove the endpoint types from subcloud capabilities
        sc = subcloud.Subcloud.get_by_name(context, subcloud_name)
        capabilities = sc.capabilities
        c_endpoint_type_list = capabilities.get('endpoint_types', [])

        if endpoint_type_list and c_endpoint_type_list:
            for endpoint_type in endpoint_type_list:
                if endpoint_type in c_endpoint_type_list:
                    c_endpoint_type_list.remove(endpoint_type)
            sc.capabilities = capabilities
            sc.save()

    def update_subcloud_version(self, context, subcloud_name, sw_version):
        try:
            sc = subcloud.Subcloud.get_by_name(context, subcloud_name)
            sc.software_version = sw_version
            sc.save()
        except KeyError:
            raise exceptions.SubcloudNotFound(region_name=subcloud_name)
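
    # Invoked via a fanout RPC cast after a management-to-admin network
    # reconfiguration, so that every dcorch engine worker refreshes its
    # cached service endpoints for the subcloud with the new admin values.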
    def update_subcloud_endpoints(self, context, subcloud_name, endpoints):
        try:
            LOG.info("Updating service endpoints for subcloud %s in "
                     "endpoint cache" % subcloud_name)
            endpoint_cache = sdk.OpenStackDriver(
                region_name=dccommon_consts.CLOUD_0
            ).keystone_client.endpoint_cache
            endpoint_cache.update_master_service_endpoint_region(
                subcloud_name, endpoints)
        except (keystone_exceptions.EndpointNotFound,
                keystone_exceptions.ConnectFailure,
                IndexError):
            LOG.error("Failed to update service endpoints for "
                      "subcloud: %s in dcorch." % subcloud_name)

    def initial_sync(self, context, subcloud_name):
        LOG.info('Initial sync subcloud %(sc)s %(id)s' %
                 {'sc': subcloud_name, 'id': self.engine_id})
        # initial synchronization of the subcloud
        if subcloud_name in self.sync_objs:
            # self.sync_objs stores the sync object per endpoint
            for sync_obj in self.sync_objs[subcloud_name].values():
                sync_obj.initial_sync()
        else:
            LOG.info('Initial sync subcloud %(sc)s '
                     'sync_objs not found...creating' %
                     {'sc': subcloud_name})
            capabilities = {}
            endpoint_type_list = dco_consts.SYNC_ENDPOINT_TYPES_LIST[:]
            capabilities.update({'endpoint_types': endpoint_type_list})
            self.create_sync_objects(subcloud_name, capabilities)
            if subcloud_name in self.sync_objs:
                # self.sync_objs stores the sync object per endpoint
                for sync_obj in self.sync_objs[subcloud_name].values():
                    sync_obj.initial_sync()
            else:
                LOG.error('Initial sync subcloud %(sc)s '
                          'sync_objs not found' %
                          {'sc': subcloud_name})

    @subcloud_lock.sync_subcloud
    def audit_subcloud(self, context, engine_id, subcloud_name,
                       endpoint_type, action):
        subcloud_sync = db_api.subcloud_sync_get(context, subcloud_name,
                                                 endpoint_type)
        # Audit only if the last audit time is at least AUDIT_INTERVAL
        # seconds ago (when the status is completed or in progress), or
        # unconditionally when the status is none or failed. Re-auditing
        # an in-progress entry restarts audits orphaned by a process
        # death mid-audit.
        audit = False
        if subcloud_sync.audit_status in [dco_consts.AUDIT_STATUS_COMPLETED,
                                          dco_consts.AUDIT_STATUS_IN_PROGRESS]:
            if subcloud_sync.last_audit_time:
                delta = timeutils.delta_seconds(
                    subcloud_sync.last_audit_time, timeutils.utcnow())
                # Audit interval
                if delta >= AUDIT_INTERVAL:
                    audit = True
            else:
                audit = True
        elif subcloud_sync.audit_status in [dco_consts.AUDIT_STATUS_NONE,
                                            dco_consts.AUDIT_STATUS_FAILED]:
            audit = True

        if audit:
            thread = self.thread_group_manager.start(
                self._audit_subcloud, engine_id, subcloud_name,
                endpoint_type)
            self.subcloud_audit_threads.append(thread)

    def _audit_subcloud(self, engine_id, subcloud_name, endpoint_type):
        # The last_audit_time is set up front in order to ensure the
        # synchronous audit_subcloud() check sees the in-progress status
        # and a current last_audit_time
        db_api.subcloud_sync_update(
            self.context, subcloud_name, endpoint_type,
            values={'audit_status': dco_consts.AUDIT_STATUS_IN_PROGRESS,
                    'last_audit_time': timeutils.utcnow()})
        obj = self.sync_objs[subcloud_name][endpoint_type]
        new_state = dco_consts.AUDIT_STATUS_COMPLETED
        timeout = eventlet.timeout.Timeout(SYNC_TIMEOUT)
        try:
            obj.run_sync_audit(engine_id)
        except eventlet.timeout.Timeout as t:
            if t is not timeout:
                raise  # not my timeout
            new_state = dco_consts.AUDIT_STATUS_FAILED
        except Exception as e:
            LOG.exception('Audit failed for %s/%s: %s',
                          subcloud_name, endpoint_type, e)
            new_state = dco_consts.AUDIT_STATUS_FAILED
        finally:
            timeout.cancel()
        db_api.subcloud_sync_update(
            self.context, subcloud_name, endpoint_type,
            values={'audit_status': new_state})
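
    # run_sync_audit() is driven by sync_audit_thread() every
    # CHECK_AUDIT_INTERVAL seconds; audit_subcloud() above then decides
    # per endpoint whether an audit is actually due, gated by
    # AUDIT_INTERVAL.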
    def run_sync_audit(self, engine_id):
        LOG.info('run_sync_audit %(id)s' % {'id': engine_id})
        # get a list of subclouds that are enabled
        subclouds = db_api.subcloud_get_all(
            self.context,
            management_state=dccommon_consts.MANAGEMENT_MANAGED,
            availability_status=dccommon_consts.AVAILABILITY_ONLINE,
            initial_sync_state=dco_consts.INITIAL_SYNC_STATE_COMPLETED)

        # randomize to reduce likelihood of sync_lock contention
        random.shuffle(subclouds)

        for sc in subclouds:
            if sc.region_name in list(self.sync_objs.keys()):
                for e in self.sync_objs[sc.region_name].keys():
                    LOG.debug("Attempt audit_subcloud: %s/%s/%s",
                              engine_id, sc.region_name, e)
                    self.audit_subcloud(self.context, engine_id,
                                        sc.region_name, e, 'audit')
            else:
                # Sync objects are currently created on every worker; if
                # needed, a future implementation could distribute
                # sync_objs to specific workers instead.
                LOG.info('Run sync audit sync subcloud %(sc)s '
                         'sync_objs not found...creating' %
                         {'sc': sc.region_name})
                capabilities = {}
                endpoint_type_list = dco_consts.SYNC_ENDPOINT_TYPES_LIST[:]
                capabilities.update({'endpoint_types': endpoint_type_list})
                self.create_sync_objects(sc.region_name, capabilities)
                # self.sync_objs stores the sync object per endpoint
                if sc.region_name in list(self.sync_objs.keys()):
                    for e in self.sync_objs[sc.region_name].keys():
                        LOG.debug("Attempt audit_subcloud: %s/%s/%s",
                                  engine_id, sc.region_name, e)
                        self.audit_subcloud(self.context, engine_id,
                                            sc.region_name, e, 'audit')
                else:
                    LOG.error('Run sync audit subcloud %(sc)s '
                              'sync_objs not found' %
                              {'sc': sc.region_name})

        LOG.debug('Engine id:(%s) Waiting for audit_subclouds to complete.'
                  % engine_id)
        for thread in self.subcloud_audit_threads:
            thread.wait()

        # Clear the list of threads before next interval
        self.subcloud_audit_threads = list()
        LOG.info('Engine id:(%s): All subcloud audits have completed.'
                 % engine_id)