Added maintenance mode

Change-Id: Ie42fcfa96044da1793f1afbff5fb2ae5534d5c2e
This commit is contained in:
Vasudeo Nimbekar 2023-05-29 14:35:04 +05:30
parent d6b7f7305d
commit 51574ef6ef
8 changed files with 487 additions and 0 deletions

View File

@ -0,0 +1,59 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pecan
from pecan import rest
import wsmeext.pecan as wsme_pecan
from mistral.api.controllers import resource
from mistral import context
from mistral.db.v2 import api as db_api
from mistral.services import maintenance
from mistral.utils import rest_utils
class Maintenance(resource.Resource):
"""Maintenance resource."""
status = str
@classmethod
def sample(cls):
return cls(
status="1234"
)
class MaintenanceController(rest.RestController):
@pecan.expose('json')
def get(self):
context.set_ctx(None)
maintenance_status = db_api.get_maintenance_status()
return {
'status': maintenance_status
}
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(
Maintenance,
body=Maintenance,
status_code=200
)
def put(self, new_maintenance_status):
context.set_ctx(None)
maintenance.change_maintenance_mode(new_maintenance_status.status)
return new_maintenance_status

View File

@ -18,6 +18,7 @@ from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from mistral.api.controllers import info
from mistral.api.controllers import maintenance
from mistral.api.controllers import resource
from mistral.api.controllers.v2 import root as v2_root
@ -64,6 +65,7 @@ class APIVersions(resource.Resource):
class RootController(object):
v2 = v2_root.Controller()
info = info.InfoController()
maintenance = maintenance.MaintenanceController()
@wsme_pecan.wsexpose(APIVersions)
def index(self):

View File

@ -0,0 +1,48 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pecan
from pecan import hooks
from mistral.db.v2 import api as db_api
from mistral.services import maintenance as maintenance_service
ALLOWED_WITHOUT_AUTH = ['/', '/v2/', '/health', '/maintenance']
class MaintenanceHook(hooks.PecanHook):
def before(self, state):
if state.request.path in ALLOWED_WITHOUT_AUTH or \
state.request.method == 'GET':
return
cluster_state = db_api.get_maintenance_status()
is_complete_async_actions = (
state.request.method == 'PUT' and
'/v2/action_executions' in state.request.path and
cluster_state == maintenance_service.PAUSING
)
if is_complete_async_actions or \
cluster_state == maintenance_service.RUNNING:
return
msg = "Current Mistral state is {}. Method is not allowed".format(
cluster_state
)
pecan.abort(
status_code=423,
detail=msg,
headers={'Server-Error-Message': msg}
)

View File

@ -0,0 +1,46 @@
# Copyright 2023 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""create maintenance table
Revision ID: 042
Revises: 041
Create Date: 2023-05-29 12:45:14.041458
"""
# revision identifiers, used by Alembic.
revision = '042'
down_revision = '041'
from alembic import op
from mistral.services import maintenance
def upgrade():
connection = op.get_bind()
connection.execute(
"""CREATE TABLE IF NOT EXISTS mistral_metrics
(name VARCHAR(255) UNIQUE, value VARCHAR(255),
id INT PRIMARY KEY DEFAULT 1)"""
)
connection.execute(
"""INSERT INTO mistral_metrics (id, name, value)
VALUES (1, 'maintenance_status', %s)""",
(maintenance.RUNNING,)
)

View File

@ -460,6 +460,10 @@ def get_delayed_calls_to_start(time, batch_size=None):
return IMPL.get_delayed_calls_to_start(time, batch_size)
def get_overdue_calls(time):
return IMPL.get_overdue_calls(time)
def create_delayed_call(values):
return IMPL.create_delayed_call(values)
@ -722,3 +726,11 @@ def delete_named_lock(lock_id):
def named_lock(name):
with IMPL.named_lock(name):
yield
def get_maintenance_status():
return IMPL.get_maintenance_status()
def update_maintenance_status(status):
return IMPL.update_maintenance_status(status)

View File

@ -2128,3 +2128,30 @@ def named_lock(name):
yield
delete_named_lock(lock_id)
def get_maintenance_status():
query = b.get_engine().execute('SELECT status FROM mistral_metrics '
'WHERE name = %s', ("maintenance_status",))
rows = query.fetchall()
return rows[0][0] if rows else None
def update_maintenance_status(status):
b.get_engine().execute(
"INSERT into mistral_metrics (id, name, value)"
"VALUES (1, %s, %s)"
"ON CONFLICT (name) DO UPDATE"
"SET value = EXCLUDED.value",
("maintenance_status", status,))
@b.session_aware()
def get_overdue_calls(time, session=None):
query = b.model_query(models.DelayedCall)
query = query.filter(models.DelayedCall.execution_time < time)
query = query.filter_by(processing=True)
return query.all()

View File

@ -205,3 +205,7 @@ class KombuException(Exception):
class InvalidStateTransitionException(MistralException):
http_code = 400
message = 'Invalid state transition'
class MaintenanceException(MistralException):
http_code = 500

View File

@ -0,0 +1,289 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
from oslo_log import log as logging
from mistral import context as auth_ctx
from mistral.db.v2 import api as db_api
from mistral.engine import post_tx_queue
from mistral.engine import workflow_handler
from mistral import exceptions
from mistral.scheduler import base as sched_base
from mistral.workflow import states
PAUSING = 'PAUSING'
PAUSED = 'PAUSED'
RUNNING = 'RUNNING'
_VALID_TRANSITIONS = {
PAUSING: [PAUSED],
PAUSED: [PAUSED, RUNNING],
RUNNING: [PAUSED, RUNNING]
}
_ALL_STATES = [
PAUSING,
PAUSED,
RUNNING
]
LOG = logging.getLogger(__name__)
_PAUSE_EXECUTIONS_PATH = 'mistral.services.maintenance._pause_executions'
_PAUSE_EXECUTION_PATH = 'mistral.services.maintenance._pause_execution'
_RESUME_EXECUTIONS_PATH = 'mistral.services.maintenance._resume_executions'
_RESUME_EXECUTION_PATH = 'mistral.services.maintenance._resume_execution'
_AWAIT_PAUSE_EXECUTION_PATH = \
'mistral.services.maintenance.await_pause_executions'
def is_valid_transition(old_state, new_state):
return new_state in _VALID_TRANSITIONS.get(old_state, [])
def pause_running_executions(skip_tx=False):
execution_ids = [(ex.id, ex.project_id) for ex in
db_api.get_workflow_executions(state=states.RUNNING,
insecure=True)]
LOG.info("Number of find workflow executions is {}"
.format(len(execution_ids)))
if skip_tx:
sched = sched_base.get_system_scheduler()
for wf_ex_id, project_id in execution_ids:
job = sched_base.SchedulerJob(
func_name=_PAUSE_EXECUTION_PATH,
func_args={
'wf_ex_id': wf_ex_id,
'project_id': project_id
}
)
sched.schedule(job)
return
for wf_ex_id, project_id in execution_ids:
try:
with db_api.transaction(skip=skip_tx):
_pause_execution(wf_ex_id, project_id)
except BaseException as e:
LOG.error(str(e))
return True
def _pause_execution(wf_ex_id, project_id, skip_tx=False):
auth_ctx.set_ctx(
auth_ctx.MistralContext(
user=None,
auth_token=None,
is_admin=True
)
)
current_state = db_api.get_maintenance_status()
if current_state != PAUSING:
return False
wf_ex = db_api.get_workflow_execution(wf_ex_id)
if wf_ex.root_execution_id:
trace_uuid = wf_ex.root_execution_id
else:
trace_uuid = wf_ex.id
auth_ctx.set_ctx(
auth_ctx.MistralContext(
tenant=project_id,
trace_uuid=trace_uuid
)
)
if states.is_running(wf_ex.state):
workflow_handler.pause_workflow(wf_ex)
LOG.info('Execution {} was paused'.format(wf_ex_id))
def await_pause_executions(skip_tx=False):
auth_ctx.set_ctx(
auth_ctx.MistralContext(
user=None,
auth_token=None,
is_admin=True
)
)
if skip_tx:
current_state = db_api.get_maintenance_status()
if current_state != PAUSING:
return False
tasks = db_api.get_task_executions(
state=states.RUNNING, insecure=True
)
if not tasks:
if db_api.get_maintenance_status() == PAUSING:
db_api.update_maintenance_status(PAUSED)
return
LOG.info('The following tasks have RUNNING state: {}'.format([
task.id for task in tasks
]))
sched = sched_base.get_system_scheduler()
job = sched_base.SchedulerJob(
run_after=1,
func_name=_AWAIT_PAUSE_EXECUTION_PATH,
func_args={'skip_tx': True}
)
sched.schedule(job)
return
while True:
with db_api.transaction(skip=skip_tx):
current_state = db_api.get_maintenance_status()
if current_state != PAUSING:
return False
tasks = db_api.get_task_executions(
state=states.RUNNING, insecure=True
)
if not tasks:
return True
LOG.info('The following tasks have RUNNING state: {}'.format([
task.id for task in tasks
]))
eventlet.sleep(1)
def change_maintenance_mode(new_state):
if new_state not in _ALL_STATES:
raise exceptions.MaintenanceException(
'Not found {} maintenance state. List of states: {}'.format(
new_state, _ALL_STATES
)
)
if new_state == PAUSING:
raise exceptions.MaintenanceException(
'PAUSING is intermediate state. Consider PAUSED, RUNNING as new '
'state')
with db_api.transaction():
current_state = db_api.get_maintenance_status()
if current_state == new_state:
LOG.info('State was already changed. Skip')
return current_state
sched = sched_base.get_system_scheduler()
if new_state == PAUSED:
job = sched_base.SchedulerJob(func_name=_PAUSE_EXECUTIONS_PATH)
sched.schedule(job)
db_api.update_maintenance_status(PAUSING)
return PAUSING
elif new_state == RUNNING:
job = sched_base.SchedulerJob(func_name=_RESUME_EXECUTIONS_PATH)
sched.schedule(job)
db_api.update_maintenance_status(RUNNING)
return RUNNING
@post_tx_queue.run
def _pause_executions(skip_tx=False):
auth_ctx.set_ctx(
auth_ctx.MistralContext(
user=None,
auth_token=None,
is_admin=True
)
)
if skip_tx:
pause_running_executions(skip_tx)
await_pause_executions(skip_tx)
return
if pause_running_executions() and await_pause_executions():
with db_api.transaction():
if db_api.get_maintenance_status() == PAUSING:
db_api.update_maintenance_status(PAUSED)
@post_tx_queue.run
def _resume_executions(skip_tx=False):
auth_ctx.set_ctx(
auth_ctx.MistralContext(
user=None,
auth_token=None,
is_admin=True
)
)
sched = sched_base.get_system_scheduler()
with db_api.transaction(skip=skip_tx):
current_state = db_api.get_maintenance_status()
if current_state != RUNNING:
return
paused_executions = db_api.get_workflow_executions(
state=states.PAUSED, insecure=True
)
if not paused_executions:
return
for ex in paused_executions:
if skip_tx:
job = sched_base.SchedulerJob(
func_name=_RESUME_EXECUTION_PATH,
func_args={
'wf_ex_id': ex.id
}
)
sched.schedule(job)
else:
_resume_execution(wf_ex_id=ex.id)
def _resume_execution(wf_ex_id, skip_tx=False):
wf_ex = db_api.get_workflow_execution(wf_ex_id)
if wf_ex.root_execution_id:
trace_uuid = wf_ex.root_execution_id
else:
trace_uuid = wf_ex.id
auth_ctx.set_ctx(
auth_ctx.MistralContext(
tenant=wf_ex.project_id,
trace_uuid=trace_uuid
)
)
workflow_handler.resume_workflow(wf_ex)
LOG.info('The following execution was resumed: {}'.format([
wf_ex.id
]))