Update multiple endpoints in one DB transaction
In the case of a large number of subclouds go offline, all the endpoints of these subclouds need to update to unknown, currently we are looping over every endpoint to update its status, the number of total transactions for this operation is: Total = number_of_subclouds X number_of_endpoint_types The DB is throttled in this case, and it consumes a large amount of time for dcmanager to process the RPC calls to update offline subclouds. This commit introduces a DB API to update multiple endpoint statuses of a subcloud, which transitions to unkown state, in a single DB transaction. Test: - On the system controller, delete a large number of routes to the subclouds to simulate many subclouds going offline. - Time when dcmanager's message queue is cleared The dcmanager's message queue is cleared faster after the change than before the change. - On the system controller, unmanage a subcloud. All subcloud's endpoints change to unknown except the dc-cert endpoint. Story: 2008960 Task: 43377 Signed-off-by: Yuxing Jiang <yuxing.jiang@windriver.com> Change-Id: I941c1b47acc1f72ba54d879236632455c8bae628
This commit is contained in:
@@ -246,6 +246,14 @@ def subcloud_status_update(context, subcloud_id, endpoint_type, sync_status):
|
||||
sync_status)
|
||||
|
||||
|
||||
def subcloud_status_update_endpoints(context, subcloud_id,
|
||||
endpoint_type_list, sync_status):
|
||||
"""Update all statuses of the endpoints in endpoint_type_list of a subcloud."""
|
||||
|
||||
return IMPL.subcloud_status_update_endpoints(context, subcloud_id,
|
||||
endpoint_type_list, sync_status)
|
||||
|
||||
|
||||
def subcloud_status_destroy_all(context, subcloud_id):
|
||||
"""Destroy all the statuses for a subcloud
|
||||
|
||||
|
@@ -474,6 +474,26 @@ def subcloud_status_update(context, subcloud_id, endpoint_type, sync_status):
|
||||
return subcloud_status_ref
|
||||
|
||||
|
||||
@require_admin_context
|
||||
def subcloud_status_update_endpoints(context, subcloud_id,
|
||||
endpoint_type_list, sync_status):
|
||||
"""Update all statuses of endpoints in endpoint_type_list of a subcloud.
|
||||
|
||||
Will raise if subcloud status does not exist.
|
||||
"""
|
||||
value = {"sync_status": sync_status}
|
||||
with write_session() as session:
|
||||
result = session.query(models.SubcloudStatus). \
|
||||
filter_by(subcloud_id=subcloud_id). \
|
||||
filter(models.SubcloudStatus.endpoint_type.in_(endpoint_type_list)). \
|
||||
update(value, synchronize_session=False)
|
||||
if not result:
|
||||
raise exception.SubcloudStatusNotFound(subcloud_id=subcloud_id,
|
||||
endpoint_type="any")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@require_admin_context
|
||||
def subcloud_status_destroy_all(context, subcloud_id):
|
||||
with write_session() as session:
|
||||
|
@@ -1364,16 +1364,20 @@ class SubcloudManager(manager.Manager):
|
||||
"ignore_endpoints: %s" %
|
||||
(subcloud.name, sync_status, ignore_endpoints))
|
||||
|
||||
# TODO(yuxing): The following code can be further optimized when
|
||||
# batch alarm clearance APIs are available, so we don't need to
|
||||
# loop over all the endpoints of a given subcloud, e.g.
|
||||
# if not ignore_endpoints:
|
||||
# db_api.subcloud_status_update_endpoints_all(...)
|
||||
# else:
|
||||
# db_api.subcloud_status_update_endpoints(...)
|
||||
endpoint_to_update_list = []
|
||||
for entry in subcloud_status_list:
|
||||
endpoint = entry[consts.ENDPOINT_TYPE]
|
||||
if endpoint in ignore_endpoints:
|
||||
# Do not update this endpoint
|
||||
continue
|
||||
|
||||
db_api.subcloud_status_update(context,
|
||||
subcloud_id,
|
||||
endpoint,
|
||||
sync_status)
|
||||
endpoint_to_update_list.append(endpoint)
|
||||
|
||||
entity_instance_id = "subcloud=%s.resource=%s" % \
|
||||
(subcloud.name, endpoint)
|
||||
@@ -1382,6 +1386,8 @@ class SubcloudManager(manager.Manager):
|
||||
fm_const.FM_ALARM_ID_DC_SUBCLOUD_RESOURCE_OUT_OF_SYNC,
|
||||
entity_instance_id)
|
||||
|
||||
# TODO(yuxing): batch clear all the out-of-sync alarms of
|
||||
# a given subcloud if fm_api support it.
|
||||
if (sync_status != consts.SYNC_STATUS_OUT_OF_SYNC) \
|
||||
and fault:
|
||||
try:
|
||||
@@ -1415,6 +1421,14 @@ class SubcloudManager(manager.Manager):
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
|
||||
if endpoint_to_update_list:
|
||||
try:
|
||||
db_api.subcloud_status_update_endpoints(context, subcloud_id,
|
||||
endpoint_to_update_list,
|
||||
sync_status)
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
|
||||
else:
|
||||
LOG.error("Subcloud not found:%s" % subcloud_id)
|
||||
|
||||
|
@@ -326,6 +326,72 @@ class DBAPISubcloudTest(base.DCManagerTestCase):
|
||||
self.assertEqual(endpoint_type, updated_subcloud_status.endpoint_type)
|
||||
self.assertEqual(sync_status, updated_subcloud_status.sync_status)
|
||||
|
||||
def test_update_subcloud_status_endpoints(self):
|
||||
fake_subcloud = utils.create_subcloud_dict(base.SUBCLOUD_SAMPLE_DATA_0)
|
||||
subcloud = self.create_subcloud(self.ctx, fake_subcloud)
|
||||
self.assertIsNotNone(subcloud)
|
||||
|
||||
endpoint_type1 = 'testendpoint1'
|
||||
subcloud_status = self.create_subcloud_status(
|
||||
self.ctx, endpoint_type=endpoint_type1)
|
||||
self.assertIsNotNone(subcloud_status)
|
||||
|
||||
endpoint_type2 = 'testendpoint2'
|
||||
subcloud_status = self.create_subcloud_status(
|
||||
self.ctx, endpoint_type=endpoint_type2)
|
||||
self.assertIsNotNone(subcloud_status)
|
||||
|
||||
endpoint_type3 = 'testendpoint3'
|
||||
subcloud_status = self.create_subcloud_status(
|
||||
self.ctx, endpoint_type=endpoint_type3)
|
||||
self.assertIsNotNone(subcloud_status)
|
||||
|
||||
sync_status = consts.SYNC_STATUS_IN_SYNC
|
||||
endpoint_type_list = [endpoint_type1, endpoint_type2]
|
||||
db_api.subcloud_status_update_endpoints(self.ctx, subcloud.id,
|
||||
endpoint_type_list=endpoint_type_list,
|
||||
sync_status=sync_status)
|
||||
|
||||
updated_endpoint1_status = db_api.subcloud_status_get(self.ctx,
|
||||
subcloud.id,
|
||||
endpoint_type1)
|
||||
self.assertIsNotNone(updated_endpoint1_status)
|
||||
self.assertEqual(endpoint_type1, updated_endpoint1_status.endpoint_type)
|
||||
self.assertEqual(sync_status, updated_endpoint1_status.sync_status)
|
||||
|
||||
updated_endpoint2_status = db_api.subcloud_status_get(self.ctx,
|
||||
subcloud.id,
|
||||
endpoint_type2)
|
||||
self.assertIsNotNone(updated_endpoint2_status)
|
||||
self.assertEqual(endpoint_type2, updated_endpoint2_status.endpoint_type)
|
||||
self.assertEqual(sync_status, updated_endpoint2_status.sync_status)
|
||||
|
||||
updated_endpoint3_status = db_api.subcloud_status_get(self.ctx,
|
||||
subcloud.id,
|
||||
endpoint_type3)
|
||||
self.assertIsNotNone(updated_endpoint3_status)
|
||||
self.assertEqual(endpoint_type3, updated_endpoint3_status.endpoint_type)
|
||||
self.assertNotEqual(sync_status, updated_endpoint3_status.sync_status)
|
||||
|
||||
def test_update_subcloud_status_endpints_not_exists(self):
|
||||
fake_subcloud = utils.create_subcloud_dict(base.SUBCLOUD_SAMPLE_DATA_0)
|
||||
subcloud = self.create_subcloud(self.ctx, fake_subcloud)
|
||||
self.assertIsNotNone(subcloud)
|
||||
|
||||
endpoint_type1 = 'testendpoint1'
|
||||
subcloud_status = self.create_subcloud_status(
|
||||
self.ctx, endpoint_type=endpoint_type1)
|
||||
self.assertIsNotNone(subcloud_status)
|
||||
|
||||
endpoint_type2 = 'testendpoint2'
|
||||
|
||||
sync_status = consts.SYNC_STATUS_IN_SYNC
|
||||
endpoint_type_list = [endpoint_type2]
|
||||
self.assertRaises(exceptions.SubcloudStatusNotFound,
|
||||
db_api.subcloud_status_update_endpoints,
|
||||
self.ctx, subcloud.id,
|
||||
endpoint_type_list, sync_status)
|
||||
|
||||
def test_delete_subcloud_status(self):
|
||||
fake_subcloud = utils.create_subcloud_dict(base.SUBCLOUD_SAMPLE_DATA_0)
|
||||
subcloud = self.create_subcloud(self.ctx, fake_subcloud)
|
||||
|
Reference in New Issue
Block a user