Files
distcloud/distributedcloud/dcmanager/tests/unit/db/test_subcloud_db_api.py
Yuxing Jiang 7f054369a2 Update multiple endpoints in one DB transaction
When a large number of subclouds go offline, all the
endpoints of these subclouds need to be updated to unknown. Currently we
loop over every endpoint to update its status, so the number of
total transactions for this operation is:
Total = number_of_subclouds X number_of_endpoint_types
The DB is throttled in this case, and it takes dcmanager a large amount of
time to process the RPC calls to update offline subclouds.

This commit introduces a DB API that updates multiple endpoint statuses of
a subcloud, which transitions to the unknown state, in a single DB
transaction.

Test:
 - On the system controller, delete a large number of routes to the
subclouds to simulate many subclouds going offline.
 - Time when dcmanager's message queue is cleared
The dcmanager's message queue is cleared faster after the change than
before the change.

 - On the system controller, unmanage a subcloud.
All of the subcloud's endpoints change to unknown except the dc-cert endpoint.

Story: 2008960
Task: 43377

Signed-off-by: Yuxing Jiang <yuxing.jiang@windriver.com>
Change-Id: I941c1b47acc1f72ba54d879236632455c8bae628
2021-09-22 14:22:18 -04:00

651 lines
27 KiB
Python

# Copyright (c) 2015 Ericsson AB
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2017 Wind River Systems, Inc.
#
# The right to copy, distribute, modify, or otherwise make use
# of this software may be licensed only pursuant to the terms
# of an applicable Wind River license agreement.
#
import sqlalchemy
from oslo_config import cfg
from oslo_db import exception as db_exception
from oslo_db import options
from dcmanager.common import config
from dcmanager.common import consts
from dcmanager.common import exceptions
from dcmanager.db import api as api
from dcmanager.db.sqlalchemy import api as db_api
from dcmanager.tests import base
from dcmanager.tests import utils
# Call dcmanager.common.config.register_options() once, at import time of
# this test module.
config.register_options()
# Module-level alias for the DB API's engine accessor; used by the dummy
# database setup/reset helpers of the test class below.
get_engine = api.get_engine
# Enable foreign key support in sqlite - see:
# http://docs.sqlalchemy.org/en/latest/dialects/sqlite.html
from sqlalchemy.engine import Engine
from sqlalchemy import event
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
    """Enable foreign key support on every new DBAPI connection.

    Registered as a listener for the Engine "connect" event, so it is
    invoked with the raw DBAPI connection each time one is created and
    issues ``PRAGMA foreign_keys=ON`` on it.

    :param dbapi_connection: the newly created raw DBAPI connection
    :param connection_record: passed by the event system; not used here
    """
    cursor = dbapi_connection.cursor()
    try:
        cursor.execute("PRAGMA foreign_keys=ON")
    finally:
        # Release the cursor even if the PRAGMA statement fails.
        cursor.close()
class DBAPISubcloudTest(base.DCManagerTestCase):
    """Unit tests for the dcmanager DB API.

    Covers subclouds, subcloud endpoint statuses, the software update
    strategy and strategy steps. Every test runs against a database
    configured with the "sqlite://" connection, which is synced in setUp
    and emptied again by a registered cleanup.
    """

    def setup_dummy_db(self):
        """Point oslo.db at "sqlite://" and sync the dcmanager schema."""
        options.cfg.set_defaults(options.database_opts,
                                 sqlite_synchronous=False)
        options.set_defaults(cfg.CONF, connection="sqlite://")
        engine = get_engine()
        db_api.db_sync(engine)

    @staticmethod
    def reset_dummy_db():
        """Delete all rows from every table except 'migrate_version'."""
        engine = get_engine()
        meta = sqlalchemy.MetaData()
        meta.reflect(bind=engine)

        # Walk the tables in reverse dependency order so that rows which
        # reference other tables are removed before the rows they point to.
        for table in reversed(meta.sorted_tables):
            if table.name == 'migrate_version':
                continue
            engine.execute(table.delete())

    @staticmethod
    def create_subcloud_static(ctxt, **kwargs):
        """Create a subcloud from fixed defaults, overridden by kwargs.

        :param ctxt: request context passed to the DB API
        :param kwargs: column values replacing the defaults below
        :returns: the result of db_api.subcloud_create
        """
        values = {
            'name': "subcloud1",
            'description': "This is a subcloud",
            'location': "This is the location of the subcloud",
            'software_version': "10.04",
            'management_subnet': "192.168.101.0/24",
            'management_gateway_ip': "192.168.101.1",
            'management_start_ip': "192.168.101.2",
            'management_end_ip': "192.168.101.50",
            'systemcontroller_gateway_ip': "192.168.204.101",
            'deploy_status': "not-deployed",
            'openstack_installed': False,
            'group_id': 1,
        }
        values.update(kwargs)
        return db_api.subcloud_create(ctxt, **values)

    @staticmethod
    def create_subcloud(ctxt, data):
        """Create a subcloud from an API-style payload dictionary.

        Maps the payload keys (e.g. 'software-version',
        'management_start_address') onto the DB column names expected by
        db_api.subcloud_create.

        :param ctxt: request context passed to the DB API
        :param data: dictionary holding the subcloud payload
        :returns: the result of db_api.subcloud_create
        """
        values = {
            'name': data['name'],
            'description': data['description'],
            'location': data['location'],
            'software_version': data['software-version'],
            'management_subnet': data['management_subnet'],
            'management_gateway_ip': data['management_gateway_address'],
            'management_start_ip': data['management_start_address'],
            'management_end_ip': data['management_end_address'],
            'systemcontroller_gateway_ip': data[
                'systemcontroller_gateway_address'],
            'deploy_status': "not-deployed",
            'openstack_installed': False,
            'group_id': 1,
        }
        return db_api.subcloud_create(ctxt, **values)

    @staticmethod
    def create_subcloud_status(ctxt, **kwargs):
        """Create a subcloud status row; defaults to subcloud 1, 'sysinv'.

        :param ctxt: request context passed to the DB API
        :param kwargs: values replacing the defaults below
        :returns: the result of db_api.subcloud_status_create
        """
        values = {
            'subcloud_id': 1,
            'endpoint_type': "sysinv",
        }
        values.update(kwargs)
        return db_api.subcloud_status_create(ctxt, **values)

    @staticmethod
    def create_sw_update_strategy(ctxt, **kwargs):
        """Create a software update strategy from overridable defaults.

        :param ctxt: request context passed to the DB API
        :param kwargs: values replacing the defaults below
        :returns: the result of db_api.sw_update_strategy_create
        """
        values = {
            'type': consts.SW_UPDATE_TYPE_PATCH,
            'state': consts.SW_UPDATE_STATE_INITIAL,
            'subcloud_apply_type': consts.SUBCLOUD_APPLY_TYPE_PARALLEL,
            'max_parallel_subclouds': 10,
            'stop_on_failure': True,
        }
        values.update(kwargs)
        return db_api.sw_update_strategy_create(ctxt, **values)

    @staticmethod
    def create_strategy_step(ctxt, **kwargs):
        """Create a strategy step; defaults to subcloud 1, stage 1.

        :param ctxt: request context passed to the DB API
        :param kwargs: values replacing the defaults below
        :returns: the result of db_api.strategy_step_create
        """
        values = {
            'subcloud_id': 1,
            'stage': 1,
            'state': consts.STRATEGY_STATE_INITIAL,
            'details': "The details"
        }
        values.update(kwargs)
        return db_api.strategy_step_create(ctxt, **values)

    def setUp(self):
        """Prepare the dummy DB, register its cleanup, build a context."""
        super(DBAPISubcloudTest, self).setUp()

        self.setup_dummy_db()
        self.addCleanup(self.reset_dummy_db)
        self.ctx = utils.dummy_context()

    def test_create_subcloud(self):
        """A created subcloud can be fetched back by its id."""
        fake_subcloud = utils.create_subcloud_dict(
            base.SUBCLOUD_SAMPLE_DATA_0)
        name = fake_subcloud['name']
        subcloud = self.create_subcloud(self.ctx, fake_subcloud)
        self.assertIsNotNone(subcloud)

        new_subcloud = db_api.subcloud_get(self.ctx, subcloud.id)
        self.assertIsNotNone(new_subcloud)
        self.assertEqual(name, new_subcloud.name)

    def test_create_subcloud_duplicate_name(self):
        """Creating a second subcloud with the same name is rejected."""
        fake_subcloud = utils.create_subcloud_dict(
            base.SUBCLOUD_SAMPLE_DATA_0)
        subcloud = self.create_subcloud(self.ctx, fake_subcloud)
        self.assertIsNotNone(subcloud)

        fake_subcloud2 = utils.create_subcloud_dict(
            base.SUBCLOUD_SAMPLE_DATA_0)
        # Use the keys that create_subcloud() actually reads, so the second
        # subcloud really gets a different management address range and
        # the name is the attribute left identical to the first subcloud.
        fake_subcloud2['management_start_address'] = "2.3.4.6"
        fake_subcloud2['management_end_address'] = "2.3.4.7"
        self.assertRaises(db_exception.DBDuplicateEntry,
                          self.create_subcloud,
                          self.ctx, fake_subcloud2)

    def test_create_multiple_subclouds(self):
        """Three subclouds are returned by get_all with ids 1, 2, 3."""
        name1 = 'testname1'
        name2 = 'testname2'
        name3 = 'testname3'
        subcloud = self.create_subcloud_static(self.ctx, name=name1)
        self.assertIsNotNone(subcloud)

        subcloud2 = self.create_subcloud_static(
            self.ctx,
            name=name2,
            management_start_ip="2.3.4.6",
            management_end_ip="2.3.4.7")
        self.assertIsNotNone(subcloud2)

        subcloud3 = self.create_subcloud_static(
            self.ctx,
            name=name3,
            management_start_ip="3.3.4.6",
            management_end_ip="3.3.4.7")
        self.assertIsNotNone(subcloud3)

        new_subclouds = db_api.subcloud_get_all(self.ctx)
        self.assertIsNotNone(new_subclouds)
        self.assertEqual(3, len(new_subclouds))
        self.assertEqual(name1, new_subclouds[0].name)
        self.assertEqual(1, new_subclouds[0].id)
        self.assertEqual(name2, new_subclouds[1].name)
        self.assertEqual(2, new_subclouds[1].id)
        self.assertEqual(name3, new_subclouds[2].name)
        self.assertEqual(3, new_subclouds[2].id)

    def test_update_subcloud(self):
        """Updated fields show on the returned and re-fetched subcloud."""
        fake_subcloud = utils.create_subcloud_dict(
            base.SUBCLOUD_SAMPLE_DATA_0)
        subcloud = self.create_subcloud(self.ctx, fake_subcloud)
        self.assertIsNotNone(subcloud)

        management_state = 'testmanagementstate'
        availability_status = 'testavailabilitystatus'
        software_version = 'testversion'
        updated = db_api.subcloud_update(
            self.ctx, subcloud.id,
            management_state=management_state,
            availability_status=availability_status,
            software_version=software_version)
        self.assertIsNotNone(updated)
        self.assertEqual(management_state, updated.management_state)
        self.assertEqual(availability_status, updated.availability_status)
        self.assertEqual(software_version, updated.software_version)

        updated_subcloud = db_api.subcloud_get(self.ctx, subcloud.id)
        self.assertEqual(management_state,
                         updated_subcloud.management_state)
        self.assertEqual(availability_status,
                         updated_subcloud.availability_status)
        self.assertEqual(software_version,
                         updated_subcloud.software_version)

    def test_delete_subcloud(self):
        """Fetching a destroyed subcloud raises SubcloudNotFound."""
        fake_subcloud = utils.create_subcloud_dict(
            base.SUBCLOUD_SAMPLE_DATA_0)
        subcloud = self.create_subcloud(self.ctx, fake_subcloud)
        self.assertIsNotNone(subcloud)

        db_api.subcloud_destroy(self.ctx, subcloud.id)

        self.assertRaises(exceptions.SubcloudNotFound,
                          db_api.subcloud_get,
                          self.ctx, subcloud.id)

    def test_subcloud_get_by_name(self):
        """A subcloud can be looked up by its name."""
        fake_subcloud = utils.create_subcloud_dict(
            base.SUBCLOUD_SAMPLE_DATA_0)
        name = fake_subcloud['name']
        subcloud = self.create_subcloud(self.ctx, fake_subcloud)
        self.assertIsNotNone(subcloud)

        by_name = db_api.subcloud_get_by_name(self.ctx, name)
        self.assertIsNotNone(by_name)
        self.assertEqual(name, by_name.name)

    def test_subcloud_get_by_non_existing_name(self):
        """Looking up an unknown name raises SubcloudNameNotFound."""
        name = 'testname'
        self.assertRaises(exceptions.SubcloudNameNotFound,
                          db_api.subcloud_get_by_name,
                          self.ctx, name)

    def test_create_subcloud_status(self):
        """A new status row is retrievable and starts as 'unknown'."""
        fake_subcloud = utils.create_subcloud_dict(
            base.SUBCLOUD_SAMPLE_DATA_0)
        subcloud = self.create_subcloud(self.ctx, fake_subcloud)
        self.assertIsNotNone(subcloud)

        endpoint_type = 'testendpoint'
        subcloud_status = self.create_subcloud_status(
            self.ctx, endpoint_type=endpoint_type)
        self.assertIsNotNone(subcloud_status)

        new_subcloud_status = db_api.subcloud_status_get(self.ctx,
                                                         subcloud.id,
                                                         endpoint_type)
        self.assertIsNotNone(new_subcloud_status)
        self.assertEqual(endpoint_type, new_subcloud_status.endpoint_type)
        self.assertEqual(consts.SYNC_STATUS_UNKNOWN,
                         new_subcloud_status.sync_status)

    def test_create_multiple_subcloud_statuses(self):
        """Three status rows of one subcloud come back with ids 1, 2, 3."""
        fake_subcloud = utils.create_subcloud_dict(
            base.SUBCLOUD_SAMPLE_DATA_0)
        subcloud = self.create_subcloud(self.ctx, fake_subcloud)
        self.assertIsNotNone(subcloud)

        endpoint_type1 = 'testendpoint1'
        subcloud_status1 = self.create_subcloud_status(
            self.ctx, endpoint_type=endpoint_type1)
        self.assertIsNotNone(subcloud_status1)

        endpoint_type2 = 'testendpoint2'
        subcloud_status2 = self.create_subcloud_status(
            self.ctx, endpoint_type=endpoint_type2)
        self.assertIsNotNone(subcloud_status2)

        endpoint_type3 = 'testendpoint3'
        subcloud_status3 = self.create_subcloud_status(
            self.ctx, endpoint_type=endpoint_type3)
        self.assertIsNotNone(subcloud_status3)

        new_subcloud_statuses = db_api.subcloud_status_get_all(self.ctx,
                                                               subcloud.id)
        self.assertIsNotNone(new_subcloud_statuses)
        self.assertEqual(3, len(new_subcloud_statuses))
        self.assertEqual(endpoint_type1,
                         new_subcloud_statuses[0].endpoint_type)
        self.assertEqual(1, new_subcloud_statuses[0].id)
        self.assertEqual(endpoint_type2,
                         new_subcloud_statuses[1].endpoint_type)
        self.assertEqual(2, new_subcloud_statuses[1].id)
        self.assertEqual(endpoint_type3,
                         new_subcloud_statuses[2].endpoint_type)
        self.assertEqual(3, new_subcloud_statuses[2].id)

    def test_update_subcloud_status(self):
        """A single endpoint's sync status can be updated and re-read."""
        fake_subcloud = utils.create_subcloud_dict(
            base.SUBCLOUD_SAMPLE_DATA_0)
        subcloud = self.create_subcloud(self.ctx, fake_subcloud)
        self.assertIsNotNone(subcloud)

        endpoint_type = 'testendpoint'
        subcloud_status = self.create_subcloud_status(
            self.ctx, endpoint_type=endpoint_type)
        self.assertIsNotNone(subcloud_status)

        sync_status = consts.SYNC_STATUS_IN_SYNC
        updated = db_api.subcloud_status_update(self.ctx, subcloud.id,
                                                endpoint_type=endpoint_type,
                                                sync_status=sync_status)
        self.assertIsNotNone(updated)
        self.assertEqual(sync_status, updated.sync_status)

        updated_subcloud_status = db_api.subcloud_status_get(self.ctx,
                                                             subcloud.id,
                                                             endpoint_type)
        self.assertIsNotNone(updated_subcloud_status)
        self.assertEqual(endpoint_type,
                         updated_subcloud_status.endpoint_type)
        self.assertEqual(sync_status, updated_subcloud_status.sync_status)

    def test_update_subcloud_status_endpoints(self):
        """A bulk update changes only the endpoints named in the list."""
        fake_subcloud = utils.create_subcloud_dict(
            base.SUBCLOUD_SAMPLE_DATA_0)
        subcloud = self.create_subcloud(self.ctx, fake_subcloud)
        self.assertIsNotNone(subcloud)

        endpoint_type1 = 'testendpoint1'
        subcloud_status = self.create_subcloud_status(
            self.ctx, endpoint_type=endpoint_type1)
        self.assertIsNotNone(subcloud_status)

        endpoint_type2 = 'testendpoint2'
        subcloud_status = self.create_subcloud_status(
            self.ctx, endpoint_type=endpoint_type2)
        self.assertIsNotNone(subcloud_status)

        endpoint_type3 = 'testendpoint3'
        subcloud_status = self.create_subcloud_status(
            self.ctx, endpoint_type=endpoint_type3)
        self.assertIsNotNone(subcloud_status)

        # Only the first two endpoints are part of the bulk update; the
        # third one must keep its previous sync status.
        sync_status = consts.SYNC_STATUS_IN_SYNC
        endpoint_type_list = [endpoint_type1, endpoint_type2]
        db_api.subcloud_status_update_endpoints(
            self.ctx, subcloud.id,
            endpoint_type_list=endpoint_type_list,
            sync_status=sync_status)

        updated_endpoint1_status = db_api.subcloud_status_get(
            self.ctx, subcloud.id, endpoint_type1)
        self.assertIsNotNone(updated_endpoint1_status)
        self.assertEqual(endpoint_type1,
                         updated_endpoint1_status.endpoint_type)
        self.assertEqual(sync_status, updated_endpoint1_status.sync_status)

        updated_endpoint2_status = db_api.subcloud_status_get(
            self.ctx, subcloud.id, endpoint_type2)
        self.assertIsNotNone(updated_endpoint2_status)
        self.assertEqual(endpoint_type2,
                         updated_endpoint2_status.endpoint_type)
        self.assertEqual(sync_status, updated_endpoint2_status.sync_status)

        updated_endpoint3_status = db_api.subcloud_status_get(
            self.ctx, subcloud.id, endpoint_type3)
        self.assertIsNotNone(updated_endpoint3_status)
        self.assertEqual(endpoint_type3,
                         updated_endpoint3_status.endpoint_type)
        self.assertNotEqual(sync_status,
                            updated_endpoint3_status.sync_status)

    def test_update_subcloud_status_endpints_not_exists(self):
        """A bulk update of a missing endpoint raises an error.

        The endpoint in the list has no status row for the subcloud, so
        SubcloudStatusNotFound is expected.
        """
        fake_subcloud = utils.create_subcloud_dict(
            base.SUBCLOUD_SAMPLE_DATA_0)
        subcloud = self.create_subcloud(self.ctx, fake_subcloud)
        self.assertIsNotNone(subcloud)

        endpoint_type1 = 'testendpoint1'
        subcloud_status = self.create_subcloud_status(
            self.ctx, endpoint_type=endpoint_type1)
        self.assertIsNotNone(subcloud_status)

        endpoint_type2 = 'testendpoint2'
        sync_status = consts.SYNC_STATUS_IN_SYNC
        endpoint_type_list = [endpoint_type2]
        self.assertRaises(exceptions.SubcloudStatusNotFound,
                          db_api.subcloud_status_update_endpoints,
                          self.ctx, subcloud.id,
                          endpoint_type_list, sync_status)

    def test_delete_subcloud_status(self):
        """After destroy_all, the status lookup raises an error."""
        fake_subcloud = utils.create_subcloud_dict(
            base.SUBCLOUD_SAMPLE_DATA_0)
        subcloud = self.create_subcloud(self.ctx, fake_subcloud)
        self.assertIsNotNone(subcloud)

        endpoint_type = 'testendpoint'
        subcloud_status = self.create_subcloud_status(
            self.ctx, endpoint_type=endpoint_type)
        self.assertIsNotNone(subcloud_status)

        db_api.subcloud_status_destroy_all(self.ctx, subcloud.id)
        self.assertRaises(exceptions.SubcloudStatusNotFound,
                          db_api.subcloud_status_get,
                          self.ctx, subcloud.id, endpoint_type)

    def test_cascade_delete_subcloud_status(self):
        """Destroying a subcloud also removes its status rows."""
        fake_subcloud = utils.create_subcloud_dict(
            base.SUBCLOUD_SAMPLE_DATA_0)
        subcloud = self.create_subcloud(self.ctx, fake_subcloud)
        self.assertIsNotNone(subcloud)

        endpoint_type = 'testendpoint'
        subcloud_status = self.create_subcloud_status(
            self.ctx, endpoint_type=endpoint_type)
        self.assertIsNotNone(subcloud_status)

        db_api.subcloud_destroy(self.ctx, subcloud.id)
        self.assertRaises(exceptions.SubcloudNotFound,
                          db_api.subcloud_get,
                          self.ctx, subcloud.id)
        self.assertRaises(exceptions.SubcloudStatusNotFound,
                          db_api.subcloud_status_get,
                          self.ctx, subcloud.id, endpoint_type)

    def test_subcloud_status_get_all_by_name(self):
        """All status rows of a subcloud can be listed by subcloud name."""
        fake_subcloud = utils.create_subcloud_dict(
            base.SUBCLOUD_SAMPLE_DATA_0)
        name = fake_subcloud['name']
        subcloud = self.create_subcloud(self.ctx, fake_subcloud)
        self.assertIsNotNone(subcloud)

        endpoint_type1 = 'testendpoint1'
        subcloud_status1 = self.create_subcloud_status(
            self.ctx, endpoint_type=endpoint_type1)
        self.assertIsNotNone(subcloud_status1)

        endpoint_type2 = 'testendpoint2'
        subcloud_status2 = self.create_subcloud_status(
            self.ctx, endpoint_type=endpoint_type2)
        self.assertIsNotNone(subcloud_status2)

        endpoint_type3 = 'testendpoint3'
        subcloud_status3 = self.create_subcloud_status(
            self.ctx, endpoint_type=endpoint_type3)
        self.assertIsNotNone(subcloud_status3)

        new_subcloud_statuses = db_api.subcloud_status_get_all_by_name(
            self.ctx, name)
        self.assertIsNotNone(new_subcloud_statuses)
        self.assertEqual(3, len(new_subcloud_statuses))
        self.assertEqual(endpoint_type1,
                         new_subcloud_statuses[0].endpoint_type)
        self.assertEqual(1, new_subcloud_statuses[0].id)
        self.assertEqual(endpoint_type2,
                         new_subcloud_statuses[1].endpoint_type)
        self.assertEqual(2, new_subcloud_statuses[1].id)
        self.assertEqual(endpoint_type3,
                         new_subcloud_statuses[2].endpoint_type)
        self.assertEqual(3, new_subcloud_statuses[2].id)

    def test_subcloud_status_get_all_by_non_existing_name(self):
        """Listing statuses for an unknown name yields an empty list."""
        fake_subcloud = utils.create_subcloud_dict(
            base.SUBCLOUD_SAMPLE_DATA_0)
        subcloud = self.create_subcloud(self.ctx, fake_subcloud)
        self.assertIsNotNone(subcloud)

        endpoint_type1 = 'testendpoint1'
        subcloud_status1 = self.create_subcloud_status(
            self.ctx, endpoint_type=endpoint_type1)
        self.assertIsNotNone(subcloud_status1)

        new_subcloud_statuses = db_api.subcloud_status_get_all_by_name(
            self.ctx, 'thisnameisnotknown')
        self.assertEqual([], new_subcloud_statuses)

    def test_create_sw_update_strategy(self):
        """A strategy created with explicit values reads back unchanged."""
        sw_update_strategy = self.create_sw_update_strategy(
            self.ctx,
            type=consts.SW_UPDATE_TYPE_UPGRADE,
            subcloud_apply_type=consts.SUBCLOUD_APPLY_TYPE_SERIAL,
            max_parallel_subclouds=42,
            stop_on_failure=False,
            state=consts.SW_UPDATE_STATE_APPLYING
        )
        self.assertIsNotNone(sw_update_strategy)

        new_sw_update_strategy = db_api.sw_update_strategy_get(self.ctx)
        self.assertIsNotNone(new_sw_update_strategy)
        self.assertEqual(consts.SW_UPDATE_TYPE_UPGRADE,
                         new_sw_update_strategy.type)
        self.assertEqual(consts.SUBCLOUD_APPLY_TYPE_SERIAL,
                         new_sw_update_strategy.subcloud_apply_type)
        self.assertEqual(42, new_sw_update_strategy.max_parallel_subclouds)
        self.assertEqual(False, new_sw_update_strategy.stop_on_failure)
        self.assertEqual(consts.SW_UPDATE_STATE_APPLYING,
                         new_sw_update_strategy.state)

    def test_create_sw_update_strategy_duplicate(self):
        """Creating a second strategy raises DBDuplicateEntry."""
        sw_update_strategy = self.create_sw_update_strategy(self.ctx)
        self.assertIsNotNone(sw_update_strategy)

        self.assertRaises(db_exception.DBDuplicateEntry,
                          self.create_sw_update_strategy,
                          self.ctx)

    def test_update_sw_update_strategy(self):
        """A strategy state update shows on the result and on re-read."""
        sw_update_strategy = self.create_sw_update_strategy(self.ctx)
        self.assertIsNotNone(sw_update_strategy)

        state = consts.SW_UPDATE_STATE_APPLYING
        updated = db_api.sw_update_strategy_update(self.ctx, state=state)
        self.assertIsNotNone(updated)
        self.assertEqual(state, updated.state)

        updated_sw_update_strategy = db_api.sw_update_strategy_get(self.ctx)
        self.assertEqual(state, updated_sw_update_strategy.state)

    def test_delete_sw_update_strategy(self):
        """Fetching the strategy after destroying it raises NotFound."""
        sw_update_strategy = self.create_sw_update_strategy(self.ctx)
        self.assertIsNotNone(sw_update_strategy)

        db_api.sw_update_strategy_destroy(self.ctx)

        self.assertRaises(exceptions.NotFound,
                          db_api.sw_update_strategy_get,
                          self.ctx)

    def test_create_strategy_step(self):
        """A strategy step is retrievable by subcloud id and by name."""
        name = 'testname'
        subcloud = self.create_subcloud_static(self.ctx, name=name)
        self.assertIsNotNone(subcloud)

        strategy_step = self.create_strategy_step(
            self.ctx, stage=1, details="Bart was here")
        self.assertIsNotNone(strategy_step)

        new_strategy_step = db_api.strategy_step_get(self.ctx,
                                                     subcloud.id)
        self.assertIsNotNone(new_strategy_step)
        self.assertEqual(1, new_strategy_step.stage)
        self.assertEqual(consts.STRATEGY_STATE_INITIAL,
                         new_strategy_step.state)
        self.assertEqual("Bart was here", new_strategy_step.details)

        new_strategy_step = db_api.strategy_step_get_by_name(self.ctx,
                                                             subcloud.name)
        self.assertIsNotNone(new_strategy_step)
        self.assertEqual(1, new_strategy_step.stage)
        self.assertEqual(consts.STRATEGY_STATE_INITIAL,
                         new_strategy_step.state)
        self.assertEqual("Bart was here", new_strategy_step.details)

    def test_strategy_step_get_all(self):
        """get_all returns one step per subcloud with matching details."""
        subcloud1 = self.create_subcloud_static(self.ctx,
                                                name='subcloud one')
        self.assertIsNotNone(subcloud1)
        subcloud2 = self.create_subcloud_static(self.ctx,
                                                name='subcloud two')
        self.assertIsNotNone(subcloud2)
        subcloud3 = self.create_subcloud_static(self.ctx,
                                                name='subcloud three')
        self.assertIsNotNone(subcloud3)

        strategy_step_stage1 = self.create_strategy_step(
            self.ctx, subcloud_id=1, stage=1)
        self.assertIsNotNone(strategy_step_stage1)

        strategy_step_stage2 = self.create_strategy_step(
            self.ctx, subcloud_id=2, stage=2)
        self.assertIsNotNone(strategy_step_stage2)

        strategy_step_stage3 = self.create_strategy_step(
            self.ctx, subcloud_id=3, stage=2)
        self.assertIsNotNone(strategy_step_stage3)

        new_strategy = db_api.strategy_step_get_all(self.ctx)
        self.assertIsNotNone(new_strategy)
        self.assertEqual(3, len(new_strategy))

        self.assertEqual(1, new_strategy[0].id)
        self.assertEqual(1, new_strategy[0].stage)
        self.assertEqual('subcloud one', new_strategy[0].subcloud.name)
        self.assertEqual(2, new_strategy[1].id)
        self.assertEqual(2, new_strategy[1].stage)
        self.assertEqual('subcloud two', new_strategy[1].subcloud.name)
        self.assertEqual(3, new_strategy[2].id)
        self.assertEqual(2, new_strategy[2].stage)
        self.assertEqual('subcloud three', new_strategy[2].subcloud.name)

    def test_update_strategy_step(self):
        """Updated step fields show on the result and on re-read."""
        name = 'testname'
        subcloud = self.create_subcloud_static(self.ctx, name=name)
        self.assertIsNotNone(subcloud)

        strategy_step = self.create_strategy_step(
            self.ctx, stage=1, details="Bart was here")
        self.assertIsNotNone(strategy_step)

        updated = db_api.strategy_step_update(
            self.ctx,
            subcloud.id,
            stage=2,
            state=consts.STRATEGY_STATE_COMPLETE,
            details="New details"
        )
        self.assertIsNotNone(updated)
        self.assertEqual(2, updated.stage)
        self.assertEqual(consts.STRATEGY_STATE_COMPLETE, updated.state)
        self.assertEqual("New details", updated.details)

        updated_strategy_step = db_api.strategy_step_get(self.ctx,
                                                         subcloud.id)
        self.assertIsNotNone(updated_strategy_step)
        self.assertEqual(2, updated_strategy_step.stage)
        self.assertEqual(consts.STRATEGY_STATE_COMPLETE,
                         updated_strategy_step.state)
        self.assertEqual("New details", updated_strategy_step.details)

    def test_delete_strategy_step(self):
        """After destroy_all, get_all returns an empty list."""
        name = 'testname'
        subcloud = self.create_subcloud_static(self.ctx, name=name)
        self.assertIsNotNone(subcloud)

        strategy_step = self.create_strategy_step(
            self.ctx, stage=1, details="Bart was here")
        self.assertIsNotNone(strategy_step)

        db_api.strategy_step_destroy_all(self.ctx)
        new_strategy = db_api.strategy_step_get_all(self.ctx)
        self.assertEqual([], new_strategy)

    def test_cascade_delete_strategy_step(self):
        """Destroying a subcloud also removes its strategy step."""
        name = 'testname'
        subcloud = self.create_subcloud_static(self.ctx, name=name)
        self.assertIsNotNone(subcloud)

        strategy_step = self.create_strategy_step(
            self.ctx, stage=1, details="Bart was here")
        self.assertIsNotNone(strategy_step)

        db_api.subcloud_destroy(self.ctx, subcloud.id)
        self.assertRaises(exceptions.SubcloudNotFound,
                          db_api.subcloud_get,
                          self.ctx, subcloud.id)
        self.assertRaises(exceptions.StrategyStepNotFound,
                          db_api.strategy_step_get,
                          self.ctx, subcloud.id)