Eliminate unnecessary fd creation in dcorch

With 500 AIO-SX subclouds, the file descriptor (fd) usage for
the dcorch-engine processes is higher than expected. The open
file descriptors are about 13,000. The majority of the file
descriptors (almost 99%) are idle network sockets.

This update removes statically open connections that are not
used or not needed. Since both sysinv and dbsync clients are
not session-based they will not take advantage of an already
authenticated session. Cached clients are still used for both
sysinv and dbsync clients.

The test result shows the number of fds have been reduced by
more than 94%.

Story: 2008960
Task: 42868

Signed-off-by: Tao Liu <tao.liu@windriver.com>
Change-Id: Ie30910a335d5e8caa26a2f1834794146c7555e53
This commit is contained in:
Tao Liu
2021-07-20 16:37:37 -04:00
parent b4ef7c7bd3
commit e83493f30b
5 changed files with 147 additions and 164 deletions

View File

@@ -28,11 +28,13 @@ from dccommon.drivers.openstack.sysinv_v1 import SysinvClient
from dccommon import exceptions
from dccommon.utils import is_token_expiring_soon
from dcdbsync.dbsyncclient.client import Client as dbsyncclient
KEYSTONE_CLIENT_NAME = 'keystone'
SYSINV_CLIENT_NAME = 'sysinv'
FM_CLIENT_NAME = 'fm'
BARBICAN_CLIENT_NAME = 'barbican'
DBSYNC_CLIENT_NAME = 'dbsync'
LOG = log.getLogger(__name__)
@@ -42,6 +44,7 @@ SUPPORTED_REGION_CLIENTS = [
SYSINV_CLIENT_NAME,
FM_CLIENT_NAME,
BARBICAN_CLIENT_NAME,
DBSYNC_CLIENT_NAME,
]
# region client type and class mappings
@@ -49,6 +52,7 @@ region_client_class_map = {
SYSINV_CLIENT_NAME: SysinvClient,
FM_CLIENT_NAME: FmClient,
BARBICAN_CLIENT_NAME: BarbicanClient,
DBSYNC_CLIENT_NAME: dbsyncclient,
}
@@ -58,13 +62,15 @@ class OpenStackDriver(object):
_identity_tokens = {}
def __init__(self, region_name=consts.CLOUD_0, thread_name='dcorch',
auth_url=None, region_clients=SUPPORTED_REGION_CLIENTS):
auth_url=None, region_clients=SUPPORTED_REGION_CLIENTS,
endpoint_type=consts.KS_ENDPOINT_DEFAULT):
# Check if objects are cached and try to use those
self.region_name = region_name
self.keystone_client = None
self.sysinv_client = None
self.fm_client = None
self.barbican_client = None
self.dbsync_client = None
if region_clients:
# check if the requested clients are in the supported client list
@@ -107,7 +113,9 @@ class OpenStackDriver(object):
# Create new client object and cache it
try:
client_object = region_client_class_map[client_name](
region_name, self.keystone_client.session)
region=region_name,
session=self.keystone_client.session,
endpoint_type=endpoint_type)
setattr(self, client_obj_name, client_object)
OpenStackDriver.update_region_clients(region_name,
client_name,

View File

@@ -101,7 +101,9 @@ def make_sysinv_patch(update_dict):
class SysinvClient(base.DriverBase):
"""Sysinv V1 driver."""
def __init__(self, region, session, timeout=SYSINV_CLIENT_REST_DEFAULT_TIMEOUT):
def __init__(self, region, session,
timeout=SYSINV_CLIENT_REST_DEFAULT_TIMEOUT,
endpoint_type=consts.KS_ENDPOINT_ADMIN):
try:
# TOX cannot import cgts_client and all the dependencies therefore
# the client is being lazy loaded since TOX doesn't actually
@@ -113,7 +115,7 @@ class SysinvClient(base.DriverBase):
endpoint = session.get_endpoint(
service_type='platform',
region_name=region,
interface=consts.KS_ENDPOINT_ADMIN)
interface=endpoint_type)
token = session.get_token()
self.sysinv_client = client.Client(API_VERSION,

View File

@@ -18,7 +18,7 @@ import base64
from collections import namedtuple
from dccommon import consts as dccommon_consts
from dcdbsync.dbsyncclient import client as dbsyncclient
from dccommon.drivers.openstack import sdk_platform as sdk
from dcdbsync.dbsyncclient import exceptions as dbsync_exceptions
from dcorch.common import consts
from dcorch.common import exceptions
@@ -26,7 +26,6 @@ from dcorch.engine.sync_thread import SyncThread
from dcorch.objects import resource
from keystoneauth1 import exceptions as keystone_exceptions
from keystoneclient import client as keystoneclient
from oslo_log import log as logging
from oslo_serialization import jsonutils
@@ -86,74 +85,33 @@ class IdentitySyncThread(SyncThread):
self.log_extra = {"instance": "{}/{}: ".format(
self.region_name, self.endpoint_type)}
self.sc_ks_client = None
self.sc_dbs_client = None
self.initialize()
LOG.info("IdentitySyncThread initialized", extra=self.log_extra)
def initialize_sc_clients(self):
super(IdentitySyncThread, self).initialize_sc_clients()
# create a keystone client for the subcloud
if (not self.sc_ks_client and self.sc_admin_session):
self.sc_ks_client = keystoneclient.Client(
session=self.sc_admin_session,
endpoint_type=dccommon_consts.KS_ENDPOINT_ADMIN,
region_name=self.region_name)
# create a dbsync client for the subcloud
if (not self.sc_dbs_client and self.sc_admin_session):
self.sc_dbs_client = dbsyncclient.Client(
session=self.sc_admin_session,
endpoint_type=consts.DBS_ENDPOINT_ADMIN,
region_name=self.region_name)
@staticmethod
def get_os_client(region):
try:
os_client = sdk.OpenStackDriver(
region_name=region,
region_clients=['dbsync'])
except Exception as e:
LOG.error("Failed to get os_client for region {} {}."
.format(region, str(e)))
raise e
return os_client
def reauthenticate_m_dbs_client(self):
if self.m_dbs_client and self.admin_session:
self.m_dbs_client.update(session=self.admin_session)
def get_ks_client(self, region):
return self.get_os_client(region).keystone_client.keystone_client
def reauthenticate_m_ks_client(self):
if self.m_ks_client and self.admin_session:
self.m_ks_client.session.invalidate()
self.m_ks_client.session.get_auth_headers()
def reauthenticate_sc_clients(self):
self.reauthenticate_sc_ks_client()
self.sc_dbs_client.update(session=self.sc_admin_session)
def reauthenticate_sc_dbs_client(self):
self.sc_admin_session = None
self.sc_dbs_client = None
self.initialize_sc_clients()
def reauthenticate_sc_ks_client(self):
self.sc_admin_session = None
self.sc_ks_client = None
self.initialize_sc_clients()
def initialize(self):
# Subcloud may be enabled a while after being added.
# Keystone endpoints for the subcloud could be added in
# between these 2 steps. Reinitialize the session to
# get the most up-to-date service catalog.
super(IdentitySyncThread, self).initialize()
# We initialize a master version of the keystone client, and a
# subcloud specific version
self.m_ks_client = self.ks_client
# We initialize a master version of the dbsync client, and a
# subcloud specific version
self.m_dbs_client = self.dbs_client
LOG.info("Identity session and clients initialized",
extra=self.log_extra)
def get_dbs_client(self, region):
return self.get_os_client(region).dbsync_client
def _initial_sync_users(self, m_users, sc_users):
# Particularly sync users with same name but different ID. admin,
# sysinv, and dcmanager users are special cases as the id's will match
# (as this is forced during the subcloud deploy) but the details will
# not so we still need to sync them here.
m_client = self.m_dbs_client.identity_manager
sc_client = self.sc_dbs_client.identity_manager
m_client = self.get_dbs_client(self.master_region_name).identity_manager
sc_client = self.get_dbs_client(self.region_name).identity_manager
for m_user in m_users:
for sc_user in sc_users:
@@ -180,24 +138,22 @@ class IdentitySyncThread(SyncThread):
LOG.info("Update user {} request failed for {}: {}."
.format(sc_user.id,
self.region_name, str(e)))
self.reauthenticate_sc_dbs_client()
# Clear the cache so that the old token will not be validated
sdk.OpenStackDriver.delete_region_clients(self.region_name)
# Retry with a new token
sc_client = self.get_dbs_client(
self.region_name).identity_manager
user_ref = sc_client.update_user(sc_user.id,
user_records)
if not user_ref:
LOG.error("No user data returned when updating user {}"
" in subcloud.".format(sc_user.id))
raise exceptions.SyncRequestFailed
# If admin user get synced, the client need to
# re-authenticate.
if sc_user.local_user.name == \
dccommon_consts.ADMIN_USER_NAME:
self.reauthenticate_sc_clients()
def _initial_sync_projects(self, m_projects, sc_projects):
# Particularly sync projects with same name but different ID.
m_client = self.m_dbs_client.project_manager
sc_client = self.sc_dbs_client.project_manager
m_client = self.get_dbs_client(self.master_region_name).project_manager
sc_client = self.get_dbs_client(self.region_name).project_manager
for m_project in m_projects:
for sc_project in sc_projects:
@@ -220,7 +176,10 @@ class IdentitySyncThread(SyncThread):
LOG.info("Update project {} request failed for {}: {}."
.format(sc_project.id,
self.region_name, str(e)))
self.reauthenticate_sc_dbs_client()
# Clear the cache so that the old token will not be validated
sdk.OpenStackDriver.delete_region_clients(self.region_name)
sc_client = self.get_dbs_client(
self.region_name).project_manager
project_ref = sc_client.update_project(sc_project.id,
project_records)
@@ -229,15 +188,11 @@ class IdentitySyncThread(SyncThread):
" project {} in subcloud.".
format(sc_project.id))
raise exceptions.SyncRequestFailed
# If admin project get synced, the client need to
# re-authenticate.
if sc_project.name == dccommon_consts.ADMIN_PROJECT_NAME:
self.reauthenticate_sc_clients()
def _initial_sync_roles(self, m_roles, sc_roles):
# Particularly sync roles with same name but different ID
m_client = self.m_dbs_client.role_manager
sc_client = self.sc_dbs_client.role_manager
m_client = self.get_dbs_client(self.master_region_name).role_manager
sc_client = self.get_dbs_client(self.region_name).role_manager
for m_role in m_roles:
for sc_role in sc_roles:
@@ -260,7 +215,10 @@ class IdentitySyncThread(SyncThread):
LOG.info("Update role {} request failed for {}: {}."
.format(sc_role.id,
self.region_name, str(e)))
self.reauthenticate_sc_dbs_client()
# Clear the cache so that the old token will not be validated
sdk.OpenStackDriver.delete_region_clients(self.region_name)
sc_client = self.get_dbs_client(
self.region_name).role_manager
role_ref = sc_client.update_role(sc_role.id,
role_record)
@@ -283,7 +241,6 @@ class IdentitySyncThread(SyncThread):
# tokens at subcloud are revoked and services are forced to
# re-authenticate to get new tokens. This significantly decreases
# service recovery time at subcloud.
self.initialize_sc_clients()
# get users from master cloud
m_users = self.get_master_resources(
@@ -349,7 +306,6 @@ class IdentitySyncThread(SyncThread):
return True
def sync_identity_resource(self, request, rsrc):
self.initialize_sc_clients()
# Invoke function with name format "operationtype_resourcetype"
# For example: post_users()
try:
@@ -383,7 +339,6 @@ class IdentitySyncThread(SyncThread):
rsrc.resource_type,
self.region_name,
str(e)), extra=self.log_extra)
self.reauthenticate_sc_clients()
raise exceptions.SyncRequestFailedRetry
except dbsync_exceptions.UnauthorizedMaster as e:
LOG.info("Request [{} {}] failed for {}: {}"
@@ -391,7 +346,6 @@ class IdentitySyncThread(SyncThread):
rsrc.resource_type,
self.region_name,
str(e)), extra=self.log_extra)
self.reauthenticate_m_dbs_client()
raise exceptions.SyncRequestFailedRetry
except exceptions.SyncRequestFailed:
raise
@@ -414,8 +368,8 @@ class IdentitySyncThread(SyncThread):
# Retrieve DB records of the user just created. The records is in JSON
# format
try:
user_records = self.m_dbs_client.identity_manager.\
user_detail(user_id)
user_records = self.get_dbs_client(self.master_region_name).\
identity_manager.user_detail(user_id)
except dbsync_exceptions.Unauthorized:
raise dbsync_exceptions.UnauthorizedMaster
if not user_records:
@@ -425,7 +379,9 @@ class IdentitySyncThread(SyncThread):
raise exceptions.SyncRequestFailed
# Create the user on subcloud by pushing the DB records to subcloud
user_ref = self.sc_dbs_client.identity_manager.add_user(user_records)
user_ref = self.get_dbs_client(
self.region_name).identity_manager.add_user(
user_records)
if not user_ref:
LOG.error("No user data returned when creating user {} in"
" subcloud.".format(user_id), extra=self.log_extra)
@@ -465,8 +421,8 @@ class IdentitySyncThread(SyncThread):
# Retrieve DB records of the user. The records is in JSON
# format
try:
user_records = self.m_dbs_client.identity_manager.\
user_detail(user_id)
user_records = self.get_dbs_client(self.master_region_name).\
identity_manager.user_detail(user_id)
except dbsync_exceptions.Unauthorized:
raise dbsync_exceptions.UnauthorizedMaster
if not user_records:
@@ -477,7 +433,7 @@ class IdentitySyncThread(SyncThread):
# Update the corresponding user on subcloud by pushing the DB records
# to subcloud
user_ref = self.sc_dbs_client.identity_manager.\
user_ref = self.get_dbs_client(self.region_name).identity_manager.\
update_user(sc_user_id, user_records)
if not user_ref:
LOG.error("No user data returned when updating user {} in"
@@ -519,8 +475,9 @@ class IdentitySyncThread(SyncThread):
user_id = user_subcloud_rsrc.subcloud_resource_id
original_user_ref = UserReferenceWrapper(id=user_id)
sc_ks_client = self.get_ks_client(self.region_name)
# Update the user in the subcloud
user_ref = self.sc_ks_client.users.update(
user_ref = sc_ks_client.users.update(
original_user_ref,
name=user_update_dict.pop('name', None),
domain=user_update_dict.pop('domain', None),
@@ -531,7 +488,7 @@ class IdentitySyncThread(SyncThread):
enabled=user_update_dict.pop('enabled', None),
default_project=user_update_dict.pop('default_project', None))
if (user_ref.id == user_id):
if user_ref.id == user_id:
LOG.info("Updated Keystone user: {}:{}"
.format(rsrc.id, user_ref.id), extra=self.log_extra)
else:
@@ -558,7 +515,8 @@ class IdentitySyncThread(SyncThread):
# Delete the user in the subcloud
try:
self.sc_ks_client.users.delete(original_user_ref)
sc_ks_client = self.get_ks_client(self.region_name)
sc_ks_client.users.delete(original_user_ref)
except keystone_exceptions.NotFound:
LOG.info("Delete user: user {} not found in {}, "
"considered as deleted.".
@@ -589,7 +547,8 @@ class IdentitySyncThread(SyncThread):
# Retrieve DB records of the project just created.
# The records is in JSON format.
try:
project_records = self.m_dbs_client.project_manager.\
m_dbs_client = self.get_dbs_client(self.master_region_name)
project_records = m_dbs_client.project_manager.\
project_detail(project_id)
except dbsync_exceptions.Unauthorized:
raise dbsync_exceptions.UnauthorizedMaster
@@ -600,7 +559,8 @@ class IdentitySyncThread(SyncThread):
raise exceptions.SyncRequestFailed
# Create the project on subcloud by pushing the DB records to subcloud
project_ref = self.sc_dbs_client.project_manager.\
sc_dbs_client = self.get_dbs_client(self.region_name)
project_ref = sc_dbs_client.project_manager.\
add_project(project_records)
if not project_ref:
LOG.error("No project data returned when creating project {} in"
@@ -641,7 +601,8 @@ class IdentitySyncThread(SyncThread):
# Retrieve DB records of the project. The records is in JSON
# format
try:
project_records = self.m_dbs_client.project_manager.\
m_dbs_client = self.get_dbs_client(self.master_region_name)
project_records = m_dbs_client.project_manager.\
project_detail(project_id)
except dbsync_exceptions.Unauthorized:
raise dbsync_exceptions.UnauthorizedMaster
@@ -653,7 +614,8 @@ class IdentitySyncThread(SyncThread):
# Update the corresponding project on subcloud by pushing the DB
# records to subcloud
project_ref = self.sc_dbs_client.project_manager.\
sc_dbs_client = self.get_dbs_client(self.region_name)
project_ref = sc_dbs_client.project_manager.\
update_project(sc_project_id, project_records)
if not project_ref:
LOG.error("No project data returned when updating project {} in"
@@ -695,14 +657,15 @@ class IdentitySyncThread(SyncThread):
original_proj_ref = ProjectReferenceWrapper(id=proj_id)
# Update the project in the subcloud
project_ref = self.sc_ks_client.projects.update(
sc_ks_client = self.get_ks_client(self.region_name)
project_ref = sc_ks_client.projects.update(
original_proj_ref,
name=project_update_dict.pop('name', None),
domain=project_update_dict.pop('domain_id', None),
description=project_update_dict.pop('description', None),
enabled=project_update_dict.pop('enabled', None))
if (project_ref.id == proj_id):
if project_ref.id == proj_id:
LOG.info("Updated Keystone project: {}:{}"
.format(rsrc.id, project_ref.id), extra=self.log_extra)
else:
@@ -729,7 +692,8 @@ class IdentitySyncThread(SyncThread):
# Delete the project in the subcloud
try:
self.sc_ks_client.projects.delete(original_proj_ref)
sc_ks_client = self.get_ks_client(self.region_name)
sc_ks_client.projects.delete(original_proj_ref)
except keystone_exceptions.NotFound:
LOG.info("Delete project: project {} not found in {}, "
"considered as deleted.".
@@ -760,7 +724,8 @@ class IdentitySyncThread(SyncThread):
# Retrieve DB records of the role just created. The records is in JSON
# format.
try:
role_records = self.m_dbs_client.role_manager.\
m_dbs_client = self.get_dbs_client(self.master_region_name)
role_records = m_dbs_client.role_manager.\
role_detail(role_id)
except dbsync_exceptions.Unauthorized:
raise dbsync_exceptions.UnauthorizedMaster
@@ -771,8 +736,8 @@ class IdentitySyncThread(SyncThread):
raise exceptions.SyncRequestFailed
# Create the role on subcloud by pushing the DB records to subcloud
role_ref = self.sc_dbs_client.role_manager.\
add_role(role_records)
sc_dbs_client = self.get_dbs_client(self.region_name)
role_ref = sc_dbs_client.role_manager.add_role(role_records)
if not role_ref:
LOG.error("No role data returned when creating role {} in"
" subcloud.".format(role_id), extra=self.log_extra)
@@ -812,8 +777,8 @@ class IdentitySyncThread(SyncThread):
# Retrieve DB records of the role. The records is in JSON
# format
try:
role_records = self.m_dbs_client.role_manager.\
role_detail(role_id)
m_dbs_client = self.get_dbs_client(self.master_region_name)
role_records = m_dbs_client.role_manager.role_detail(role_id)
except dbsync_exceptions.Unauthorized:
raise dbsync_exceptions.UnauthorizedMaster
if not role_records:
@@ -824,7 +789,8 @@ class IdentitySyncThread(SyncThread):
# Update the corresponding role on subcloud by pushing the DB records
# to subcloud
role_ref = self.sc_dbs_client.role_manager.\
sc_dbs_client = self.get_dbs_client(self.region_name)
role_ref = sc_dbs_client.role_manager.\
update_role(sc_role_id, role_records)
if not role_ref:
LOG.error("No role data returned when updating role {} in"
@@ -866,11 +832,12 @@ class IdentitySyncThread(SyncThread):
original_role_ref = RoleReferenceWrapper(id=role_id)
# Update the role in the subcloud
role_ref = self.sc_ks_client.roles.update(
sc_ks_client = self.get_ks_client(self.region_name)
role_ref = sc_ks_client.roles.update(
original_role_ref,
name=role_update_dict.pop('name', None))
if (role_ref.id == role_id):
if role_ref.id == role_id:
LOG.info("Updated Keystone role: {}:{}"
.format(rsrc.id, role_ref.id), extra=self.log_extra)
else:
@@ -897,7 +864,8 @@ class IdentitySyncThread(SyncThread):
# Delete the role in the subcloud
try:
self.sc_ks_client.roles.delete(original_role_ref)
sc_ks_client = self.get_ks_client(self.region_name)
sc_ks_client.roles.delete(original_role_ref)
except keystone_exceptions.NotFound:
LOG.info("Delete role: role {} not found in {}, "
"considered as deleted.".
@@ -931,7 +899,8 @@ class IdentitySyncThread(SyncThread):
# Ensure that we have already synced the project, user and role
# prior to syncing the assignment
sc_role = None
sc_role_list = self.sc_ks_client.roles.list()
sc_ks_client = self.get_ks_client(self.region_name)
sc_role_list = sc_ks_client.roles.list()
for role in sc_role_list:
if role.id == role_id:
sc_role = role
@@ -944,7 +913,8 @@ class IdentitySyncThread(SyncThread):
raise exceptions.SyncRequestFailed
sc_proj = None
sc_proj_list = self.sc_ks_client.projects.list()
# refresh client in case the token expires in between API calls
sc_proj_list = sc_ks_client.projects.list()
for proj in sc_proj_list:
if proj.id == project_id:
sc_proj = proj
@@ -957,7 +927,7 @@ class IdentitySyncThread(SyncThread):
raise exceptions.SyncRequestFailed
sc_user = None
sc_user_list = self._get_all_users(self.sc_ks_client)
sc_user_list = self._get_all_users(sc_ks_client)
for user in sc_user_list:
if user.id == user_id:
sc_user = user
@@ -970,11 +940,11 @@ class IdentitySyncThread(SyncThread):
raise exceptions.SyncRequestFailed
# Create role assignment
self.sc_ks_client.roles.grant(
sc_ks_client.roles.grant(
sc_role,
user=sc_user,
project=sc_proj)
role_ref = self.sc_ks_client.role_assignments.list(
role_ref = sc_ks_client.role_assignments.list(
user=sc_user,
project=sc_proj,
role=sc_role)
@@ -1028,7 +998,8 @@ class IdentitySyncThread(SyncThread):
# Revoke role assignment
try:
self.sc_ks_client.roles.revoke(
sc_ks_client = self.get_ks_client(self.region_name)
sc_ks_client.roles.revoke(
role_id,
user=user_id,
project=project_id)
@@ -1039,12 +1010,12 @@ class IdentitySyncThread(SyncThread):
self.region_name),
extra=self.log_extra)
role_ref = self.sc_ks_client.role_assignments.list(
role_ref = sc_ks_client.role_assignments.list(
user=user_id,
project=project_id,
role=role_id)
if (not role_ref):
if not role_ref:
LOG.info("Deleted Keystone role assignment: {}:{}"
.format(rsrc.id, assignment_subcloud_rsrc),
extra=self.log_extra)
@@ -1074,7 +1045,8 @@ class IdentitySyncThread(SyncThread):
# Retrieve DB records of the revoke event just created. The records
# is in JSON format.
try:
revoke_event_records = self.m_dbs_client.revoke_event_manager.\
m_dbs_client = self.get_dbs_client(self.master_region_name)
revoke_event_records = m_dbs_client.revoke_event_manager.\
revoke_event_detail(audit_id=audit_id)
except dbsync_exceptions.Unauthorized:
raise dbsync_exceptions.UnauthorizedMaster
@@ -1087,7 +1059,8 @@ class IdentitySyncThread(SyncThread):
# Create the revoke event on subcloud by pushing the DB records to
# subcloud
revoke_event_ref = self.sc_dbs_client.revoke_event_manager.\
sc_dbs_client = self.get_dbs_client(self.region_name)
revoke_event_ref = sc_dbs_client.revoke_event_manager.\
add_revoke_event(revoke_event_records)
if not revoke_event_ref:
LOG.error("No token revocation event data returned when creating"
@@ -1114,9 +1087,9 @@ class IdentitySyncThread(SyncThread):
# subcloud resource id is the audit_id
subcloud_resource_id = revoke_event_subcloud_rsrc.subcloud_resource_id
try:
self.sc_dbs_client.revoke_event_manager.delete_revoke_event(
sc_dbs_client = self.get_dbs_client(self.region_name)
sc_dbs_client.revoke_event_manager.delete_revoke_event(
audit_id=subcloud_resource_id)
except dbsync_exceptions.NotFound:
LOG.info("Delete token revocation event: event {} not found in {},"
@@ -1148,7 +1121,8 @@ class IdentitySyncThread(SyncThread):
# Retrieve DB records of the revoke event just created. The records
# is in JSON format.
try:
revoke_event_records = self.m_dbs_client.revoke_event_manager.\
m_dbs_client = self.get_dbs_client(self.master_region_name)
revoke_event_records = m_dbs_client.revoke_event_manager.\
revoke_event_detail(user_id=event_id)
except dbsync_exceptions.Unauthorized:
raise dbsync_exceptions.UnauthorizedMaster
@@ -1161,7 +1135,8 @@ class IdentitySyncThread(SyncThread):
# Create the revoke event on subcloud by pushing the DB records to
# subcloud
revoke_event_ref = self.sc_dbs_client.revoke_event_manager.\
sc_dbs_client = self.get_dbs_client(self.region_name)
revoke_event_ref = sc_dbs_client.revoke_event_manager.\
add_revoke_event(revoke_event_records)
if not revoke_event_ref:
LOG.error("No token revocation event data returned when creating"
@@ -1186,9 +1161,9 @@ class IdentitySyncThread(SyncThread):
# subcloud resource id is <user_id>_<issued_before> encoded in base64
subcloud_resource_id = revoke_event_subcloud_rsrc.subcloud_resource_id
try:
self.sc_dbs_client.revoke_event_manager.delete_revoke_event(
sc_dbs_client = self.get_dbs_client(self.region_name)
sc_dbs_client.revoke_event_manager.delete_revoke_event(
user_id=subcloud_resource_id)
except dbsync_exceptions.NotFound:
LOG.info("Delete token revocation event: event {} not found in {},"
@@ -1588,81 +1563,79 @@ class IdentitySyncThread(SyncThread):
# dbsync client, other resources use keystone client.
if self.is_resource_handled_by_dbs_client(resource_type):
try:
return self._get_resource_audit_handler(resource_type,
self.m_dbs_client)
return self._get_resource_audit_handler(
resource_type,
self.get_dbs_client(self.master_region_name))
except dbsync_exceptions.Unauthorized as e:
LOG.info("Get master resource [{}] request failed for {}: {}."
.format(resource_type,
dccommon_consts.VIRTUAL_MASTER_CLOUD,
str(e)), extra=self.log_extra)
# Token might be expired, re-authenticate dbsync client
self.reauthenticate_m_dbs_client()
# Retry with re-authenticated dbsync client
return self._get_resource_audit_handler(resource_type,
self.m_dbs_client)
# Clear the cache so that the old token will not be validated
sdk.OpenStackDriver.delete_region_clients(self.master_region_name)
# Retry will get a new token
return self._get_resource_audit_handler(
resource_type,
self.get_dbs_client(self.master_region_name))
except Exception as e:
LOG.exception(e)
return None
else:
try:
return self._get_resource_audit_handler(resource_type,
self.m_ks_client)
return self._get_resource_audit_handler(
resource_type,
self.get_ks_client(self.master_region_name))
except keystone_exceptions.Unauthorized as e:
LOG.info("Get master resource [{}] request failed for {}: {}."
.format(resource_type,
dccommon_consts.VIRTUAL_MASTER_CLOUD,
str(e)), extra=self.log_extra)
# Token might be expired, re-authenticate ks client
self.reauthenticate_m_ks_client()
# Retry with re-authenticated ks client
return self._get_resource_audit_handler(resource_type,
self.m_ks_client)
# Clear the cache so that the old token will not be validated
sdk.OpenStackDriver.delete_region_clients(self.master_region_name)
# Retry with get a new token
return self._get_resource_audit_handler(
resource_type,
self.get_ks_client(self.master_region_name))
except Exception as e:
LOG.exception(e)
return None
def get_subcloud_resources(self, resource_type):
self.initialize_sc_clients()
# Retrieve subcloud resources from DB or through keystone.
# users, projects, roles, and token revocation events use
# dbsync client, other resources use keystone client.
if self.is_resource_handled_by_dbs_client(resource_type):
try:
return self._get_resource_audit_handler(resource_type,
self.sc_dbs_client)
return self._get_resource_audit_handler(
resource_type, self.get_dbs_client(self.region_name))
except dbsync_exceptions.Unauthorized as e:
LOG.info("Get subcloud resource [{}] request failed for {}: {}."
.format(resource_type,
self.region_name,
str(e)), extra=self.log_extra)
# Token might be expired, re-authenticate dbsync client
self.reauthenticate_sc_dbs_client()
# Clear the cache so that the old token will not be validated
sdk.OpenStackDriver.delete_region_clients(self.region_name)
# Retry with re-authenticated dbsync client
return self._get_resource_audit_handler(resource_type,
self.sc_dbs_client)
return self._get_resource_audit_handler(
resource_type, self.get_dbs_client(self.region_name))
except Exception as e:
LOG.exception(e)
return None
else:
try:
return self._get_resource_audit_handler(resource_type,
self.sc_ks_client)
return self._get_resource_audit_handler(
resource_type, self.get_ks_client(self.region_name))
except keystone_exceptions.Unauthorized as e:
LOG.info("Get subcloud resource [{}] request failed for {}: {}."
.format(resource_type,
self.region_name,
str(e)), extra=self.log_extra)
# Token might be expired, re-authenticate ks client
self.reauthenticate_sc_ks_client()
# Clear the cache so that the old token will not be validated
sdk.OpenStackDriver.delete_region_clients(self.region_name)
# Retry with re-authenticated ks client
return self._get_resource_audit_handler(resource_type,
self.sc_ks_client)
return self._get_resource_audit_handler(
resource_type, self.get_ks_client(self.region_name))
except Exception as e:
LOG.exception(e)
return None

View File

@@ -79,15 +79,14 @@ class SysinvSyncThread(SyncThread):
consts.RESOURCE_TYPE_SYSINV_FERNET_REPO,
]
# initialize the master clients
super(SysinvSyncThread, self).initialize()
LOG.info("SysinvSyncThread initialized", extra=self.log_extra)
def sync_platform_resource(self, request, rsrc):
try:
s_os_client = sdk.OpenStackDriver(
region_name=self.region_name,
thread_name='sync')
thread_name='sync',
region_clients=["sysinv"])
# invoke the sync method for the requested resource_type
# I.e. sync_idns
s_func_name = "sync_" + rsrc.resource_type
@@ -439,7 +438,8 @@ class SysinvSyncThread(SyncThread):
try:
os_client = sdk.OpenStackDriver(
region_name=dccommon_consts.CLOUD_0,
thread_name='audit')
thread_name='audit',
region_clients=["sysinv"])
if resource_type == consts.RESOURCE_TYPE_SYSINV_DNS:
return [self.get_dns_resource(os_client)]
elif resource_type == consts.RESOURCE_TYPE_SYSINV_CERTIFICATE:
@@ -461,7 +461,8 @@ class SysinvSyncThread(SyncThread):
threading.currentThread().getName()), extra=self.log_extra)
try:
os_client = sdk.OpenStackDriver(region_name=self.region_name,
thread_name='audit')
thread_name='audit',
region_clients=["sysinv"])
if resource_type == consts.RESOURCE_TYPE_SYSINV_DNS:
return [self.get_dns_resource(os_client)]
elif resource_type == consts.RESOURCE_TYPE_SYSINV_CERTIFICATE:

View File

@@ -199,7 +199,6 @@ class SyncThread(object):
def enable(self):
# Called when DC manager thinks this subcloud is good to go.
self.initialize()
self.run_sync_audit()
def get_db_subcloud_resource(self, rsrc_id):