diff --git a/doc/source/configuration/config-guide.rst b/doc/source/configuration/config-guide.rst index 635798cf6..116dd3d7b 100644 --- a/doc/source/configuration/config-guide.rst +++ b/doc/source/configuration/config-guide.rst @@ -131,6 +131,43 @@ directory. For more details see `policy.json file `_. +#. Modify the action execution reporting configuration if needed. + + It is possible that actions stuck in *"RUNNING"* state, for example if the + assigned executor dies or the message that signals the completion of the + action is lost. This section describes a heartbeat based solution to close + these forgotten action executions. The related configuration options are + ``max_missed_heartbeats`` and ``check_interval``. Note that if either + of these options are *"0"* then the feature won't be enabled. + + The default configuration is the following:: + + [action_heartbeat] + max_missed_heartbeats = 15 + check_interval = 20 + first_heartbeat_timeout = 3600 + + *"check_interval = 20"*, so check action executions every + 20 seconds. When the checker runs it will transit all running action + executions to error if the last heartbeat received is older than *"20 \* + 15"* seconds. Note that *"first_heartbeat_timeout = 3600"*, so the action + execution won't be closed for 3600 seconds if no heartbeat was received for + it. + + - **max_missed_heartbeats** + + Defines the maximum amount of missed heartbeats to be allowed. If the number + of missed heartbeats exceeds this number, then the related action execution + will be transited to *"ERROR"* state with cause *"Heartbeat wasn't received."*. + + - **check_interval** + + The interval between checks (in seconds). + + - **first_heartbeat_timeout** + + The grace period for the first heartbeat (in seconds). + #. Finally, try to run mistral engine and verify that it is running without any error:: diff --git a/mistral/config.py b/mistral/config.py index 3c8486409..d437f9eb5 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -337,7 +337,7 @@ execution_expiration_policy_opts = [ 'evaluation_interval', help=_('How often will the executions be evaluated ' '(in minutes). For example for value 120 the interval ' - 'will be 2 hours (every 2 hours).' + 'will be 2 hours (every 2 hours). ' 'Note that only final state executions will be removed: ' '( SUCCESS / ERROR / CANCELLED ).') ), @@ -351,12 +351,12 @@ execution_expiration_policy_opts = [ cfg.IntOpt( 'max_finished_executions', default=0, - help=_('The maximum number of finished workflow executions' - 'to be stored. For example when max_finished_executions = 100,' - 'only the 100 latest finished executions will be preserved.' - 'This means that even unexpired executions are eligible' - 'for deletion, to decrease the number of executions in the' - 'database. The default value is 0. If it is set to 0,' + help=_('The maximum number of finished workflow executions ' + 'to be stored. For example when max_finished_executions = 100, ' + 'only the 100 latest finished executions will be preserved. ' + 'This means that even unexpired executions are eligible ' + 'for deletion, to decrease the number of executions in the ' + 'database. The default value is 0. If it is set to 0, ' 'this constraint won\'t be applied.') ), cfg.IntOpt( @@ -364,11 +364,44 @@ execution_expiration_policy_opts = [ default=0, help=_('Size of batch of expired executions to be deleted.' 'The default value is 0. If it is set to 0, ' - 'size of batch is total number of expired executions' + 'size of batch is total number of expired executions ' 'that is going to be deleted.') ) ] +action_heartbeat_opts = [ + cfg.IntOpt( + 'max_missed_heartbeats', + min=0, + default=15, + help=_('The maximum amount of missed heartbeats to be allowed. ' + 'If set to 0 then this feature won\'t be enabled. ' + 'See check_interval for more details.') + ), + cfg.IntOpt( + 'check_interval', + min=0, + default=20, + help=_('How often the action executions are checked (in seconds). ' + 'For example when check_interval = 10, check action ' + 'executions every 10 seconds. When the checker runs it will ' + 'transit all running action executions to error if the last ' + 'heartbeat received is older than 10 * max_missed_heartbeats ' + 'seconds. If set to 0 then this feature won\'t be enabled.') + ), + cfg.IntOpt( + 'first_heartbeat_timeout', + min=0, + default=3600, + help=_('The first heartbeat is handled differently, to provide a ' + 'grace period in case there is no available executor to handle ' + 'the action execution. For example when ' + 'first_heartbeat_timeout = 3600, wait 3600 seconds before ' + 'closing the action executions that never received a heartbeat.' + ) + ) +] + coordination_opts = [ cfg.StrOpt( 'backend_url', @@ -514,6 +547,7 @@ NOTIFIER_GROUP = 'notifier' PECAN_GROUP = 'pecan' COORDINATION_GROUP = 'coordination' EXECUTION_EXPIRATION_POLICY_GROUP = 'execution_expiration_policy' +ACTION_HEARTBEAT_GROUP = 'action_heartbeat' PROFILER_GROUP = profiler.list_opts()[0][0] KEYCLOAK_OIDC_GROUP = "keycloak_oidc" OPENSTACK_ACTIONS_GROUP = 'openstack_actions' @@ -536,6 +570,10 @@ CONF.register_opts( execution_expiration_policy_opts, group=EXECUTION_EXPIRATION_POLICY_GROUP ) +CONF.register_opts( + action_heartbeat_opts, + group=ACTION_HEARTBEAT_GROUP +) CONF.register_opts(event_engine_opts, group=EVENT_ENGINE_GROUP) CONF.register_opts(notifier_opts, group=NOTIFIER_GROUP) CONF.register_opts(pecan_opts, group=PECAN_GROUP) @@ -591,6 +629,7 @@ def list_opts(): (KEYCLOAK_OIDC_GROUP, keycloak_oidc_opts), (OPENSTACK_ACTIONS_GROUP, openstack_actions_opts), (YAQL_GROUP, yaql_opts), + (ACTION_HEARTBEAT_GROUP, action_heartbeat_opts), (None, default_group_opts) ] diff --git a/mistral/db/sqlalchemy/migration/alembic_migrations/versions/027_add_last_heartbeat_to_action_execution.py b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/027_add_last_heartbeat_to_action_execution.py new file mode 100644 index 000000000..0bf76ece5 --- /dev/null +++ b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/027_add_last_heartbeat_to_action_execution.py @@ -0,0 +1,51 @@ +# Copyright 2018 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Add last_heartbeat to action execution + +Revision ID: 027 +Revises: 026 +Create Date: 2018-09-05 16:49:50.342349 + +""" + +# revision identifiers, used by Alembic. +revision = '027' +down_revision = '026' + +from alembic import op +import datetime +from mistral import utils +from oslo_config import cfg +from sqlalchemy import Column, DateTime, Boolean + +CONF = cfg.CONF + + +def upgrade(): + op.add_column( + 'action_executions_v2', + Column( + 'last_heartbeat', + DateTime, + default=lambda: utils.utc_now_sec() + datetime.timedelta( + seconds=CONF.action_heartbeat.first_heartbeat_timeout + ) + ) + ) + op.add_column( + 'action_executions_v2', + Column('is_sync', Boolean, default=None, nullable=True) + ) diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py index 98738bd17..eb81c50ce 100644 --- a/mistral/db/v2/api.py +++ b/mistral/db/v2/api.py @@ -211,8 +211,8 @@ def delete_action_definitions(**kwargs): # Action executions. -def get_action_execution(id, fields=()): - return IMPL.get_action_execution(id, fields=fields) +def get_action_execution(id, fields=(), insecure=False): + return IMPL.get_action_execution(id, fields=fields, insecure=insecure) def load_action_execution(name, fields=()): @@ -228,8 +228,8 @@ def create_action_execution(values): return IMPL.create_action_execution(values) -def update_action_execution(id, values): - return IMPL.update_action_execution(id, values) +def update_action_execution(id, values, insecure=False): + return IMPL.update_action_execution(id, values, insecure) def create_or_update_action_execution(id, values): @@ -413,6 +413,10 @@ def get_expired_executions(expiration_time, limit=None, columns=()): ) +def get_running_expired_sync_actions(expiration_time, session=None): + return IMPL.get_running_expired_sync_actions(expiration_time) + + def get_superfluous_executions(max_finished_executions, limit=None, columns=()): return IMPL.get_superfluous_executions( diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index 4270d974b..98a41e056 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -669,8 +669,9 @@ def delete_action_definitions(session=None, **kwargs): # Action executions. @b.session_aware() -def get_action_execution(id, fields=(), session=None): - a_ex = _get_db_object_by_id(models.ActionExecution, id, columns=fields) +def get_action_execution(id, insecure=False, fields=(), session=None): + a_ex = _get_db_object_by_id(models.ActionExecution, id, insecure=insecure, + columns=fields) if not a_ex: raise exc.DBEntityNotFoundError( @@ -707,8 +708,8 @@ def create_action_execution(values, session=None): @b.session_aware() -def update_action_execution(id, values, session=None): - a_ex = get_action_execution(id) +def update_action_execution(id, values, insecure=False, session=None): + a_ex = get_action_execution(id, insecure) a_ex.update(values.copy()) @@ -1098,6 +1099,18 @@ def get_expired_executions(expiration_time, limit=None, columns=(), return query.all() +@b.session_aware() +def get_running_expired_sync_actions(expiration_time, session=None): + query = b.model_query(models.ActionExecution) + query = query.filter( + models.ActionExecution.last_heartbeat < expiration_time + ) + query = query.filter_by(is_sync=True) + query = query.filter(models.ActionExecution.state == states.RUNNING) + + return query.all() + + @b.session_aware() def get_superfluous_executions(max_finished_executions, limit=None, columns=(), session=None): diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py index b83f554ce..1f54eec1c 100644 --- a/mistral/db/v2/sqlalchemy/models.py +++ b/mistral/db/v2/sqlalchemy/models.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime import hashlib import json import sys @@ -33,6 +34,7 @@ from mistral import utils # Definition objects. +CONF = cfg.CONF LOG = logging.getLogger(__name__) @@ -195,6 +197,13 @@ class ActionExecution(Execution): accepted = sa.Column(sa.Boolean(), default=False) input = sa.Column(st.JsonLongDictType(), nullable=True) output = sa.orm.deferred(sa.Column(st.JsonLongDictType(), nullable=True)) + last_heartbeat = sa.Column( + sa.DateTime, + default=lambda: utils.utc_now_sec() + datetime.timedelta( + seconds=CONF.action_heartbeat.first_heartbeat_timeout + ) + ) + is_sync = sa.Column(sa.Boolean(), default=None, nullable=True) class WorkflowExecution(Execution): diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index 014b59b87..8e5affea9 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -150,7 +150,7 @@ class Action(object): """ return True - def _create_action_execution(self, input_dict, runtime_ctx, + def _create_action_execution(self, input_dict, runtime_ctx, is_sync, desc='', action_ex_id=None): action_ex_id = action_ex_id or utils.generate_unicode_uuid() @@ -161,7 +161,8 @@ class Action(object): 'state': states.RUNNING, 'input': input_dict, 'runtime_context': runtime_ctx, - 'description': desc + 'description': desc, + 'is_sync': is_sync } if self.task_ex: @@ -246,6 +247,7 @@ class PythonAction(Action): self._create_action_execution( self._prepare_input(input_dict), self._prepare_runtime_context(index, safe_rerun), + self.is_sync(input_dict), desc=desc, action_ex_id=action_ex_id ) @@ -278,6 +280,7 @@ class PythonAction(Action): self._create_action_execution( input_dict, runtime_ctx, + self.is_sync(input_dict), desc=desc, action_ex_id=action_ex_id ) diff --git a/mistral/engine/base.py b/mistral/engine/base.py index aa6d2c16b..21c6f559c 100644 --- a/mistral/engine/base.py +++ b/mistral/engine/base.py @@ -131,6 +131,14 @@ class Engine(object): """ raise NotImplementedError + @abc.abstractmethod + def report_running_actions(self, action_ex_ids): + """Receives the heartbeat about the running actions. + + :param action_ex_ids: The action execution ids. + """ + raise NotImplementedError + @six.add_metaclass(abc.ABCMeta) class TaskPolicy(object): diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 6ec53e07d..fc074cc83 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -16,6 +16,7 @@ # limitations under the License. from oslo_config import cfg +from oslo_log import log as logging from osprofiler import profiler from mistral.db import utils as db_utils @@ -34,6 +35,8 @@ from mistral.workflow import states # options required at top level of this __init__.py are not imported before # the submodules are referenced. +LOG = logging.getLogger(__name__) + class DefaultEngine(base.Engine): @db_utils.retry_on_db_error @@ -122,6 +125,7 @@ class DefaultEngine(base.Engine): 'input': action_input, 'output': output.to_dict(), 'state': state, + 'is_sync': is_action_sync } return db_api.create_action_execution(values) @@ -201,3 +205,22 @@ class DefaultEngine(base.Engine): def rollback_workflow(self, wf_ex_id): # TODO(rakhmerov): Implement. raise NotImplementedError + + @db_utils.retry_on_db_error + @action_queue.process + def report_running_actions(self, action_ex_ids): + with db_api.transaction(): + now = u.utc_now_sec() + for exec_id in action_ex_ids: + try: + db_api.update_action_execution( + exec_id, + {"last_heartbeat": now}, + insecure=True + ) + except exceptions.DBEntityNotFoundError: + LOG.debug("Action execution heartbeat update failed. {}" + .format(exec_id), exc_info=True) + # Ignore this error and continue with the + # remaining ids. + pass diff --git a/mistral/engine/engine_server.py b/mistral/engine/engine_server.py index c843da784..7cf88aad3 100644 --- a/mistral/engine/engine_server.py +++ b/mistral/engine/engine_server.py @@ -19,6 +19,7 @@ from mistral.db.v2 import api as db_api from mistral.engine import default_engine from mistral.rpc import base as rpc from mistral.service import base as service_base +from mistral.services import action_execution_checker from mistral.services import expiration_policy from mistral.services import scheduler from mistral import utils @@ -50,6 +51,7 @@ class EngineServer(service_base.MistralService): self._scheduler = scheduler.start() self._expiration_policy_tg = expiration_policy.setup() + action_execution_checker.setup() if self._setup_profiler: profiler_utils.setup('mistral-engine', cfg.CONF.engine.host) @@ -258,6 +260,19 @@ class EngineServer(service_base.MistralService): return self.engine.rollback_workflow(wf_ex_id) + def report_running_actions(self, rpc_ctx, action_ex_ids): + """Receives calls over RPC to receive action execution heartbeats. + + :param rpc_ctx: RPC request context. + :param action_ex_ids: Action execution ids. + """ + LOG.info( + "Received RPC request 'report_running_actions'[action_ex_ids=%s]", + action_ex_ids + ) + + return self.engine.report_running_actions(action_ex_ids) + def get_oslo_service(setup_profiler=True): return EngineServer( diff --git a/mistral/executors/executor_server.py b/mistral/executors/executor_server.py index a07e3a770..767ff4001 100644 --- a/mistral/executors/executor_server.py +++ b/mistral/executors/executor_server.py @@ -18,9 +18,11 @@ from mistral import config as cfg from mistral.executors import default_executor as exe from mistral.rpc import base as rpc from mistral.service import base as service_base +from mistral.services import action_execution_reporter from mistral import utils from mistral.utils import profiler as profiler_utils +CONF = cfg.CONF LOG = logging.getLogger(__name__) @@ -37,10 +39,15 @@ class ExecutorServer(service_base.MistralService): self.executor = executor self._rpc_server = None + self._reporter = None + self._aer = None def start(self): super(ExecutorServer, self).start() + self._aer = action_execution_reporter.ActionExecutionReporter(CONF) + self._reporter = action_execution_reporter.setup(self._aer) + if self._setup_profiler: profiler_utils.setup('mistral-executor', cfg.CONF.executor.host) @@ -56,6 +63,9 @@ class ExecutorServer(service_base.MistralService): def stop(self, graceful=False): super(ExecutorServer, self).stop(graceful) + if self._reporter: + self._reporter.stop(graceful) + if self._rpc_server: self._rpc_server.stop(graceful) @@ -90,16 +100,21 @@ class ExecutorServer(service_base.MistralService): redelivered = rpc_ctx.redelivered or False - return self.executor.run_action( - action_ex_id, - action_cls_str, - action_cls_attrs, - params, - safe_rerun, - execution_context, - redelivered, - timeout=timeout - ) + try: + self._aer.add_action_ex_id(action_ex_id) + + return self.executor.run_action( + action_ex_id, + action_cls_str, + action_cls_attrs, + params, + safe_rerun, + execution_context, + redelivered, + timeout=timeout + ) + finally: + self._aer.remove_action_ex_id(action_ex_id) def get_oslo_service(setup_profiler=True): diff --git a/mistral/rpc/clients.py b/mistral/rpc/clients.py index 3760f7bf7..bcce1835d 100644 --- a/mistral/rpc/clients.py +++ b/mistral/rpc/clients.py @@ -320,6 +320,18 @@ class EngineClient(eng.Engine): wf_ex_id=wf_ex_id ) + @base.wrap_messaging_exception + def report_running_actions(self, action_ex_ids): + """Receives action execution heartbeats. + + :param action_ex_ids: Action execution ids. + """ + return self._client.async_call( + auth_ctx.ctx(), + 'report_running_actions', + action_ex_ids=action_ex_ids + ) + class ExecutorClient(exe.Executor): """RPC Executor client.""" diff --git a/mistral/services/action_execution_checker.py b/mistral/services/action_execution_checker.py new file mode 100644 index 000000000..2b69299cd --- /dev/null +++ b/mistral/services/action_execution_checker.py @@ -0,0 +1,83 @@ +# Copyright 2018 Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime + +from mistral.db import utils as db_utils +from mistral.db.v2 import api as db_api +from mistral.engine import action_handler +from mistral.engine import action_queue +from mistral.services import scheduler +from mistral import utils +from mistral_lib import actions as mistral_lib +from oslo_config import cfg +from oslo_log import log as logging + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF +SCHEDULER_KEY = 'handle_expired_actions_key' + + +@db_utils.retry_on_db_error +@action_queue.process +def handle_expired_actions(): + LOG.debug("Running heartbeat checker...") + + try: + interval = CONF.action_heartbeat.check_interval + max_missed = CONF.action_heartbeat.max_missed_heartbeats + exp_date = utils.utc_now_sec() - datetime.timedelta( + seconds=max_missed * interval + ) + + with db_api.transaction(): + action_exs = db_api.get_running_expired_sync_actions(exp_date) + LOG.debug("Found {} running and expired actions.".format( + len(action_exs)) + ) + if action_exs: + LOG.info("Actions executions to transit to error, because " + "heartbeat wasn't received: {}".format(action_exs)) + for action_ex in action_exs: + result = mistral_lib.Result( + error="Heartbeat wasn't received." + ) + action_handler.on_action_complete(action_ex, result) + finally: + schedule(interval) + + +def setup(): + interval = CONF.action_heartbeat.check_interval + max_missed = CONF.action_heartbeat.max_missed_heartbeats + enabled = interval and max_missed + if not enabled: + LOG.info("Action heartbeat reporting disabled.") + return + + wait_time = interval * max_missed + LOG.debug("First run of action execution checker, wait before " + "checking to make sure executors have time to send " + "heartbeats. ({} seconds)".format(wait_time)) + + schedule(wait_time) + + +def schedule(run_after): + scheduler.schedule_call( + None, + 'mistral.services.action_execution_checker.handle_expired_actions', + run_after=run_after, + key=SCHEDULER_KEY + ) diff --git a/mistral/services/action_execution_reporter.py b/mistral/services/action_execution_reporter.py new file mode 100644 index 000000000..d6852d5a7 --- /dev/null +++ b/mistral/services/action_execution_reporter.py @@ -0,0 +1,93 @@ +# Copyright 2018 Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg +from oslo_log import log as logging +from oslo_service import periodic_task +from oslo_service import threadgroup + +from mistral import context as auth_ctx +from mistral.rpc import clients as rpc + +LOG = logging.getLogger(__name__) + +CONF = cfg.CONF + + +class ActionExecutionReporter(periodic_task.PeriodicTasks): + """The reporter that reports the running action executions.""" + + def __init__(self, conf): + super(ActionExecutionReporter, self).__init__(conf) + self._engine_client = rpc.get_engine_client() + self._running_actions = set() + + self.interval = CONF.action_heartbeat.check_interval + self.max_missed = CONF.action_heartbeat.max_missed_heartbeats + self.enabled = self.interval and self.max_missed + + _periodic_task = periodic_task.periodic_task( + spacing=self.interval, + run_immediately=True + ) + self.add_periodic_task( + _periodic_task(report) + ) + + def add_action_ex_id(self, action_ex_id): + # With run-action there is no actions_ex_id assigned + if action_ex_id and self.enabled: + self._engine_client.report_running_actions([action_ex_id]) + self._running_actions.add(action_ex_id) + + def remove_action_ex_id(self, action_ex_id): + if action_ex_id and self.enabled: + self._running_actions.discard(action_ex_id) + + +def report(reporter, ctx): + LOG.debug("Running heartbeat reporter...") + + if not reporter._running_actions: + return + + auth_ctx.set_ctx(ctx) + reporter._engine_client.report_running_actions(reporter._running_actions) + + +def setup(action_execution_reporter): + interval = CONF.action_heartbeat.check_interval + max_missed = CONF.action_heartbeat.max_missed_heartbeats + enabled = interval and max_missed + if not enabled: + LOG.info("Action heartbeat reporting disabled.") + return None + + tg = threadgroup.ThreadGroup() + + ctx = auth_ctx.MistralContext( + user=None, + tenant=None, + auth_token=None, + is_admin=True + ) + + tg.add_dynamic_timer( + action_execution_reporter.run_periodic_tasks, + initial_delay=None, + periodic_interval_max=1, + context=ctx + ) + + return tg diff --git a/mistral/tests/unit/engine/test_default_engine.py b/mistral/tests/unit/engine/test_default_engine.py index 31dedaf60..8bc514759 100644 --- a/mistral/tests/unit/engine/test_default_engine.py +++ b/mistral/tests/unit/engine/test_default_engine.py @@ -625,6 +625,41 @@ class DefaultEngineTest(base.DbTestCase): # TODO(akhmerov): Implement. pass + def test_report_running_actions(self): + wf_input = {'param1': 'Hey', 'param2': 'Hi'} + + # Start workflow. + wf_ex = self.engine.start_workflow( + 'wb.wf', + '', + wf_input=wf_input, + description='my execution', + task_name='task2' + ) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + task_execs = wf_ex.task_executions + + self.assertEqual(1, len(task_execs)) + + task_ex = task_execs[0] + + action_execs = db_api.get_action_executions( + task_execution_id=task_ex.id + ) + + task_action_ex = action_execs[0] + + self.engine.report_running_actions([]) + self.engine.report_running_actions([None, None]) + self.engine.report_running_actions([None, task_action_ex.id]) + + task_action_ex = db_api.get_action_execution(task_action_ex.id) + + self.assertIsNotNone(task_action_ex.last_heartbeat) + class DefaultEngineWithTransportTest(eng_test_base.EngineTestCase): def test_engine_client_remote_error(self): diff --git a/mistral/tests/unit/engine/test_with_items.py b/mistral/tests/unit/engine/test_with_items.py index 2b8609a68..6adcfbfd7 100644 --- a/mistral/tests/unit/engine/test_with_items.py +++ b/mistral/tests/unit/engine/test_with_items.py @@ -17,6 +17,7 @@ import mock from oslo_config import cfg from mistral.actions import std_actions +from mistral import config from mistral.db.v2 import api as db_api from mistral import exceptions as exc from mistral.services import workbooks as wb_service @@ -32,7 +33,9 @@ from mistral_lib import actions as actions_base # Use the set_default method to set value otherwise in certain test cases # the change in value is not permanent. -cfg.CONF.set_default('auth_enable', False, group='pecan') +cfg.CONF.set_default('auth_enable', False, group=config.PECAN_GROUP) +cfg.CONF.set_default('max_missed_heartbeats', 0, + group=config.ACTION_HEARTBEAT_GROUP) WB = """ --- diff --git a/releasenotes/notes/close-stuck-running-action-executions-b67deda65d117cee.yaml b/releasenotes/notes/close-stuck-running-action-executions-b67deda65d117cee.yaml new file mode 100644 index 000000000..d09526a3c --- /dev/null +++ b/releasenotes/notes/close-stuck-running-action-executions-b67deda65d117cee.yaml @@ -0,0 +1,5 @@ +--- +features: + - > + [`blueprint action-execution-reporting `_] + Introduced a mechanism to close action executions that stuck in RUNNING state.