Simplify workflow and join completion logic
* action_queue module is replaced with the more generic post_tx_queue module that allows to register operations that must run after the main DB transaction associated with processing a workflow event such as completing action. * Instead of calling workflow completion check from all places where task may possibly complete, Mistral now registers a post transactional operation that runs after the main DB transaction (to make sure at least one needed consistent DB read) right inside the task completion logic. It reduces clutter significantly. * Workflow completion check is now registered only if the just completed task may lead to workflow completion, i.e. if it's the last one in a workflow branch. * Join now checks delayed calls to reduce a number of join completion checks created with scheduler and also uses post transactional queue for that. Closes-Bug: #1801872 Change-Id: I90741d4121c48c42606dfa850cfe824557b095d0
This commit is contained in:
parent
3d7acd3957
commit
80a1bed67b
@ -382,6 +382,10 @@ def get_delayed_calls(**kwargs):
|
||||
return IMPL.get_delayed_calls(**kwargs)
|
||||
|
||||
|
||||
def get_delayed_calls_count(**kwargs):
|
||||
return IMPL.get_delayed_calls_count(**kwargs)
|
||||
|
||||
|
||||
def delete_delayed_calls(**kwargs):
|
||||
return IMPL.delete_delayed_calls(**kwargs)
|
||||
|
||||
|
@ -284,6 +284,14 @@ def _get_collection(model, insecure=False, limit=None, marker=None,
|
||||
return query.all()
|
||||
|
||||
|
||||
def _get_count(model, insecure=False, **filters):
|
||||
query = b.model_query(model) if insecure else _secure_query(model)
|
||||
|
||||
query = db_filters.apply_filters(query, model, **filters)
|
||||
|
||||
return query.count()
|
||||
|
||||
|
||||
def _get_db_object_by_name(model, name, columns=()):
|
||||
query = _secure_query(model, *columns)
|
||||
|
||||
@ -1134,6 +1142,11 @@ def get_delayed_calls(session=None, **kwargs):
|
||||
return _get_collection(model=models.DelayedCall, **kwargs)
|
||||
|
||||
|
||||
@b.session_aware()
|
||||
def get_delayed_calls_count(session=None, **kwargs):
|
||||
return _get_count(model=models.DelayedCall, **kwargs)
|
||||
|
||||
|
||||
@b.session_aware()
|
||||
def delete_delayed_calls(session=None, **kwargs):
|
||||
return _delete_all(models.DelayedCall, **kwargs)
|
||||
|
@ -1,133 +0,0 @@
|
||||
# Copyright 2016 - Nokia Networks.
|
||||
# Copyright 2016 - Brocade Communications Systems, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import eventlet
|
||||
import functools
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from mistral import context
|
||||
from mistral.executors import base as exe
|
||||
from mistral.rpc import clients as rpc
|
||||
from mistral import utils
|
||||
|
||||
|
||||
_THREAD_LOCAL_NAME = "__action_queue_thread_local"
|
||||
|
||||
# Action queue operations.
|
||||
_RUN_ACTION = "run_action"
|
||||
_ON_ACTION_COMPLETE = "on_action_complete"
|
||||
|
||||
|
||||
def _prepare():
|
||||
utils.set_thread_local(_THREAD_LOCAL_NAME, list())
|
||||
|
||||
|
||||
def _clear():
|
||||
utils.set_thread_local(_THREAD_LOCAL_NAME, None)
|
||||
|
||||
|
||||
def _get_queue():
|
||||
queue = utils.get_thread_local(_THREAD_LOCAL_NAME)
|
||||
|
||||
if queue is None:
|
||||
raise RuntimeError(
|
||||
'Action queue is not initialized for the current thread.'
|
||||
' Most likely some transactional method is not decorated'
|
||||
' with action_queue.process()'
|
||||
)
|
||||
|
||||
return queue
|
||||
|
||||
|
||||
def _process_queue(queue):
|
||||
executor = exe.get_executor(cfg.CONF.executor.type)
|
||||
|
||||
for operation, args in queue:
|
||||
if operation == _RUN_ACTION:
|
||||
action_ex, action_def, target, execution_context, timeout = args
|
||||
|
||||
executor.run_action(
|
||||
action_ex.id,
|
||||
action_def.action_class,
|
||||
action_def.attributes or {},
|
||||
action_ex.input,
|
||||
action_ex.runtime_context.get('safe_rerun', False),
|
||||
execution_context,
|
||||
target=target,
|
||||
timeout=timeout
|
||||
)
|
||||
elif operation == _ON_ACTION_COMPLETE:
|
||||
action_ex_id, result, wf_action = args
|
||||
|
||||
rpc.get_engine_client().on_action_complete(
|
||||
action_ex_id,
|
||||
result,
|
||||
wf_action
|
||||
)
|
||||
|
||||
|
||||
def process(func):
|
||||
"""Decorator that processes (runs) all actions in the action queue.
|
||||
|
||||
Various engine methods may cause new actions to be scheduled. All
|
||||
such methods must be decorated with this decorator. It makes sure
|
||||
to run all the actions in the queue and clean up the queue.
|
||||
"""
|
||||
@functools.wraps(func)
|
||||
def decorate(*args, **kw):
|
||||
_prepare()
|
||||
|
||||
try:
|
||||
res = func(*args, **kw)
|
||||
|
||||
queue = _get_queue()
|
||||
auth_ctx = context.ctx() if context.has_ctx() else None
|
||||
|
||||
# NOTE(rakhmerov): Since we make RPC calls to the engine itself
|
||||
# we need to process the action queue asynchronously in a new
|
||||
# thread. Otherwise, if we have one engine process the engine
|
||||
# may send a request to itself while already processing
|
||||
# another one. In conjunction with blocking RPC it will lead
|
||||
# to a deadlock (and RPC timeout).
|
||||
def _within_new_thread():
|
||||
old_auth_ctx = context.ctx() if context.has_ctx() else None
|
||||
|
||||
context.set_ctx(auth_ctx)
|
||||
|
||||
try:
|
||||
_process_queue(queue)
|
||||
finally:
|
||||
context.set_ctx(old_auth_ctx)
|
||||
|
||||
eventlet.spawn(_within_new_thread)
|
||||
finally:
|
||||
_clear()
|
||||
|
||||
return res
|
||||
|
||||
return decorate
|
||||
|
||||
|
||||
def schedule_run_action(action_ex, action_def, target, execution_context,
|
||||
timeout):
|
||||
args = (action_ex, action_def, target, execution_context, timeout)
|
||||
_get_queue().append((_RUN_ACTION, args))
|
||||
|
||||
|
||||
def schedule_on_action_complete(action_ex_id, result, wf_action=False):
|
||||
_get_queue().append(
|
||||
(_ON_ACTION_COMPLETE, (action_ex_id, result, wf_action))
|
||||
)
|
@ -21,7 +21,7 @@ from osprofiler import profiler
|
||||
import six
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import action_queue
|
||||
from mistral.engine import post_tx_queue
|
||||
from mistral.engine import utils as engine_utils
|
||||
from mistral.engine import workflow_handler as wf_handler
|
||||
from mistral import exceptions as exc
|
||||
@ -254,13 +254,23 @@ class PythonAction(Action):
|
||||
|
||||
execution_context = self._prepare_execution_context()
|
||||
|
||||
action_queue.schedule_run_action(
|
||||
self.action_ex,
|
||||
self.action_def,
|
||||
target,
|
||||
execution_context,
|
||||
timeout=timeout
|
||||
)
|
||||
# Register an asynchronous command to send the action to
|
||||
# run on an executor outside of the main DB transaction.
|
||||
def _run_action():
|
||||
executor = exe.get_executor(cfg.CONF.executor.type)
|
||||
|
||||
executor.run_action(
|
||||
self.action_ex.id,
|
||||
self.action_def.action_class,
|
||||
self.action_def.attributes or {},
|
||||
self.action_ex.input,
|
||||
self.action_ex.runtime_context.get('safe_rerun', False),
|
||||
execution_context,
|
||||
target=target,
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
post_tx_queue.register_operation(_run_action)
|
||||
|
||||
@profiler.trace('action-run', hide_args=True)
|
||||
def run(self, input_dict, target, index=0, desc='', save=True,
|
||||
|
@ -23,8 +23,8 @@ from mistral.db import utils as db_utils
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models as db_models
|
||||
from mistral.engine import action_handler
|
||||
from mistral.engine import action_queue
|
||||
from mistral.engine import base
|
||||
from mistral.engine import post_tx_queue
|
||||
from mistral.engine import workflow_handler as wf_handler
|
||||
from mistral import exceptions
|
||||
from mistral import utils as u
|
||||
@ -40,7 +40,7 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
class DefaultEngine(base.Engine):
|
||||
@db_utils.retry_on_db_error
|
||||
@action_queue.process
|
||||
@post_tx_queue.run
|
||||
@profiler.trace('engine-start-workflow', hide_args=True)
|
||||
def start_workflow(self, wf_identifier, wf_namespace='', wf_ex_id=None,
|
||||
wf_input=None, description='', **params):
|
||||
@ -79,7 +79,7 @@ class DefaultEngine(base.Engine):
|
||||
return wf_ex.get_clone()
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
@action_queue.process
|
||||
@post_tx_queue.run
|
||||
def start_action(self, action_name, action_input,
|
||||
description=None, **params):
|
||||
with db_api.transaction():
|
||||
@ -134,7 +134,7 @@ class DefaultEngine(base.Engine):
|
||||
return db_api.create_action_execution(values)
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
@action_queue.process
|
||||
@post_tx_queue.run
|
||||
@profiler.trace('engine-on-action-complete', hide_args=True)
|
||||
def on_action_complete(self, action_ex_id, result, wf_action=False,
|
||||
async_=False):
|
||||
@ -146,26 +146,10 @@ class DefaultEngine(base.Engine):
|
||||
|
||||
action_handler.on_action_complete(action_ex, result)
|
||||
|
||||
result = action_ex.get_clone()
|
||||
|
||||
# Need to see if checking workflow completion makes sense.
|
||||
wf_ex_id = None
|
||||
|
||||
if (action_ex.task_execution_id
|
||||
and states.is_completed(action_ex.task_execution.state)):
|
||||
wf_ex_id = action_ex.task_execution.workflow_execution_id
|
||||
|
||||
# Note: We must do this check in a new transaction to make sure
|
||||
# that at least one of the parallel transactions will do a consistent
|
||||
# read from the DB.
|
||||
if wf_ex_id:
|
||||
with db_api.transaction():
|
||||
wf_handler.check_and_complete(wf_ex_id)
|
||||
|
||||
return result
|
||||
return action_ex.get_clone()
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
@action_queue.process
|
||||
@post_tx_queue.run
|
||||
@profiler.trace('engine-on-action-update', hide_args=True)
|
||||
def on_action_update(self, action_ex_id, state, wf_action=False,
|
||||
async_=False):
|
||||
@ -177,25 +161,10 @@ class DefaultEngine(base.Engine):
|
||||
|
||||
action_handler.on_action_update(action_ex, state)
|
||||
|
||||
result = action_ex.get_clone()
|
||||
|
||||
wf_ex_id = None
|
||||
|
||||
if (action_ex.task_execution_id
|
||||
and states.is_completed(action_ex.task_execution.state)):
|
||||
wf_ex_id = action_ex.task_execution.workflow_execution_id
|
||||
|
||||
# Note: We must do this check in a new transaction to make sure
|
||||
# that at least one of the parallel transactions will do a consistent
|
||||
# read from the DB.
|
||||
if wf_ex_id:
|
||||
with db_api.transaction():
|
||||
wf_handler.check_and_complete(wf_ex_id)
|
||||
|
||||
return result
|
||||
return action_ex.get_clone()
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
@action_queue.process
|
||||
@post_tx_queue.run
|
||||
def pause_workflow(self, wf_ex_id):
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex_id)
|
||||
@ -205,7 +174,7 @@ class DefaultEngine(base.Engine):
|
||||
return wf_ex.get_clone()
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
@action_queue.process
|
||||
@post_tx_queue.run
|
||||
def rerun_workflow(self, task_ex_id, reset=True, env=None):
|
||||
with db_api.transaction():
|
||||
task_ex = db_api.get_task_execution(task_ex_id)
|
||||
@ -217,7 +186,7 @@ class DefaultEngine(base.Engine):
|
||||
return wf_ex.get_clone()
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
@action_queue.process
|
||||
@post_tx_queue.run
|
||||
def resume_workflow(self, wf_ex_id, env=None):
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex_id)
|
||||
@ -227,7 +196,7 @@ class DefaultEngine(base.Engine):
|
||||
return wf_ex.get_clone()
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
@action_queue.process
|
||||
@post_tx_queue.run
|
||||
def stop_workflow(self, wf_ex_id, state, message=None):
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex_id)
|
||||
@ -241,10 +210,11 @@ class DefaultEngine(base.Engine):
|
||||
raise NotImplementedError
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
@action_queue.process
|
||||
@post_tx_queue.run
|
||||
def report_running_actions(self, action_ex_ids):
|
||||
with db_api.transaction():
|
||||
now = u.utc_now_sec()
|
||||
|
||||
for exec_id in action_ex_ids:
|
||||
try:
|
||||
db_api.update_action_execution(
|
||||
|
@ -15,8 +15,8 @@
|
||||
|
||||
from mistral.db import utils as db_utils
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import action_queue
|
||||
from mistral.engine import base
|
||||
from mistral.engine import post_tx_queue
|
||||
from mistral.engine import workflow_handler as wf_handler
|
||||
from mistral import expressions
|
||||
from mistral.services import scheduler
|
||||
@ -511,7 +511,7 @@ class ConcurrencyPolicy(base.TaskPolicy):
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
@action_queue.process
|
||||
@post_tx_queue.run
|
||||
def _continue_task(task_ex_id):
|
||||
from mistral.engine import task_handler
|
||||
|
||||
@ -520,38 +520,25 @@ def _continue_task(task_ex_id):
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
@action_queue.process
|
||||
@post_tx_queue.run
|
||||
def _complete_task(task_ex_id, state, state_info):
|
||||
from mistral.engine import task_handler
|
||||
|
||||
with db_api.transaction():
|
||||
task_ex = db_api.get_task_execution(task_ex_id)
|
||||
|
||||
wf_ex_id = task_ex.workflow_execution_id
|
||||
|
||||
task_handler.complete_task(task_ex, state, state_info)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_handler.check_and_complete(wf_ex_id)
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
@action_queue.process
|
||||
@post_tx_queue.run
|
||||
def _fail_task_if_incomplete(task_ex_id, timeout):
|
||||
from mistral.engine import task_handler
|
||||
|
||||
with db_api.transaction():
|
||||
task_ex = db_api.get_task_execution(task_ex_id)
|
||||
|
||||
wf_ex_id = None
|
||||
|
||||
if not states.is_completed(task_ex.state):
|
||||
msg = 'Task timed out [timeout(s)=%s].' % timeout
|
||||
|
||||
wf_ex_id = task_ex.workflow_execution_id
|
||||
|
||||
task_handler.complete_task(task_ex, states.ERROR, msg)
|
||||
|
||||
if wf_ex_id:
|
||||
with db_api.transaction():
|
||||
wf_handler.check_and_complete(wf_ex_id)
|
||||
|
131
mistral/engine/post_tx_queue.py
Normal file
131
mistral/engine/post_tx_queue.py
Normal file
@ -0,0 +1,131 @@
|
||||
# Copyright 2015 - Mirantis, Inc.
|
||||
# Copyright 2016 - Brocade Communications Systems, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import eventlet
|
||||
import functools
|
||||
from oslo_log import log as logging
|
||||
|
||||
from mistral import context
|
||||
from mistral.db import utils as db_utils
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral import utils
|
||||
|
||||
|
||||
"""
|
||||
This module contains a mini framework for scheduling operations while
|
||||
performing transactional processing of a workflow event such as
|
||||
completing a workflow action. The scheduled operations will run after
|
||||
the main DB transaction, in a new transaction, if needed.
|
||||
"""
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
_THREAD_LOCAL_NAME = "__operation_queue_thread_local"
|
||||
|
||||
|
||||
def _prepare():
|
||||
# Register two queues: transactional and non transactional operations.
|
||||
utils.set_thread_local(_THREAD_LOCAL_NAME, (list(), list()))
|
||||
|
||||
|
||||
def _clear():
|
||||
utils.set_thread_local(_THREAD_LOCAL_NAME, None)
|
||||
|
||||
|
||||
def register_operation(func, args=None, in_tx=False):
|
||||
"""Register an operation."""
|
||||
|
||||
_get_queues()[0 if in_tx else 1].append((func, args or []))
|
||||
|
||||
|
||||
def _get_queues():
|
||||
queues = utils.get_thread_local(_THREAD_LOCAL_NAME)
|
||||
|
||||
if queues is None:
|
||||
raise RuntimeError(
|
||||
'Operation queue is not initialized for the current thread.'
|
||||
' Most likely some engine method is not decorated with'
|
||||
' operation_queue.run()'
|
||||
)
|
||||
|
||||
return queues
|
||||
|
||||
|
||||
def run(func):
|
||||
"""Decorator that runs all operations registered in the operation queue.
|
||||
|
||||
Various engine methods may register such operations. All such methods must
|
||||
be decorated with this decorator.
|
||||
"""
|
||||
@functools.wraps(func)
|
||||
def decorate(*args, **kw):
|
||||
_prepare()
|
||||
|
||||
try:
|
||||
res = func(*args, **kw)
|
||||
|
||||
queues = _get_queues()
|
||||
|
||||
tx_queue = queues[0]
|
||||
non_tx_queue = queues[1]
|
||||
|
||||
if not tx_queue and not non_tx_queue:
|
||||
return res
|
||||
|
||||
auth_ctx = context.ctx() if context.has_ctx() else None
|
||||
|
||||
def _within_new_thread():
|
||||
old_auth_ctx = context.ctx() if context.has_ctx() else None
|
||||
|
||||
context.set_ctx(auth_ctx)
|
||||
|
||||
try:
|
||||
if tx_queue:
|
||||
_process_tx_queue(tx_queue)
|
||||
|
||||
if non_tx_queue:
|
||||
_process_non_tx_queue(non_tx_queue)
|
||||
finally:
|
||||
context.set_ctx(old_auth_ctx)
|
||||
|
||||
eventlet.spawn(_within_new_thread)
|
||||
finally:
|
||||
_clear()
|
||||
|
||||
return res
|
||||
|
||||
return decorate
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
@run
|
||||
def _process_tx_queue(queue):
|
||||
with db_api.transaction():
|
||||
for func, args in queue:
|
||||
try:
|
||||
func(*args)
|
||||
except Exception:
|
||||
LOG.exception("Failed to run transactional engine operation.")
|
||||
|
||||
raise
|
||||
|
||||
|
||||
def _process_non_tx_queue(queue):
|
||||
for func, args in queue:
|
||||
try:
|
||||
func(*args)
|
||||
except Exception:
|
||||
LOG.exception("Failed to run non-transactional engine operation.")
|
@ -22,7 +22,7 @@ import traceback as tb
|
||||
from mistral.db import utils as db_utils
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models
|
||||
from mistral.engine import action_queue
|
||||
from mistral.engine import post_tx_queue
|
||||
from mistral.engine import tasks
|
||||
from mistral.engine import workflow_handler as wf_handler
|
||||
from mistral import exceptions as exc
|
||||
@ -274,6 +274,7 @@ def complete_task(task_ex, state, state_info):
|
||||
_check_affected_tasks(task)
|
||||
|
||||
|
||||
@profiler.trace('task-handler-check-affected-tasks', hide_args=True)
|
||||
def _check_affected_tasks(task):
|
||||
if not task.is_completed():
|
||||
return
|
||||
@ -295,8 +296,28 @@ def _check_affected_tasks(task):
|
||||
task_ex.name
|
||||
)
|
||||
|
||||
def _schedule_if_needed(t_ex_id):
|
||||
# NOTE(rakhmerov): we need to minimize the number of delayed calls
|
||||
# that refresh state of "join" tasks. We'll check if corresponding
|
||||
# calls are already scheduled. Note that we must ignore delayed calls
|
||||
# that are currently being processed because of a possible race with
|
||||
# the transaction that deletes delayed calls, i.e. the call may still
|
||||
# exist in DB (the deleting transaction didn't commit yet) but it has
|
||||
# already been processed and the task state hasn't changed.
|
||||
cnt = db_api.get_delayed_calls_count(
|
||||
key=_get_refresh_state_job_key(t_ex_id),
|
||||
processing=False
|
||||
)
|
||||
|
||||
if cnt == 0:
|
||||
_schedule_refresh_task_state(t_ex_id)
|
||||
|
||||
for t_ex in affected_task_execs:
|
||||
_schedule_refresh_task_state(t_ex)
|
||||
post_tx_queue.register_operation(
|
||||
_schedule_if_needed,
|
||||
args=[t_ex.id],
|
||||
in_tx=True
|
||||
)
|
||||
|
||||
|
||||
def _build_task_from_execution(wf_spec, task_ex):
|
||||
@ -364,7 +385,7 @@ def _create_task(wf_ex, wf_spec, task_spec, ctx, task_ex=None,
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
@action_queue.process
|
||||
@post_tx_queue.run
|
||||
@profiler.trace('task-handler-refresh-task-state', hide_args=True)
|
||||
def _refresh_task_state(task_ex_id):
|
||||
with db_api.transaction():
|
||||
@ -373,6 +394,10 @@ def _refresh_task_state(task_ex_id):
|
||||
if not task_ex:
|
||||
return
|
||||
|
||||
if (states.is_completed(task_ex.state)
|
||||
or task_ex.state == states.RUNNING):
|
||||
return
|
||||
|
||||
wf_ex = task_ex.workflow_execution
|
||||
|
||||
if states.is_completed(wf_ex.state):
|
||||
@ -384,46 +409,35 @@ def _refresh_task_state(task_ex_id):
|
||||
|
||||
wf_ctrl = wf_base.get_controller(wf_ex, wf_spec)
|
||||
|
||||
with db_api.named_lock(task_ex.id):
|
||||
db_api.refresh(task_ex)
|
||||
log_state = wf_ctrl.get_logical_task_state(task_ex)
|
||||
|
||||
if (states.is_completed(task_ex.state)
|
||||
or task_ex.state == states.RUNNING):
|
||||
return
|
||||
state = log_state.state
|
||||
state_info = log_state.state_info
|
||||
|
||||
log_state = wf_ctrl.get_logical_task_state(task_ex)
|
||||
# Update 'triggered_by' because it could have changed.
|
||||
task_ex.runtime_context['triggered_by'] = log_state.triggered_by
|
||||
|
||||
state = log_state.state
|
||||
state_info = log_state.state_info
|
||||
|
||||
# Update 'triggered_by' because it could have changed.
|
||||
task_ex.runtime_context['triggered_by'] = log_state.triggered_by
|
||||
|
||||
if state == states.RUNNING:
|
||||
continue_task(task_ex)
|
||||
elif state == states.ERROR:
|
||||
complete_task(task_ex, state, state_info)
|
||||
elif state == states.WAITING:
|
||||
LOG.info(
|
||||
"Task execution is still in WAITING state"
|
||||
" [task_ex_id=%s, task_name=%s]",
|
||||
task_ex_id,
|
||||
task_ex.name
|
||||
)
|
||||
else:
|
||||
# Must never get here.
|
||||
raise RuntimeError(
|
||||
'Unexpected logical task state [task_ex_id=%s, '
|
||||
'task_name=%s, state=%s]' %
|
||||
(task_ex_id, task_ex.name, state)
|
||||
)
|
||||
|
||||
if states.is_completed(task_ex.state):
|
||||
with db_api.transaction():
|
||||
wf_handler.check_and_complete(wf_ex.id)
|
||||
if state == states.RUNNING:
|
||||
continue_task(task_ex)
|
||||
elif state == states.ERROR:
|
||||
complete_task(task_ex, state, state_info)
|
||||
elif state == states.WAITING:
|
||||
LOG.info(
|
||||
"Task execution is still in WAITING state"
|
||||
" [task_ex_id=%s, task_name=%s]",
|
||||
task_ex_id,
|
||||
task_ex.name
|
||||
)
|
||||
else:
|
||||
# Must never get here.
|
||||
raise RuntimeError(
|
||||
'Unexpected logical task state [task_ex_id=%s, '
|
||||
'task_name=%s, state=%s]' %
|
||||
(task_ex_id, task_ex.name, state)
|
||||
)
|
||||
|
||||
|
||||
def _schedule_refresh_task_state(task_ex, delay=0):
|
||||
def _schedule_refresh_task_state(task_ex_id, delay=0):
|
||||
"""Schedules task preconditions check.
|
||||
|
||||
This method provides transactional decoupling of task preconditions
|
||||
@ -436,17 +450,17 @@ def _schedule_refresh_task_state(task_ex, delay=0):
|
||||
we'll have in this case (time between transactions) whereas scheduler
|
||||
is a special component that is designed to be resistant to failures.
|
||||
|
||||
:param task_ex: Task execution.
|
||||
:param task_ex_id: Task execution ID.
|
||||
:param delay: Delay.
|
||||
"""
|
||||
key = _get_refresh_state_job_key(task_ex.id)
|
||||
key = _get_refresh_state_job_key(task_ex_id)
|
||||
|
||||
scheduler.schedule_call(
|
||||
None,
|
||||
_REFRESH_TASK_STATE_PATH,
|
||||
delay,
|
||||
key=key,
|
||||
task_ex_id=task_ex.id
|
||||
task_ex_id=task_ex_id
|
||||
)
|
||||
|
||||
|
||||
@ -455,7 +469,7 @@ def _get_refresh_state_job_key(task_ex_id):
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
@action_queue.process
|
||||
@post_tx_queue.run
|
||||
def _scheduled_on_action_complete(action_ex_id, wf_action):
|
||||
with db_api.transaction():
|
||||
if wf_action:
|
||||
@ -465,15 +479,6 @@ def _scheduled_on_action_complete(action_ex_id, wf_action):
|
||||
|
||||
_on_action_complete(action_ex)
|
||||
|
||||
wf_ex_id = None
|
||||
|
||||
if states.is_completed(action_ex.task_execution.state):
|
||||
wf_ex_id = action_ex.task_execution.workflow_execution_id
|
||||
|
||||
if wf_ex_id:
|
||||
with db_api.transaction():
|
||||
wf_handler.check_and_complete(wf_ex_id)
|
||||
|
||||
|
||||
def schedule_on_action_complete(action_ex, delay=0):
|
||||
"""Schedules task completion check.
|
||||
@ -510,7 +515,7 @@ def schedule_on_action_complete(action_ex, delay=0):
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
@action_queue.process
|
||||
@post_tx_queue.run
|
||||
def _scheduled_on_action_update(action_ex_id, wf_action):
|
||||
with db_api.transaction():
|
||||
if wf_action:
|
||||
@ -520,15 +525,6 @@ def _scheduled_on_action_update(action_ex_id, wf_action):
|
||||
|
||||
_on_action_update(action_ex)
|
||||
|
||||
wf_ex_id = None
|
||||
|
||||
if states.is_completed(action_ex.task_execution.state):
|
||||
wf_ex_id = action_ex.task_execution.workflow_execution_id
|
||||
|
||||
if wf_ex_id:
|
||||
with db_api.transaction():
|
||||
wf_handler.check_and_complete(wf_ex_id)
|
||||
|
||||
|
||||
def schedule_on_action_update(action_ex, delay=0):
|
||||
"""Schedules task update check.
|
||||
|
@ -26,6 +26,8 @@ from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import actions
|
||||
from mistral.engine import dispatcher
|
||||
from mistral.engine import policies
|
||||
from mistral.engine import post_tx_queue
|
||||
from mistral.engine import workflow_handler as wf_handler
|
||||
from mistral import exceptions as exc
|
||||
from mistral import expressions as expr
|
||||
from mistral.notifiers import base as notif
|
||||
@ -118,6 +120,23 @@ class Task(object):
|
||||
|
||||
This method puts task to a waiting state.
|
||||
"""
|
||||
|
||||
# NOTE(rakhmerov): using named locks may cause problems under load
|
||||
# with MySQL that raises a lot of deadlocks in case of high
|
||||
# parallelism so it makes sense to do a fast check if the object
|
||||
# already exists in DB outside of the lock.
|
||||
if not self.task_ex:
|
||||
t_execs = db_api.get_task_executions(
|
||||
workflow_execution_id=self.wf_ex.id,
|
||||
unique_key=self.unique_key,
|
||||
state=states.WAITING
|
||||
)
|
||||
|
||||
self.task_ex = t_execs[0] if t_execs else None
|
||||
|
||||
if self.task_ex:
|
||||
return
|
||||
|
||||
with db_api.named_lock(self.unique_key):
|
||||
if not self.task_ex:
|
||||
t_execs = db_api.get_task_executions(
|
||||
@ -249,11 +268,25 @@ class Task(object):
|
||||
# upon its completion.
|
||||
self.task_ex.processed = True
|
||||
|
||||
self.register_workflow_completion_check()
|
||||
|
||||
# Publish task event.
|
||||
self.notify(old_task_state, self.task_ex.state)
|
||||
|
||||
dispatcher.dispatch_workflow_commands(self.wf_ex, cmds)
|
||||
|
||||
def register_workflow_completion_check(self):
|
||||
wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)
|
||||
|
||||
# Register an asynchronous command to check workflow completion
|
||||
# in a separate transaction if the task may potentially lead to
|
||||
# workflow completion.
|
||||
def _check():
|
||||
wf_handler.check_and_complete(self.wf_ex.id)
|
||||
|
||||
if wf_ctrl.may_complete_workflow(self.task_ex):
|
||||
post_tx_queue.register_operation(_check, in_tx=True)
|
||||
|
||||
@profiler.trace('task-update')
|
||||
def update(self, state, state_info=None):
|
||||
"""Update task and set specified state.
|
||||
@ -290,6 +323,9 @@ class Task(object):
|
||||
|
||||
self.set_state(state, state_info)
|
||||
|
||||
if states.is_completed(self.task_ex.state):
|
||||
self.register_workflow_completion_check()
|
||||
|
||||
# Publish event.
|
||||
self.notify(old_task_state, self.task_ex.state)
|
||||
|
||||
|
@ -20,6 +20,7 @@ from osprofiler import profiler
|
||||
import traceback as tb
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import post_tx_queue
|
||||
from mistral.engine import workflows
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import scheduler
|
||||
@ -108,6 +109,7 @@ def check_and_complete(wf_ex_id):
|
||||
force_fail_workflow(wf.wf_ex, msg)
|
||||
|
||||
|
||||
@post_tx_queue.run
|
||||
@profiler.trace('workflow-handler-check-and-fix-integrity')
|
||||
def _check_and_fix_integrity(wf_ex_id):
|
||||
check_after_seconds = CONF.engine.execution_integrity_check_delay
|
||||
@ -125,15 +127,13 @@ def _check_and_fix_integrity(wf_ex_id):
|
||||
if states.is_completed(wf_ex.state):
|
||||
return
|
||||
|
||||
_schedule_check_and_fix_integrity(wf_ex, delay=60)
|
||||
_schedule_check_and_fix_integrity(wf_ex, delay=120)
|
||||
|
||||
running_task_execs = db_api.get_task_executions(
|
||||
workflow_execution_id=wf_ex.id,
|
||||
state=states.RUNNING
|
||||
)
|
||||
|
||||
any_completed = False
|
||||
|
||||
for t_ex in running_task_execs:
|
||||
# The idea is that we take the latest known timestamp of the task
|
||||
# execution and consider it eligible for checking and fixing only
|
||||
@ -182,13 +182,6 @@ def _check_and_fix_integrity(wf_ex_id):
|
||||
child_executions[-1]
|
||||
)
|
||||
|
||||
if states.is_completed(t_ex.state):
|
||||
any_completed = True
|
||||
|
||||
if any_completed:
|
||||
with db_api.transaction():
|
||||
check_and_complete(wf_ex_id)
|
||||
|
||||
|
||||
def pause_workflow(wf_ex, msg=None):
|
||||
# Pause subworkflows first.
|
||||
@ -272,6 +265,11 @@ def _schedule_check_and_fix_integrity(wf_ex, delay=0):
|
||||
:param wf_ex: Workflow execution.
|
||||
:param delay: Minimum amount of time before the check should be made.
|
||||
"""
|
||||
|
||||
if CONF.engine.execution_integrity_check_delay < 0:
|
||||
# Never check integrity if it's a negative value.
|
||||
return
|
||||
|
||||
key = _get_integrity_check_key(wf_ex)
|
||||
|
||||
scheduler.schedule_call(
|
||||
|
@ -23,14 +23,15 @@ import six
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models as db_models
|
||||
from mistral.engine import action_queue
|
||||
from mistral.engine import dispatcher
|
||||
from mistral.engine import post_tx_queue
|
||||
from mistral.engine import utils as engine_utils
|
||||
from mistral import exceptions as exc
|
||||
from mistral import expressions as expr
|
||||
from mistral.lang import parser as spec_parser
|
||||
from mistral.notifiers import base as notif
|
||||
from mistral.notifiers import notification_events as events
|
||||
from mistral.rpc import clients as rpc
|
||||
from mistral.services import triggers
|
||||
from mistral.services import workflows as wf_service
|
||||
from mistral import utils
|
||||
@ -374,7 +375,7 @@ class Workflow(object):
|
||||
|
||||
if wf_ex is None:
|
||||
# Do nothing because the state was updated previously.
|
||||
return
|
||||
return False
|
||||
|
||||
self.wf_ex = wf_ex
|
||||
self.wf_ex.state_info = json.dumps(state_info) \
|
||||
@ -406,6 +407,8 @@ class Workflow(object):
|
||||
|
||||
triggers.on_workflow_complete(self.wf_ex)
|
||||
|
||||
return True
|
||||
|
||||
@profiler.trace('workflow-check-and-complete')
|
||||
def check_and_complete(self):
|
||||
"""Completes the workflow if it needs to be completed.
|
||||
@ -457,14 +460,17 @@ class Workflow(object):
|
||||
return 0
|
||||
|
||||
def _succeed_workflow(self, final_context, msg=None):
|
||||
self.wf_ex.output = data_flow.evaluate_workflow_output(
|
||||
output = data_flow.evaluate_workflow_output(
|
||||
self.wf_ex,
|
||||
self.wf_spec.get_output(),
|
||||
final_context
|
||||
)
|
||||
|
||||
# Set workflow execution to success until after output is evaluated.
|
||||
self.set_state(states.SUCCESS, msg)
|
||||
# Set workflow execution to success after output is evaluated.
|
||||
if not self.set_state(states.SUCCESS, msg):
|
||||
return
|
||||
|
||||
self.wf_ex.output = output
|
||||
|
||||
# Publish event.
|
||||
self.notify(events.WORKFLOW_SUCCEEDED)
|
||||
@ -492,7 +498,8 @@ class Workflow(object):
|
||||
)
|
||||
LOG.error(msg)
|
||||
|
||||
self.set_state(states.ERROR, state_info=msg)
|
||||
if not self.set_state(states.ERROR, state_info=msg):
|
||||
return
|
||||
|
||||
# When we set an ERROR state we should safely set output value getting
|
||||
# w/o exceptions due to field size limitations.
|
||||
@ -524,7 +531,8 @@ class Workflow(object):
|
||||
if states.is_completed(self.wf_ex.state):
|
||||
return
|
||||
|
||||
self.set_state(states.CANCELLED, state_info=msg)
|
||||
if not self.set_state(states.CANCELLED, state_info=msg):
|
||||
return
|
||||
|
||||
# When we set an ERROR state we should safely set output value getting
|
||||
# w/o exceptions due to field size limitations.
|
||||
@ -564,11 +572,16 @@ class Workflow(object):
|
||||
" if a workflow is not in SUCCESS, ERROR or CANCELLED state."
|
||||
)
|
||||
|
||||
action_queue.schedule_on_action_complete(
|
||||
self.wf_ex.id,
|
||||
result,
|
||||
wf_action=True
|
||||
)
|
||||
# Register a command executed in a separate thread to send the result
|
||||
# to the parent workflow outside of the main DB transaction.
|
||||
def _send_result():
|
||||
rpc.get_engine_client().on_action_complete(
|
||||
self.wf_ex.id,
|
||||
result,
|
||||
wf_action=True
|
||||
)
|
||||
|
||||
post_tx_queue.register_operation(_send_result)
|
||||
|
||||
|
||||
def _get_environment(params):
|
||||
|
@ -17,7 +17,7 @@ import datetime
|
||||
from mistral.db import utils as db_utils
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import action_handler
|
||||
from mistral.engine import action_queue
|
||||
from mistral.engine import post_tx_queue
|
||||
from mistral.services import scheduler
|
||||
from mistral import utils
|
||||
from mistral_lib import actions as mistral_lib
|
||||
@ -30,7 +30,7 @@ SCHEDULER_KEY = 'handle_expired_actions_key'
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
@action_queue.process
|
||||
@post_tx_queue.run
|
||||
def handle_expired_actions():
|
||||
LOG.debug("Running heartbeat checker...")
|
||||
|
||||
|
@ -532,6 +532,11 @@ class DefaultEngineTest(base.DbTestCase):
|
||||
ml_actions.Result(data='Hi')
|
||||
)
|
||||
|
||||
self._await(
|
||||
lambda:
|
||||
db_api.get_workflow_execution(wf_ex.id).state == states.SUCCESS
|
||||
)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
|
@ -229,6 +229,11 @@ class WorkflowController(object):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def may_complete_workflow(self, task_ex):
|
||||
"""Determines if the task execution may lead to workflow completion."""
|
||||
|
||||
return states.is_completed(task_ex.state)
|
||||
|
||||
@abc.abstractmethod
|
||||
def _find_next_commands(self, task_ex):
|
||||
"""Finds commands that should run next.
|
||||
|
@ -236,6 +236,13 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
filter(is_end_task, batch)
|
||||
)
|
||||
|
||||
def may_complete_workflow(self, task_ex):
|
||||
res = super(DirectWorkflowController, self).may_complete_workflow(
|
||||
task_ex
|
||||
)
|
||||
|
||||
return res and not self._has_outbound_tasks(task_ex)
|
||||
|
||||
def _has_outbound_tasks(self, task_ex):
|
||||
# In order to determine if there are outbound tasks we just need
|
||||
# to calculate next task names (based on task outbound context)
|
||||
|
@ -0,0 +1,10 @@
|
||||
---
|
||||
fixes:
|
||||
- |
|
||||
Workflow and join completion check logic is now simplified with using
|
||||
post transactional queue of operations which is a more generic version of
|
||||
action_queue module previously serving for scheduling action runs outside
|
||||
of the main DB transaction. Workflow completion check is now registered
|
||||
only once when a task completes which reduces clutter and it's registered
|
||||
only if the task may potentially lead to workflow completion.
|
||||
|
Loading…
Reference in New Issue
Block a user