From 51574ef6efb0167695727ef71d1c9eec7f2b81f4 Mon Sep 17 00:00:00 2001 From: Vasudeo Nimbekar Date: Mon, 29 May 2023 14:35:04 +0530 Subject: [PATCH] Added maintenance mode Change-Id: Ie42fcfa96044da1793f1afbff5fb2ae5534d5c2e --- mistral/api/controllers/maintenance.py | 59 ++++ mistral/api/controllers/root.py | 2 + mistral/api/hooks/maintenance.py | 48 +++ .../versions/042_create_maintenance_table.py | 46 +++ mistral/db/v2/api.py | 12 + mistral/db/v2/sqlalchemy/api.py | 27 ++ mistral/exceptions.py | 4 + mistral/services/maintenance.py | 289 ++++++++++++++++++ 8 files changed, 487 insertions(+) create mode 100644 mistral/api/controllers/maintenance.py create mode 100644 mistral/api/hooks/maintenance.py create mode 100644 mistral/db/sqlalchemy/migration/alembic_migrations/versions/042_create_maintenance_table.py create mode 100644 mistral/services/maintenance.py diff --git a/mistral/api/controllers/maintenance.py b/mistral/api/controllers/maintenance.py new file mode 100644 index 000000000..c3f8d207b --- /dev/null +++ b/mistral/api/controllers/maintenance.py @@ -0,0 +1,59 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pecan +from pecan import rest +import wsmeext.pecan as wsme_pecan + +from mistral.api.controllers import resource +from mistral import context +from mistral.db.v2 import api as db_api +from mistral.services import maintenance +from mistral.utils import rest_utils + + +class Maintenance(resource.Resource): + """Maintenance resource.""" + + status = str + + @classmethod + def sample(cls): + return cls( + status="1234" + ) + + +class MaintenanceController(rest.RestController): + + @pecan.expose('json') + def get(self): + context.set_ctx(None) + + maintenance_status = db_api.get_maintenance_status() + + return { + 'status': maintenance_status + } + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose( + Maintenance, + body=Maintenance, + status_code=200 + ) + def put(self, new_maintenance_status): + context.set_ctx(None) + + maintenance.change_maintenance_mode(new_maintenance_status.status) + + return new_maintenance_status diff --git a/mistral/api/controllers/root.py b/mistral/api/controllers/root.py index 22943abf5..39d084fee 100644 --- a/mistral/api/controllers/root.py +++ b/mistral/api/controllers/root.py @@ -18,6 +18,7 @@ from wsme import types as wtypes import wsmeext.pecan as wsme_pecan from mistral.api.controllers import info +from mistral.api.controllers import maintenance from mistral.api.controllers import resource from mistral.api.controllers.v2 import root as v2_root @@ -64,6 +65,7 @@ class APIVersions(resource.Resource): class RootController(object): v2 = v2_root.Controller() info = info.InfoController() + maintenance = maintenance.MaintenanceController() @wsme_pecan.wsexpose(APIVersions) def index(self): diff --git a/mistral/api/hooks/maintenance.py b/mistral/api/hooks/maintenance.py new file mode 100644 index 000000000..f9f22d340 --- /dev/null +++ b/mistral/api/hooks/maintenance.py @@ -0,0 +1,48 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pecan +from pecan import hooks + +from mistral.db.v2 import api as db_api +from mistral.services import maintenance as maintenance_service + +ALLOWED_WITHOUT_AUTH = ['/', '/v2/', '/health', '/maintenance'] + + +class MaintenanceHook(hooks.PecanHook): + + def before(self, state): + if state.request.path in ALLOWED_WITHOUT_AUTH or \ + state.request.method == 'GET': + return + + cluster_state = db_api.get_maintenance_status() + is_complete_async_actions = ( + state.request.method == 'PUT' and + '/v2/action_executions' in state.request.path and + cluster_state == maintenance_service.PAUSING + ) + + if is_complete_async_actions or \ + cluster_state == maintenance_service.RUNNING: + return + + msg = "Current Mistral state is {}. Method is not allowed".format( + cluster_state + ) + + pecan.abort( + status_code=423, + detail=msg, + headers={'Server-Error-Message': msg} + ) diff --git a/mistral/db/sqlalchemy/migration/alembic_migrations/versions/042_create_maintenance_table.py b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/042_create_maintenance_table.py new file mode 100644 index 000000000..ab63c3136 --- /dev/null +++ b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/042_create_maintenance_table.py @@ -0,0 +1,46 @@ +# Copyright 2023 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""create maintenance table + +Revision ID: 042 +Revises: 041 +Create Date: 2023-05-29 12:45:14.041458 + +""" + +# revision identifiers, used by Alembic. +revision = '042' +down_revision = '041' + +from alembic import op + +from mistral.services import maintenance + + +def upgrade(): + connection = op.get_bind() + + connection.execute( + """CREATE TABLE IF NOT EXISTS mistral_metrics + (name VARCHAR(255) UNIQUE, value VARCHAR(255), + id INT PRIMARY KEY DEFAULT 1)""" + ) + + connection.execute( + """INSERT INTO mistral_metrics (id, name, value) + VALUES (1, 'maintenance_status', %s)""", + (maintenance.RUNNING,) + ) diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py index fde104592..6fd806d19 100644 --- a/mistral/db/v2/api.py +++ b/mistral/db/v2/api.py @@ -460,6 +460,10 @@ def get_delayed_calls_to_start(time, batch_size=None): return IMPL.get_delayed_calls_to_start(time, batch_size) +def get_overdue_calls(time): + return IMPL.get_overdue_calls(time) + + def create_delayed_call(values): return IMPL.create_delayed_call(values) @@ -722,3 +726,11 @@ def delete_named_lock(lock_id): def named_lock(name): with IMPL.named_lock(name): yield + + +def get_maintenance_status(): + return IMPL.get_maintenance_status() + + +def update_maintenance_status(status): + return IMPL.update_maintenance_status(status) diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index deffaa2b1..cbfe98240 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -2128,3 +2128,30 @@ def named_lock(name): yield delete_named_lock(lock_id) + + +def get_maintenance_status(): + query = b.get_engine().execute('SELECT status FROM mistral_metrics ' + 'WHERE name = %s', ("maintenance_status",)) + rows = query.fetchall() + + return rows[0][0] if rows else None + + +def update_maintenance_status(status): + b.get_engine().execute( + "INSERT into mistral_metrics (id, name, value)" + "VALUES (1, %s, %s)" + "ON CONFLICT (name) DO UPDATE" + "SET value = EXCLUDED.value", + ("maintenance_status", status,)) + + +@b.session_aware() +def get_overdue_calls(time, session=None): + query = b.model_query(models.DelayedCall) + + query = query.filter(models.DelayedCall.execution_time < time) + query = query.filter_by(processing=True) + + return query.all() diff --git a/mistral/exceptions.py b/mistral/exceptions.py index 237e70bc0..2eb3b1135 100644 --- a/mistral/exceptions.py +++ b/mistral/exceptions.py @@ -205,3 +205,7 @@ class KombuException(Exception): class InvalidStateTransitionException(MistralException): http_code = 400 message = 'Invalid state transition' + + +class MaintenanceException(MistralException): + http_code = 500 diff --git a/mistral/services/maintenance.py b/mistral/services/maintenance.py new file mode 100644 index 000000000..8d82cf8db --- /dev/null +++ b/mistral/services/maintenance.py @@ -0,0 +1,289 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import eventlet +from oslo_log import log as logging + +from mistral import context as auth_ctx +from mistral.db.v2 import api as db_api +from mistral.engine import post_tx_queue +from mistral.engine import workflow_handler +from mistral import exceptions +from mistral.scheduler import base as sched_base +from mistral.workflow import states + +PAUSING = 'PAUSING' +PAUSED = 'PAUSED' +RUNNING = 'RUNNING' + +_VALID_TRANSITIONS = { + PAUSING: [PAUSED], + PAUSED: [PAUSED, RUNNING], + RUNNING: [PAUSED, RUNNING] +} + +_ALL_STATES = [ + PAUSING, + PAUSED, + RUNNING +] + +LOG = logging.getLogger(__name__) + +_PAUSE_EXECUTIONS_PATH = 'mistral.services.maintenance._pause_executions' +_PAUSE_EXECUTION_PATH = 'mistral.services.maintenance._pause_execution' +_RESUME_EXECUTIONS_PATH = 'mistral.services.maintenance._resume_executions' +_RESUME_EXECUTION_PATH = 'mistral.services.maintenance._resume_execution' +_AWAIT_PAUSE_EXECUTION_PATH = \ + 'mistral.services.maintenance.await_pause_executions' + + +def is_valid_transition(old_state, new_state): + return new_state in _VALID_TRANSITIONS.get(old_state, []) + + +def pause_running_executions(skip_tx=False): + execution_ids = [(ex.id, ex.project_id) for ex in + db_api.get_workflow_executions(state=states.RUNNING, + insecure=True)] + + LOG.info("Number of find workflow executions is {}" + .format(len(execution_ids))) + + if skip_tx: + sched = sched_base.get_system_scheduler() + for wf_ex_id, project_id in execution_ids: + job = sched_base.SchedulerJob( + func_name=_PAUSE_EXECUTION_PATH, + func_args={ + 'wf_ex_id': wf_ex_id, + 'project_id': project_id + } + ) + sched.schedule(job) + return + + for wf_ex_id, project_id in execution_ids: + try: + with db_api.transaction(skip=skip_tx): + _pause_execution(wf_ex_id, project_id) + except BaseException as e: + LOG.error(str(e)) + + return True + + +def _pause_execution(wf_ex_id, project_id, skip_tx=False): + auth_ctx.set_ctx( + auth_ctx.MistralContext( + user=None, + auth_token=None, + is_admin=True + ) + ) + + current_state = db_api.get_maintenance_status() + + if current_state != PAUSING: + return False + + wf_ex = db_api.get_workflow_execution(wf_ex_id) + + if wf_ex.root_execution_id: + trace_uuid = wf_ex.root_execution_id + else: + trace_uuid = wf_ex.id + + auth_ctx.set_ctx( + auth_ctx.MistralContext( + tenant=project_id, + trace_uuid=trace_uuid + ) + ) + + if states.is_running(wf_ex.state): + workflow_handler.pause_workflow(wf_ex) + LOG.info('Execution {} was paused'.format(wf_ex_id)) + + +def await_pause_executions(skip_tx=False): + auth_ctx.set_ctx( + auth_ctx.MistralContext( + user=None, + auth_token=None, + is_admin=True + ) + ) + + if skip_tx: + current_state = db_api.get_maintenance_status() + + if current_state != PAUSING: + return False + + tasks = db_api.get_task_executions( + state=states.RUNNING, insecure=True + ) + + if not tasks: + if db_api.get_maintenance_status() == PAUSING: + db_api.update_maintenance_status(PAUSED) + return + + LOG.info('The following tasks have RUNNING state: {}'.format([ + task.id for task in tasks + ])) + + sched = sched_base.get_system_scheduler() + job = sched_base.SchedulerJob( + run_after=1, + func_name=_AWAIT_PAUSE_EXECUTION_PATH, + func_args={'skip_tx': True} + ) + sched.schedule(job) + return + + while True: + with db_api.transaction(skip=skip_tx): + current_state = db_api.get_maintenance_status() + + if current_state != PAUSING: + return False + + tasks = db_api.get_task_executions( + state=states.RUNNING, insecure=True + ) + + if not tasks: + return True + + LOG.info('The following tasks have RUNNING state: {}'.format([ + task.id for task in tasks + ])) + + eventlet.sleep(1) + + +def change_maintenance_mode(new_state): + if new_state not in _ALL_STATES: + raise exceptions.MaintenanceException( + 'Not found {} maintenance state. List of states: {}'.format( + new_state, _ALL_STATES + ) + ) + + if new_state == PAUSING: + raise exceptions.MaintenanceException( + 'PAUSING is intermediate state. Consider PAUSED, RUNNING as new ' + 'state') + + with db_api.transaction(): + current_state = db_api.get_maintenance_status() + + if current_state == new_state: + LOG.info('State was already changed. Skip') + return current_state + + sched = sched_base.get_system_scheduler() + + if new_state == PAUSED: + job = sched_base.SchedulerJob(func_name=_PAUSE_EXECUTIONS_PATH) + sched.schedule(job) + db_api.update_maintenance_status(PAUSING) + + return PAUSING + elif new_state == RUNNING: + job = sched_base.SchedulerJob(func_name=_RESUME_EXECUTIONS_PATH) + sched.schedule(job) + db_api.update_maintenance_status(RUNNING) + + return RUNNING + + +@post_tx_queue.run +def _pause_executions(skip_tx=False): + auth_ctx.set_ctx( + auth_ctx.MistralContext( + user=None, + auth_token=None, + is_admin=True + ) + ) + + if skip_tx: + pause_running_executions(skip_tx) + await_pause_executions(skip_tx) + return + + if pause_running_executions() and await_pause_executions(): + with db_api.transaction(): + if db_api.get_maintenance_status() == PAUSING: + db_api.update_maintenance_status(PAUSED) + + +@post_tx_queue.run +def _resume_executions(skip_tx=False): + auth_ctx.set_ctx( + auth_ctx.MistralContext( + user=None, + auth_token=None, + is_admin=True + ) + ) + sched = sched_base.get_system_scheduler() + + with db_api.transaction(skip=skip_tx): + current_state = db_api.get_maintenance_status() + + if current_state != RUNNING: + return + + paused_executions = db_api.get_workflow_executions( + state=states.PAUSED, insecure=True + ) + + if not paused_executions: + return + + for ex in paused_executions: + if skip_tx: + job = sched_base.SchedulerJob( + func_name=_RESUME_EXECUTION_PATH, + func_args={ + 'wf_ex_id': ex.id + } + ) + sched.schedule(job) + else: + _resume_execution(wf_ex_id=ex.id) + + +def _resume_execution(wf_ex_id, skip_tx=False): + wf_ex = db_api.get_workflow_execution(wf_ex_id) + + if wf_ex.root_execution_id: + trace_uuid = wf_ex.root_execution_id + else: + trace_uuid = wf_ex.id + + auth_ctx.set_ctx( + auth_ctx.MistralContext( + tenant=wf_ex.project_id, + trace_uuid=trace_uuid + ) + ) + + workflow_handler.resume_workflow(wf_ex) + + LOG.info('The following execution was resumed: {}'.format([ + wf_ex.id + ]))