diff --git a/releasenotes/notes/cluster_restart-bb5abb7372131ee0.yaml b/releasenotes/notes/cluster_restart-bb5abb7372131ee0.yaml new file mode 100644 index 0000000000..5cbe649d83 --- /dev/null +++ b/releasenotes/notes/cluster_restart-bb5abb7372131ee0.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + Add support for cluster restart. diff --git a/trove/cluster/models.py b/trove/cluster/models.py index 82ed5fa559..69a80f062d 100644 --- a/trove/cluster/models.py +++ b/trove/cluster/models.py @@ -21,8 +21,9 @@ from trove.cluster.tasks import ClusterTasks from trove.common import cfg from trove.common import exception from trove.common.i18n import _ -from trove.common.notification import DBaaSClusterGrow, DBaaSClusterShrink -from trove.common.notification import DBaaSClusterResetStatus +from trove.common.notification import (DBaaSClusterGrow, DBaaSClusterShrink, + DBaaSClusterResetStatus, + DBaaSClusterRestart) from trove.common.notification import DBaaSClusterUpgrade from trove.common.notification import StartNotification from trove.common import remote @@ -316,6 +317,11 @@ class Cluster(object): with StartNotification(context, cluster_id=self.id): return self.reset_status() + elif action == 'restart': + context.notification = DBaaSClusterRestart(context, request=req) + with StartNotification(context, cluster_id=self.id): + return self.restart() + elif action == 'upgrade': context.notification = DBaaSClusterUpgrade(context, request=req) dv_id = param['datastore_version'] @@ -332,8 +338,43 @@ class Cluster(object): def shrink(self, instance_ids): raise exception.BadRequest(_("Action 'shrink' not supported")) + def rolling_restart(self): + self.validate_cluster_available() + self.db_info.update(task_status=ClusterTasks.RESTARTING_CLUSTER) + try: + cluster_id = self.db_info.id + task_api.load(self.context, self.ds_version.manager + ).restart_cluster(cluster_id) + except Exception: + self.db_info.update(task_status=ClusterTasks.NONE) + raise + + return 
self.__class__(self.context, self.db_info, + self.ds, self.ds_version) + + def rolling_upgrade(self, datastore_version): + """Upgrades a cluster to a new datastore version.""" + LOG.debug("Upgrading cluster %s." % self.id) + + self.validate_cluster_available() + self.db_info.update(task_status=ClusterTasks.UPGRADING_CLUSTER) + try: + cluster_id = self.db_info.id + ds_ver_id = datastore_version.id + task_api.load(self.context, self.ds_version.manager + ).upgrade_cluster(cluster_id, ds_ver_id) + except Exception: + self.db_info.update(task_status=ClusterTasks.NONE) + raise + + return self.__class__(self.context, self.db_info, + self.ds, self.ds_version) + + def restart(self): + raise exception.BadRequest(_("Action 'restart' not supported")) + def upgrade(self, datastore_version): - raise exception.BadRequest(_("Action 'upgrade' not supported")) + raise exception.BadRequest(_("Action 'upgrade' not supported")) @staticmethod def load_instance(context, cluster_id, instance_id): diff --git a/trove/cluster/tasks.py b/trove/cluster/tasks.py index b91fc1a5fb..44d6d30118 100644 --- a/trove/cluster/tasks.py +++ b/trove/cluster/tasks.py @@ -69,6 +69,10 @@ class ClusterTasks(object): 0x05, 'GROWING_CLUSTER', 'Increasing the size of the cluster.') SHRINKING_CLUSTER = ClusterTask( 0x06, 'SHRINKING_CLUSTER', 'Decreasing the size of the cluster.') + UPGRADING_CLUSTER = ClusterTask( + 0x07, 'UPGRADING_CLUSTER', 'Upgrading the cluster to new version.') + RESTARTING_CLUSTER = ClusterTask( + 0x08, 'RESTARTING_CLUSTER', 'Restarting the cluster.') # Dissuade further additions at run-time. 
diff --git a/trove/common/cfg.py b/trove/common/cfg.py index 2005165071..0fb0feb9aa 100644 --- a/trove/common/cfg.py +++ b/trove/common/cfg.py @@ -938,6 +938,26 @@ cassandra_opts = [ help='Character length of generated passwords.', deprecated_name='default_password_length', deprecated_group='DEFAULT'), + cfg.BoolOpt('enable_cluster_instance_backup', + default=False, + help='Allows backup of single instance in the cluster.'), + cfg.BoolOpt('enable_saslauthd', default=False, + help='Enable the saslauth daemon.'), + cfg.StrOpt('user_controller', + default='trove.extensions.cassandra.service.' + 'CassandraUserController', + help='User controller implementation.'), + cfg.StrOpt('database_controller', + default='trove.extensions.cassandra.service.' + 'CassandraDatabaseController', + help='Database controller implementation.'), + cfg.StrOpt('user_access_controller', + default='trove.extensions.cassandra.service.' + 'CassandraUserAccessController', + help='User access controller implementation.'), + cfg.IntOpt('node_sync_time', default=60, + help='Time (in seconds) given to a node after a state change ' + 'to finish rejoining the cluster.'), ] # Couchbase diff --git a/trove/common/notification.py b/trove/common/notification.py index c23923a27c..e4e6d4e7b6 100644 --- a/trove/common/notification.py +++ b/trove/common/notification.py @@ -368,7 +368,7 @@ class DBaaSAPINotification(object): }) elif 'request_id' not in kwargs: raise TroveError(_("Notification %s must include 'request'" - " property") % self.__class__.__name__) + " property") % self.__class__.__name__) self.payload.update(kwargs) @@ -385,7 +385,7 @@ class DBaaSAPINotification(object): 'keys': list(required_keys - provided_keys)}) if 'server_type' not in self.payload: raise TroveError(_("Notification %s must include a" - " 'server_type' for correct routing") + " 'server_type' for correct routing") % self.__class__.__name__) def _notify(self, event_qualifier, required_traits, optional_traits, @@ -564,6 +564,15 @@ 
class DBaaSClusterCreate(DBaaSAPINotification): return ['cluster_id'] +class DBaaSClusterRestart(DBaaSAPINotification): + + def event_type(self): + return 'cluster_restart' + + def required_start_traits(self): + return ['cluster_id'] + + class DBaaSClusterUpgrade(DBaaSAPINotification): @abc.abstractmethod diff --git a/trove/common/strategies/cluster/experimental/cassandra/api.py b/trove/common/strategies/cluster/experimental/cassandra/api.py index 41c3f229c5..ab680b34ed 100644 --- a/trove/common/strategies/cluster/experimental/cassandra/api.py +++ b/trove/common/strategies/cluster/experimental/cassandra/api.py @@ -206,6 +206,12 @@ class CassandraCluster(models.Cluster): return CassandraCluster(context, db_info, datastore, datastore_version) + def restart(self): + self.rolling_restart() + + def upgrade(self, datastore_version): + self.rolling_upgrade(datastore_version) + class CassandraClusterView(ClusterView): diff --git a/trove/common/strategies/cluster/experimental/cassandra/taskmanager.py b/trove/common/strategies/cluster/experimental/cassandra/taskmanager.py index 6e89d2bf4d..d074dc5230 100644 --- a/trove/common/strategies/cluster/experimental/cassandra/taskmanager.py +++ b/trove/common/strategies/cluster/experimental/cassandra/taskmanager.py @@ -341,6 +341,13 @@ class CassandraClusterTasks(task_models.ClusterTasks): LOG.debug("End shrink_cluster for id: %s." 
% cluster_id) + def restart_cluster(self, context, cluster_id): + self.rolling_restart_cluster( + context, cluster_id, delay_sec=CONF.cassandra.node_sync_time) + + def upgrade_cluster(self, context, cluster_id, datastore_version): + self.rolling_upgrade_cluster(context, cluster_id, datastore_version) + class CassandraTaskManagerAPI(task_api.API): pass diff --git a/trove/common/strategies/cluster/experimental/galera_common/api.py b/trove/common/strategies/cluster/experimental/galera_common/api.py index edaf9a3861..9b47291f1b 100644 --- a/trove/common/strategies/cluster/experimental/galera_common/api.py +++ b/trove/common/strategies/cluster/experimental/galera_common/api.py @@ -197,6 +197,12 @@ class GaleraCommonCluster(cluster_models.Cluster): return self.__class__(self.context, self.db_info, self.ds, self.ds_version) + def restart(self): + self.rolling_restart() + + def upgrade(self, datastore_version): + self.rolling_upgrade(datastore_version) + class GaleraCommonClusterView(ClusterView): diff --git a/trove/common/strategies/cluster/experimental/galera_common/taskmanager.py b/trove/common/strategies/cluster/experimental/galera_common/taskmanager.py index 7374538d4d..870f303d20 100644 --- a/trove/common/strategies/cluster/experimental/galera_common/taskmanager.py +++ b/trove/common/strategies/cluster/experimental/galera_common/taskmanager.py @@ -325,3 +325,9 @@ class GaleraCommonClusterTasks(task_models.ClusterTasks): timeout.cancel() LOG.debug("End shrink_cluster for id: %s." 
% cluster_id + + def restart_cluster(self, context, cluster_id): + self.rolling_restart_cluster(context, cluster_id) + + def upgrade_cluster(self, context, cluster_id, datastore_version): + self.rolling_upgrade_cluster(context, cluster_id, datastore_version) diff --git a/trove/instance/tasks.py b/trove/instance/tasks.py index 8e6fdcd3bc..6ec366f33a 100644 --- a/trove/instance/tasks.py +++ b/trove/instance/tasks.py @@ -114,6 +114,9 @@ class InstanceTasks(object): SHRINKING_ERROR = InstanceTask(0x58, 'SHRINKING', 'Shrinking Cluster Error.', is_error=True) + UPGRADING_ERROR = InstanceTask(0x5a, 'UPGRADING', + 'Upgrading Cluster Error.', + is_error=True) UPGRADING = InstanceTask(0x59, 'UPGRADING', 'Upgrading the instance.') # Dissuade further additions at run-time. diff --git a/trove/taskmanager/api.py b/trove/taskmanager/api.py index 437e720b21..951983a2bf 100644 --- a/trove/taskmanager/api.py +++ b/trove/taskmanager/api.py @@ -251,6 +251,22 @@ class API(object): cctxt.cast(self.context, "upgrade", instance_id=instance_id, datastore_version_id=datastore_version_id) + def restart_cluster(self, cluster_id): + LOG.debug("Making async call to restart cluster %s " % cluster_id) + version = self.API_BASE_VERSION + + cctxt = self.client.prepare(version=version) + cctxt.cast(self.context, "restart_cluster", cluster_id=cluster_id) + + def upgrade_cluster(self, cluster_id, datastore_version_id): + LOG.debug("Making async call to upgrade guest to datastore " + "version %s " % datastore_version_id) + version = self.API_BASE_VERSION + + cctxt = self.client.prepare(version=version) + cctxt.cast(self.context, "upgrade_cluster", cluster_id=cluster_id, + datastore_version_id=datastore_version_id) + def load(context, manager=None): if manager: diff --git a/trove/taskmanager/manager.py b/trove/taskmanager/manager.py index 4e2555d215..d337bff18e 100644 --- a/trove/taskmanager/manager.py +++ b/trove/taskmanager/manager.py @@ -371,7 +371,7 @@ class Manager(periodic_task.PeriodicTasks): 
cluster_config, volume_type, modules, locality): with EndNotification(context, instance_id=(instance_id[0] - if type(instance_id) is list + if isinstance(instance_id, list) else instance_id)): self._create_instance(context, instance_id, name, flavor, image_id, databases, users, @@ -409,6 +409,15 @@ class Manager(periodic_task.PeriodicTasks): cluster_tasks = models.load_cluster_tasks(context, cluster_id) cluster_tasks.shrink_cluster(context, cluster_id, instance_ids) + def restart_cluster(self, context, cluster_id): + cluster_tasks = models.load_cluster_tasks(context, cluster_id) + cluster_tasks.restart_cluster(context, cluster_id) + + def upgrade_cluster(self, context, cluster_id, datastore_version_id): + datastore_version = DatastoreVersion.load_by_uuid(datastore_version_id) + cluster_tasks = models.load_cluster_tasks(context, cluster_id) + cluster_tasks.upgrade_cluster(context, cluster_id, datastore_version) + def delete_cluster(self, context, cluster_id): with EndNotification(context): cluster_tasks = models.load_cluster_tasks(context, cluster_id) diff --git a/trove/taskmanager/models.py b/trove/taskmanager/models.py index 32deb57f73..d8acf2415c 100755 --- a/trove/taskmanager/models.py +++ b/trove/taskmanager/models.py @@ -13,10 +13,12 @@ # under the License. 
import os.path +import time import traceback from cinderclient import exceptions as cinder_exceptions from eventlet import greenthread +from eventlet.timeout import Timeout from heatclient import exc as heat_exceptions from novaclient import exceptions as nova_exceptions from oslo_log import log as logging @@ -45,6 +47,10 @@ from trove.common.i18n import _ from trove.common import instance as rd_instance from trove.common.instance import ServiceStatuses from trove.common.notification import ( + DBaaSInstanceRestart, + DBaaSInstanceUpgrade, + EndNotification, + StartNotification, TroveInstanceCreate, TroveInstanceModifyVolume, TroveInstanceModifyFlavor, @@ -316,6 +322,88 @@ class ClusterTasks(Cluster): cluster.save() LOG.debug("end delete_cluster for id: %s" % cluster_id) + def rolling_restart_cluster(self, context, cluster_id, delay_sec=0): + LOG.debug("Begin rolling cluster restart for id: %s" % cluster_id) + + def _restart_cluster_instance(instance): + LOG.debug("Restarting instance with id: %s" % instance.id) + context.notification = ( + DBaaSInstanceRestart(context, **request_info)) + with StartNotification(context, instance_id=instance.id): + with EndNotification(context): + instance.update_db(task_status=InstanceTasks.REBOOTING) + instance.restart() + + timeout = Timeout(CONF.cluster_usage_timeout) + cluster_notification = context.notification + request_info = cluster_notification.serialize(context) + try: + node_db_inst = DBInstance.find_all(cluster_id=cluster_id).all() + for index, db_inst in enumerate(node_db_inst): + if index > 0: + LOG.debug( + "Waiting (%ds) for restarted nodes to rejoin the " + "cluster before proceeding." 
% delay_sec) + time.sleep(delay_sec) + instance = BuiltInstanceTasks.load(context, db_inst.id) + _restart_cluster_instance(instance) + except Timeout as t: + if t is not timeout: + raise # not my timeout + LOG.exception(_("Timeout for restarting cluster.")) + raise + except Exception: + LOG.exception(_("Error restarting cluster %s.") % cluster_id) + raise + finally: + context.notification = cluster_notification + timeout.cancel() + self.reset_task() + + LOG.debug("End rolling restart for id: %s." % cluster_id) + + def rolling_upgrade_cluster(self, context, cluster_id, datastore_version): + LOG.debug("Begin rolling cluster upgrade for id: %s." % cluster_id) + + def _upgrade_cluster_instance(instance): + LOG.debug("Upgrading instance with id: %s." % instance.id) + context.notification = ( + DBaaSInstanceUpgrade(context, **request_info)) + with StartNotification( + context, instance_id=instance.id, + datastore_version_id=datastore_version.id): + with EndNotification(context): + instance.update_db( + datastore_version_id=datastore_version.id, + task_status=InstanceTasks.UPGRADING) + instance.upgrade(datastore_version) + + timeout = Timeout(CONF.cluster_usage_timeout) + cluster_notification = context.notification + request_info = cluster_notification.serialize(context) + try: + for db_inst in DBInstance.find_all(cluster_id=cluster_id).all(): + instance = BuiltInstanceTasks.load( + context, db_inst.id) + _upgrade_cluster_instance(instance) + + self.reset_task() + except Timeout as t: + if t is not timeout: + raise # not my timeout + LOG.exception(_("Timeout for upgrading cluster.")) + self.update_statuses_on_failure( + cluster_id, status=InstanceTasks.UPGRADING_ERROR) + except Exception: + LOG.exception(_("Error upgrading cluster %s.") % cluster_id) + self.update_statuses_on_failure( + cluster_id, status=InstanceTasks.UPGRADING_ERROR) + finally: + context.notification = cluster_notification + timeout.cancel() + + LOG.debug("End upgrade_cluster for id: %s." 
% cluster_id) + class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin): diff --git a/trove/tests/int_tests.py b/trove/tests/int_tests.py index 2bb1f281ff..b4ce7fa3ab 100644 --- a/trove/tests/int_tests.py +++ b/trove/tests/int_tests.py @@ -162,6 +162,9 @@ cluster_root_groups.extend([groups.CLUSTER_ACTIONS_ROOT_ENABLE]) cluster_root_actions_groups = list(cluster_actions_groups) cluster_root_actions_groups.extend([groups.CLUSTER_ACTIONS_ROOT_ACTIONS]) +cluster_restart_groups = list(cluster_create_groups) +cluster_restart_groups.extend([groups.CLUSTER_ACTIONS_RESTART_WAIT]) + cluster_upgrade_groups = list(cluster_create_groups) cluster_upgrade_groups.extend([groups.CLUSTER_UPGRADE_WAIT]) @@ -247,6 +250,7 @@ register(["cluster"], cluster_actions_groups) register(["cluster_actions"], cluster_actions_groups) register(["cluster_create"], cluster_create_groups) register(["cluster_negative_actions"], cluster_negative_actions_groups) +register(["cluster_restart"], cluster_restart_groups) register(["cluster_root"], cluster_root_groups) register(["cluster_root_actions"], cluster_root_actions_groups) register(["cluster_upgrade"], cluster_upgrade_groups) diff --git a/trove/tests/scenario/groups/__init__.py b/trove/tests/scenario/groups/__init__.py index 75c326dd4d..8274dd7d1b 100644 --- a/trove/tests/scenario/groups/__init__.py +++ b/trove/tests/scenario/groups/__init__.py @@ -61,6 +61,8 @@ CLUSTER_ACTIONS_GROW = "scenario.cluster_actions_grow_grp" CLUSTER_ACTIONS_GROW_WAIT = "scenario.cluster_actions_grow_wait_grp" CLUSTER_ACTIONS_SHRINK = "scenario.cluster_actions_shrink_grp" CLUSTER_ACTIONS_SHRINK_WAIT = "scenario.cluster_actions_shrink_wait_grp" +CLUSTER_ACTIONS_RESTART = "scenario.cluster_actions_restart_grp" +CLUSTER_ACTIONS_RESTART_WAIT = "scenario.cluster_actions_restart_wait_grp" # Cluster Create Group (in cluster_actions file) diff --git a/trove/tests/scenario/groups/cluster_group.py b/trove/tests/scenario/groups/cluster_group.py index 
cadd8565b2..205002b9a3 100644 --- a/trove/tests/scenario/groups/cluster_group.py +++ b/trove/tests/scenario/groups/cluster_group.py @@ -92,8 +92,44 @@ class ClusterCreateWaitGroup(TestGroup): @test(groups=[GROUP, groups.CLUSTER_ACTIONS, - groups.CLUSTER_ACTIONS_ROOT_ENABLE], + groups.CLUSTER_ACTIONS_RESTART], depends_on_groups=[groups.CLUSTER_CREATE_WAIT]) +class ClusterRestartGroup(TestGroup): + + def __init__(self): + super(ClusterRestartGroup, self).__init__( + ClusterRunnerFactory.instance()) + + @test + def cluster_restart(self): + """Restart the cluster.""" + self.test_runner.run_cluster_restart() + + +@test(groups=[GROUP, groups.CLUSTER_ACTIONS, + groups.CLUSTER_ACTIONS_RESTART_WAIT], + depends_on_groups=[groups.CLUSTER_ACTIONS_RESTART]) +class ClusterRestartWaitGroup(TestGroup): + + def __init__(self): + super(ClusterRestartWaitGroup, self).__init__( + ClusterRunnerFactory.instance()) + + @test + def cluster_restart_wait(self): + """Wait for cluster restart to complete.""" + self.test_runner.run_cluster_restart_wait() + + @test(depends_on=[cluster_restart_wait]) + def verify_initial_cluster_data(self): + """Verify the initial data still exists after cluster restart.""" + self.test_runner.run_verify_initial_cluster_data() + + +@test(groups=[GROUP, groups.CLUSTER_ACTIONS, + groups.CLUSTER_ACTIONS_ROOT_ENABLE], + depends_on_groups=[groups.CLUSTER_CREATE_WAIT], + runs_after_groups=[groups.CLUSTER_ACTIONS_RESTART_WAIT]) class ClusterRootEnableGroup(TestGroup): def __init__(self): @@ -308,7 +344,8 @@ class ClusterRootEnableShrinkGroup(TestGroup): groups.CLUSTER_ACTIONS_ROOT_SHRINK, groups.CLUSTER_ACTIONS_GROW_WAIT, groups.CLUSTER_ACTIONS_SHRINK_WAIT, - groups.CLUSTER_UPGRADE_WAIT]) + groups.CLUSTER_UPGRADE_WAIT, + groups.CLUSTER_ACTIONS_RESTART_WAIT]) class ClusterDeleteGroup(TestGroup): def __init__(self): diff --git a/trove/tests/scenario/runners/cluster_runners.py b/trove/tests/scenario/runners/cluster_runners.py index de06048798..d9a0f153bc 100644 --- 
a/trove/tests/scenario/runners/cluster_runners.py +++ b/trove/tests/scenario/runners/cluster_runners.py @@ -160,6 +160,34 @@ class ClusterRunner(TestRunner): self.assert_cluster_show( self.cluster_id, expected_task_name, expected_http_code) + def run_cluster_restart(self, expected_http_code=202, + expected_task_name='RESTARTING_CLUSTER'): + self.assert_cluster_restart( + self.cluster_id, expected_task_name, expected_http_code) + + def assert_cluster_restart( + self, cluster_id, expected_task_name, expected_http_code): + client = self.auth_client + client.clusters.restart(cluster_id) + self.assert_client_code(client, expected_http_code) + self._assert_cluster_response( + client, cluster_id, expected_task_name) + + def run_cluster_restart_wait(self): + self.assert_cluster_restart_wait(self.cluster_id) + + def assert_cluster_restart_wait(self, cluster_id): + client = self.auth_client + cluster_instances = self._get_cluster_instances( + client, cluster_id) + self.assert_all_instance_states( + cluster_instances, ['REBOOT', 'ACTIVE']) + + self._assert_cluster_states( + client, cluster_id, ['NONE']) + self._assert_cluster_response( + client, cluster_id, 'NONE') + def assert_cluster_show(self, cluster_id, expected_task_name, expected_http_code): self._assert_cluster_response(self.auth_client,