From ce18c0bf67579db0d105e7c703622b2c1e28adf7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kupai=20J=C3=B3zsef?= <jozsef.kupai@nokia.com>
Date: Mon, 7 May 2018 14:50:19 +0200
Subject: [PATCH] A mechanism to close stuck running action executions

If an executor dies while running an action execution, then the
execution will remain in RUNNING state (because the dead executor
can't signal the error).

Implements blueprint: action-execution-reporting

Change-Id: I51b4db6aa321d0e53bbb85a74f8ebaea0376d22e
---
 doc/source/configuration/config-guide.rst     | 37 ++++++++
 mistral/config.py                             | 55 +++++++++--
 ..._add_last_heartbeat_to_action_execution.py | 51 ++++++++++
 mistral/db/v2/api.py                          | 12 ++-
 mistral/db/v2/sqlalchemy/api.py               | 21 ++++-
 mistral/db/v2/sqlalchemy/models.py            |  9 ++
 mistral/engine/actions.py                     |  7 +-
 mistral/engine/base.py                        |  8 ++
 mistral/engine/default_engine.py              | 23 +++++
 mistral/engine/engine_server.py               | 15 +++
 mistral/executors/executor_server.py          | 35 +++++--
 mistral/rpc/clients.py                        | 12 +++
 mistral/services/action_execution_checker.py  | 83 +++++++++++++++++
 mistral/services/action_execution_reporter.py | 93 +++++++++++++++++++
 .../tests/unit/engine/test_default_engine.py  | 35 +++++++
 mistral/tests/unit/engine/test_with_items.py  |  5 +-
 ...ng-action-executions-b67deda65d117cee.yaml |  5 +
 17 files changed, 477 insertions(+), 29 deletions(-)
 create mode 100644 mistral/db/sqlalchemy/migration/alembic_migrations/versions/027_add_last_heartbeat_to_action_execution.py
 create mode 100644 mistral/services/action_execution_checker.py
 create mode 100644 mistral/services/action_execution_reporter.py
 create mode 100644 releasenotes/notes/close-stuck-running-action-executions-b67deda65d117cee.yaml

diff --git a/doc/source/configuration/config-guide.rst b/doc/source/configuration/config-guide.rst
index 635798cf6..116dd3d7b 100644
--- a/doc/source/configuration/config-guide.rst
+++ b/doc/source/configuration/config-guide.rst
@@ -131,6 +131,43 @@ directory.
    For more details see `policy.json file
    <https://docs.openstack.org/oslo.policy/latest/admin/policy-json-file.html>`_.
 
+#. Modify the action execution reporting configuration if needed.
+
+   It is possible that actions stuck in *"RUNNING"* state, for example if the
+   assigned executor dies or the message that signals the completion of the
+   action is lost. This section describes a heartbeat based solution to close
+   these forgotten action executions. The related configuration options are
+   ``max_missed_heartbeats`` and ``check_interval``. Note that if either
+   of these options are *"0"* then the feature won't be enabled.
+
+   The default configuration is the following::
+
+     [action_heartbeat]
+     max_missed_heartbeats = 15
+     check_interval = 20
+     first_heartbeat_timeout = 3600
+
+   *"check_interval = 20"*, so check action executions every
+   20 seconds. When the checker runs it will transit all running action
+   executions to error if the last heartbeat received is older than *"20 \*
+   15"* seconds. Note that *"first_heartbeat_timeout = 3600"*, so the action
+   execution won't be closed for 3600 seconds if no heartbeat was received for
+   it.
+
+   - **max_missed_heartbeats**
+
+    Defines the maximum amount of missed heartbeats to be allowed. If the number
+    of missed heartbeats exceeds this number, then the related action execution
+    will be transited to *"ERROR"* state with cause *"Heartbeat wasn't received."*.
+
+   - **check_interval**
+
+    The interval between checks (in seconds).
+
+   - **first_heartbeat_timeout**
+
+    The grace period for the first heartbeat (in seconds).
+
 #. Finally, try to run mistral engine and verify that it is running without
    any error::
 
diff --git a/mistral/config.py b/mistral/config.py
index 3c8486409..d437f9eb5 100644
--- a/mistral/config.py
+++ b/mistral/config.py
@@ -337,7 +337,7 @@ execution_expiration_policy_opts = [
         'evaluation_interval',
         help=_('How often will the executions be evaluated '
                '(in minutes). For example for value 120 the interval '
-               'will be 2 hours (every 2 hours).'
+               'will be 2 hours (every 2 hours). '
                'Note that only final state executions will be removed: '
                '( SUCCESS / ERROR / CANCELLED ).')
     ),
@@ -351,12 +351,12 @@ execution_expiration_policy_opts = [
     cfg.IntOpt(
         'max_finished_executions',
         default=0,
-        help=_('The maximum number of finished workflow executions'
-               'to be stored. For example when max_finished_executions = 100,'
-               'only the 100 latest finished executions will be preserved.'
-               'This means that even unexpired executions are eligible'
-               'for deletion, to decrease the number of executions in the'
-               'database. The default value is 0. If it is set to 0,'
+        help=_('The maximum number of finished workflow executions '
+               'to be stored. For example when max_finished_executions = 100, '
+               'only the 100 latest finished executions will be preserved. '
+               'This means that even unexpired executions are eligible '
+               'for deletion, to decrease the number of executions in the '
+               'database. The default value is 0. If it is set to 0, '
                'this constraint won\'t be applied.')
     ),
     cfg.IntOpt(
@@ -364,11 +364,44 @@ execution_expiration_policy_opts = [
         default=0,
         help=_('Size of batch of expired executions to be deleted.'
                'The default value is 0. If it is set to 0, '
-               'size of batch is total number of expired executions'
+               'size of batch is total number of expired executions '
                'that is going to be deleted.')
     )
 ]
 
+action_heartbeat_opts = [
+    cfg.IntOpt(
+        'max_missed_heartbeats',
+        min=0,
+        default=15,
+        help=_('The maximum amount of missed heartbeats to be allowed. '
+               'If set to 0 then this feature won\'t be enabled. '
+               'See check_interval for more details.')
+    ),
+    cfg.IntOpt(
+        'check_interval',
+        min=0,
+        default=20,
+        help=_('How often the action executions are checked (in seconds). '
+               'For example when check_interval = 10, check action '
+               'executions every 10 seconds. When the checker runs it will '
+               'transit all running action executions to error if the last '
+               'heartbeat received is older than 10 * max_missed_heartbeats '
+               'seconds. If set to 0 then this feature won\'t be enabled.')
+    ),
+    cfg.IntOpt(
+        'first_heartbeat_timeout',
+        min=0,
+        default=3600,
+        help=_('The first heartbeat is handled differently, to provide a '
+               'grace period in case there is no available executor to handle '
+               'the action execution. For example when '
+               'first_heartbeat_timeout = 3600, wait 3600 seconds before '
+               'closing the action executions that never received a heartbeat.'
+               )
+    )
+]
+
 coordination_opts = [
     cfg.StrOpt(
         'backend_url',
@@ -514,6 +547,7 @@ NOTIFIER_GROUP = 'notifier'
 PECAN_GROUP = 'pecan'
 COORDINATION_GROUP = 'coordination'
 EXECUTION_EXPIRATION_POLICY_GROUP = 'execution_expiration_policy'
+ACTION_HEARTBEAT_GROUP = 'action_heartbeat'
 PROFILER_GROUP = profiler.list_opts()[0][0]
 KEYCLOAK_OIDC_GROUP = "keycloak_oidc"
 OPENSTACK_ACTIONS_GROUP = 'openstack_actions'
@@ -536,6 +570,10 @@ CONF.register_opts(
     execution_expiration_policy_opts,
     group=EXECUTION_EXPIRATION_POLICY_GROUP
 )
+CONF.register_opts(
+    action_heartbeat_opts,
+    group=ACTION_HEARTBEAT_GROUP
+)
 CONF.register_opts(event_engine_opts, group=EVENT_ENGINE_GROUP)
 CONF.register_opts(notifier_opts, group=NOTIFIER_GROUP)
 CONF.register_opts(pecan_opts, group=PECAN_GROUP)
@@ -591,6 +629,7 @@ def list_opts():
         (KEYCLOAK_OIDC_GROUP, keycloak_oidc_opts),
         (OPENSTACK_ACTIONS_GROUP, openstack_actions_opts),
         (YAQL_GROUP, yaql_opts),
+        (ACTION_HEARTBEAT_GROUP, action_heartbeat_opts),
         (None, default_group_opts)
     ]
 
diff --git a/mistral/db/sqlalchemy/migration/alembic_migrations/versions/027_add_last_heartbeat_to_action_execution.py b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/027_add_last_heartbeat_to_action_execution.py
new file mode 100644
index 000000000..0bf76ece5
--- /dev/null
+++ b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/027_add_last_heartbeat_to_action_execution.py
@@ -0,0 +1,51 @@
+# Copyright 2018 OpenStack Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Add last_heartbeat to action execution
+
+Revision ID: 027
+Revises: 026
+Create Date: 2018-09-05 16:49:50.342349
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '027'
+down_revision = '026'
+
+from alembic import op
+import datetime
+from mistral import utils
+from oslo_config import cfg
+from sqlalchemy import Column, DateTime, Boolean
+
+CONF = cfg.CONF
+
+
+def upgrade():
+    op.add_column(
+        'action_executions_v2',
+        Column(
+            'last_heartbeat',
+            DateTime,
+            default=lambda: utils.utc_now_sec() + datetime.timedelta(
+                seconds=CONF.action_heartbeat.first_heartbeat_timeout
+            )
+        )
+    )
+    op.add_column(
+        'action_executions_v2',
+        Column('is_sync', Boolean, default=None, nullable=True)
+    )
diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py
index 98738bd17..eb81c50ce 100644
--- a/mistral/db/v2/api.py
+++ b/mistral/db/v2/api.py
@@ -211,8 +211,8 @@ def delete_action_definitions(**kwargs):
 
 # Action executions.
 
-def get_action_execution(id, fields=()):
-    return IMPL.get_action_execution(id, fields=fields)
+def get_action_execution(id, fields=(), insecure=False):
+    return IMPL.get_action_execution(id, fields=fields, insecure=insecure)
 
 
 def load_action_execution(name, fields=()):
@@ -228,8 +228,8 @@ def create_action_execution(values):
     return IMPL.create_action_execution(values)
 
 
-def update_action_execution(id, values):
-    return IMPL.update_action_execution(id, values)
+def update_action_execution(id, values, insecure=False):
+    return IMPL.update_action_execution(id, values, insecure)
 
 
 def create_or_update_action_execution(id, values):
@@ -413,6 +413,10 @@ def get_expired_executions(expiration_time, limit=None, columns=()):
     )
 
 
+def get_running_expired_sync_actions(expiration_time, session=None):
+    return IMPL.get_running_expired_sync_actions(expiration_time)
+
+
 def get_superfluous_executions(max_finished_executions, limit=None,
                                columns=()):
     return IMPL.get_superfluous_executions(
diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py
index 4270d974b..98a41e056 100644
--- a/mistral/db/v2/sqlalchemy/api.py
+++ b/mistral/db/v2/sqlalchemy/api.py
@@ -669,8 +669,9 @@ def delete_action_definitions(session=None, **kwargs):
 # Action executions.
 
 @b.session_aware()
-def get_action_execution(id, fields=(), session=None):
-    a_ex = _get_db_object_by_id(models.ActionExecution, id, columns=fields)
+def get_action_execution(id, insecure=False, fields=(), session=None):
+    a_ex = _get_db_object_by_id(models.ActionExecution, id, insecure=insecure,
+                                columns=fields)
 
     if not a_ex:
         raise exc.DBEntityNotFoundError(
@@ -707,8 +708,8 @@ def create_action_execution(values, session=None):
 
 
 @b.session_aware()
-def update_action_execution(id, values, session=None):
-    a_ex = get_action_execution(id)
+def update_action_execution(id, values, insecure=False, session=None):
+    a_ex = get_action_execution(id, insecure)
 
     a_ex.update(values.copy())
 
@@ -1098,6 +1099,18 @@ def get_expired_executions(expiration_time, limit=None, columns=(),
     return query.all()
 
 
+@b.session_aware()
+def get_running_expired_sync_actions(expiration_time, session=None):
+    query = b.model_query(models.ActionExecution)
+    query = query.filter(
+        models.ActionExecution.last_heartbeat < expiration_time
+    )
+    query = query.filter_by(is_sync=True)
+    query = query.filter(models.ActionExecution.state == states.RUNNING)
+
+    return query.all()
+
+
 @b.session_aware()
 def get_superfluous_executions(max_finished_executions, limit=None, columns=(),
                                session=None):
diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py
index b83f554ce..1f54eec1c 100644
--- a/mistral/db/v2/sqlalchemy/models.py
+++ b/mistral/db/v2/sqlalchemy/models.py
@@ -13,6 +13,7 @@
 #    See the License for the specific language governing permissions and
 #    limitations under the License.
 
+import datetime
 import hashlib
 import json
 import sys
@@ -33,6 +34,7 @@ from mistral import utils
 
 # Definition objects.
 
+CONF = cfg.CONF
 LOG = logging.getLogger(__name__)
 
 
@@ -195,6 +197,13 @@ class ActionExecution(Execution):
     accepted = sa.Column(sa.Boolean(), default=False)
     input = sa.Column(st.JsonLongDictType(), nullable=True)
     output = sa.orm.deferred(sa.Column(st.JsonLongDictType(), nullable=True))
+    last_heartbeat = sa.Column(
+        sa.DateTime,
+        default=lambda: utils.utc_now_sec() + datetime.timedelta(
+            seconds=CONF.action_heartbeat.first_heartbeat_timeout
+        )
+    )
+    is_sync = sa.Column(sa.Boolean(), default=None, nullable=True)
 
 
 class WorkflowExecution(Execution):
diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py
index 014b59b87..8e5affea9 100644
--- a/mistral/engine/actions.py
+++ b/mistral/engine/actions.py
@@ -150,7 +150,7 @@ class Action(object):
         """
         return True
 
-    def _create_action_execution(self, input_dict, runtime_ctx,
+    def _create_action_execution(self, input_dict, runtime_ctx, is_sync,
                                  desc='', action_ex_id=None):
         action_ex_id = action_ex_id or utils.generate_unicode_uuid()
 
@@ -161,7 +161,8 @@ class Action(object):
             'state': states.RUNNING,
             'input': input_dict,
             'runtime_context': runtime_ctx,
-            'description': desc
+            'description': desc,
+            'is_sync': is_sync
         }
 
         if self.task_ex:
@@ -246,6 +247,7 @@ class PythonAction(Action):
         self._create_action_execution(
             self._prepare_input(input_dict),
             self._prepare_runtime_context(index, safe_rerun),
+            self.is_sync(input_dict),
             desc=desc,
             action_ex_id=action_ex_id
         )
@@ -278,6 +280,7 @@ class PythonAction(Action):
             self._create_action_execution(
                 input_dict,
                 runtime_ctx,
+                self.is_sync(input_dict),
                 desc=desc,
                 action_ex_id=action_ex_id
             )
diff --git a/mistral/engine/base.py b/mistral/engine/base.py
index aa6d2c16b..21c6f559c 100644
--- a/mistral/engine/base.py
+++ b/mistral/engine/base.py
@@ -131,6 +131,14 @@ class Engine(object):
         """
         raise NotImplementedError
 
+    @abc.abstractmethod
+    def report_running_actions(self, action_ex_ids):
+        """Receives the heartbeat about the running actions.
+
+        :param action_ex_ids: The action execution ids.
+        """
+        raise NotImplementedError
+
 
 @six.add_metaclass(abc.ABCMeta)
 class TaskPolicy(object):
diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py
index 6ec53e07d..fc074cc83 100644
--- a/mistral/engine/default_engine.py
+++ b/mistral/engine/default_engine.py
@@ -16,6 +16,7 @@
 #    limitations under the License.
 
 from oslo_config import cfg
+from oslo_log import log as logging
 from osprofiler import profiler
 
 from mistral.db import utils as db_utils
@@ -34,6 +35,8 @@ from mistral.workflow import states
 # options required at top level of this  __init__.py are not imported before
 # the submodules are referenced.
 
+LOG = logging.getLogger(__name__)
+
 
 class DefaultEngine(base.Engine):
     @db_utils.retry_on_db_error
@@ -122,6 +125,7 @@ class DefaultEngine(base.Engine):
                 'input': action_input,
                 'output': output.to_dict(),
                 'state': state,
+                'is_sync': is_action_sync
             }
 
             return db_api.create_action_execution(values)
@@ -201,3 +205,22 @@ class DefaultEngine(base.Engine):
     def rollback_workflow(self, wf_ex_id):
         # TODO(rakhmerov): Implement.
         raise NotImplementedError
+
+    @db_utils.retry_on_db_error
+    @action_queue.process
+    def report_running_actions(self, action_ex_ids):
+        with db_api.transaction():
+            now = u.utc_now_sec()
+            for exec_id in action_ex_ids:
+                try:
+                    db_api.update_action_execution(
+                        exec_id,
+                        {"last_heartbeat": now},
+                        insecure=True
+                    )
+                except exceptions.DBEntityNotFoundError:
+                    LOG.debug("Action execution heartbeat update failed. {}"
+                              .format(exec_id), exc_info=True)
+                    # Ignore this error and continue with the
+                    # remaining ids.
+                    pass
diff --git a/mistral/engine/engine_server.py b/mistral/engine/engine_server.py
index c843da784..7cf88aad3 100644
--- a/mistral/engine/engine_server.py
+++ b/mistral/engine/engine_server.py
@@ -19,6 +19,7 @@ from mistral.db.v2 import api as db_api
 from mistral.engine import default_engine
 from mistral.rpc import base as rpc
 from mistral.service import base as service_base
+from mistral.services import action_execution_checker
 from mistral.services import expiration_policy
 from mistral.services import scheduler
 from mistral import utils
@@ -50,6 +51,7 @@ class EngineServer(service_base.MistralService):
 
         self._scheduler = scheduler.start()
         self._expiration_policy_tg = expiration_policy.setup()
+        action_execution_checker.setup()
 
         if self._setup_profiler:
             profiler_utils.setup('mistral-engine', cfg.CONF.engine.host)
@@ -258,6 +260,19 @@ class EngineServer(service_base.MistralService):
 
         return self.engine.rollback_workflow(wf_ex_id)
 
+    def report_running_actions(self, rpc_ctx, action_ex_ids):
+        """Receives calls over RPC to receive action execution heartbeats.
+
+        :param rpc_ctx: RPC request context.
+        :param action_ex_ids: Action execution ids.
+        """
+        LOG.info(
+            "Received RPC request 'report_running_actions'[action_ex_ids=%s]",
+            action_ex_ids
+        )
+
+        return self.engine.report_running_actions(action_ex_ids)
+
 
 def get_oslo_service(setup_profiler=True):
     return EngineServer(
diff --git a/mistral/executors/executor_server.py b/mistral/executors/executor_server.py
index a07e3a770..767ff4001 100644
--- a/mistral/executors/executor_server.py
+++ b/mistral/executors/executor_server.py
@@ -18,9 +18,11 @@ from mistral import config as cfg
 from mistral.executors import default_executor as exe
 from mistral.rpc import base as rpc
 from mistral.service import base as service_base
+from mistral.services import action_execution_reporter
 from mistral import utils
 from mistral.utils import profiler as profiler_utils
 
+CONF = cfg.CONF
 LOG = logging.getLogger(__name__)
 
 
@@ -37,10 +39,15 @@ class ExecutorServer(service_base.MistralService):
 
         self.executor = executor
         self._rpc_server = None
+        self._reporter = None
+        self._aer = None
 
     def start(self):
         super(ExecutorServer, self).start()
 
+        self._aer = action_execution_reporter.ActionExecutionReporter(CONF)
+        self._reporter = action_execution_reporter.setup(self._aer)
+
         if self._setup_profiler:
             profiler_utils.setup('mistral-executor', cfg.CONF.executor.host)
 
@@ -56,6 +63,9 @@ class ExecutorServer(service_base.MistralService):
     def stop(self, graceful=False):
         super(ExecutorServer, self).stop(graceful)
 
+        if self._reporter:
+            self._reporter.stop(graceful)
+
         if self._rpc_server:
             self._rpc_server.stop(graceful)
 
@@ -90,16 +100,21 @@ class ExecutorServer(service_base.MistralService):
 
         redelivered = rpc_ctx.redelivered or False
 
-        return self.executor.run_action(
-            action_ex_id,
-            action_cls_str,
-            action_cls_attrs,
-            params,
-            safe_rerun,
-            execution_context,
-            redelivered,
-            timeout=timeout
-        )
+        try:
+            self._aer.add_action_ex_id(action_ex_id)
+
+            return self.executor.run_action(
+                action_ex_id,
+                action_cls_str,
+                action_cls_attrs,
+                params,
+                safe_rerun,
+                execution_context,
+                redelivered,
+                timeout=timeout
+            )
+        finally:
+            self._aer.remove_action_ex_id(action_ex_id)
 
 
 def get_oslo_service(setup_profiler=True):
diff --git a/mistral/rpc/clients.py b/mistral/rpc/clients.py
index 3760f7bf7..bcce1835d 100644
--- a/mistral/rpc/clients.py
+++ b/mistral/rpc/clients.py
@@ -320,6 +320,18 @@ class EngineClient(eng.Engine):
             wf_ex_id=wf_ex_id
         )
 
+    @base.wrap_messaging_exception
+    def report_running_actions(self, action_ex_ids):
+        """Receives action execution heartbeats.
+
+        :param action_ex_ids: Action execution ids.
+        """
+        return self._client.async_call(
+            auth_ctx.ctx(),
+            'report_running_actions',
+            action_ex_ids=action_ex_ids
+        )
+
 
 class ExecutorClient(exe.Executor):
     """RPC Executor client."""
diff --git a/mistral/services/action_execution_checker.py b/mistral/services/action_execution_checker.py
new file mode 100644
index 000000000..2b69299cd
--- /dev/null
+++ b/mistral/services/action_execution_checker.py
@@ -0,0 +1,83 @@
+# Copyright 2018 Nokia Networks.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+
+import datetime
+
+from mistral.db import utils as db_utils
+from mistral.db.v2 import api as db_api
+from mistral.engine import action_handler
+from mistral.engine import action_queue
+from mistral.services import scheduler
+from mistral import utils
+from mistral_lib import actions as mistral_lib
+from oslo_config import cfg
+from oslo_log import log as logging
+
+LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+SCHEDULER_KEY = 'handle_expired_actions_key'
+
+
+@db_utils.retry_on_db_error
+@action_queue.process
+def handle_expired_actions():
+    LOG.debug("Running heartbeat checker...")
+
+    try:
+        interval = CONF.action_heartbeat.check_interval
+        max_missed = CONF.action_heartbeat.max_missed_heartbeats
+        exp_date = utils.utc_now_sec() - datetime.timedelta(
+            seconds=max_missed * interval
+        )
+
+        with db_api.transaction():
+            action_exs = db_api.get_running_expired_sync_actions(exp_date)
+            LOG.debug("Found {} running and expired actions.".format(
+                len(action_exs))
+            )
+            if action_exs:
+                LOG.info("Actions executions to transit to error, because "
+                         "heartbeat wasn't received: {}".format(action_exs))
+                for action_ex in action_exs:
+                    result = mistral_lib.Result(
+                        error="Heartbeat wasn't received."
+                    )
+                    action_handler.on_action_complete(action_ex, result)
+    finally:
+        schedule(interval)
+
+
+def setup():
+    interval = CONF.action_heartbeat.check_interval
+    max_missed = CONF.action_heartbeat.max_missed_heartbeats
+    enabled = interval and max_missed
+    if not enabled:
+        LOG.info("Action heartbeat reporting disabled.")
+        return
+
+    wait_time = interval * max_missed
+    LOG.debug("First run of action execution checker, wait before "
+              "checking to make sure executors have time to send "
+              "heartbeats. ({} seconds)".format(wait_time))
+
+    schedule(wait_time)
+
+
+def schedule(run_after):
+    scheduler.schedule_call(
+        None,
+        'mistral.services.action_execution_checker.handle_expired_actions',
+        run_after=run_after,
+        key=SCHEDULER_KEY
+    )
diff --git a/mistral/services/action_execution_reporter.py b/mistral/services/action_execution_reporter.py
new file mode 100644
index 000000000..d6852d5a7
--- /dev/null
+++ b/mistral/services/action_execution_reporter.py
@@ -0,0 +1,93 @@
+# Copyright 2018 Nokia Networks.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+
+from oslo_config import cfg
+from oslo_log import log as logging
+from oslo_service import periodic_task
+from oslo_service import threadgroup
+
+from mistral import context as auth_ctx
+from mistral.rpc import clients as rpc
+
+LOG = logging.getLogger(__name__)
+
+CONF = cfg.CONF
+
+
+class ActionExecutionReporter(periodic_task.PeriodicTasks):
+    """The reporter that reports the running action executions."""
+
+    def __init__(self, conf):
+        super(ActionExecutionReporter, self).__init__(conf)
+        self._engine_client = rpc.get_engine_client()
+        self._running_actions = set()
+
+        self.interval = CONF.action_heartbeat.check_interval
+        self.max_missed = CONF.action_heartbeat.max_missed_heartbeats
+        self.enabled = self.interval and self.max_missed
+
+        _periodic_task = periodic_task.periodic_task(
+            spacing=self.interval,
+            run_immediately=True
+        )
+        self.add_periodic_task(
+            _periodic_task(report)
+        )
+
+    def add_action_ex_id(self, action_ex_id):
+        # With run-action there is no actions_ex_id assigned
+        if action_ex_id and self.enabled:
+            self._engine_client.report_running_actions([action_ex_id])
+            self._running_actions.add(action_ex_id)
+
+    def remove_action_ex_id(self, action_ex_id):
+        if action_ex_id and self.enabled:
+            self._running_actions.discard(action_ex_id)
+
+
+def report(reporter, ctx):
+    LOG.debug("Running heartbeat reporter...")
+
+    if not reporter._running_actions:
+        return
+
+    auth_ctx.set_ctx(ctx)
+    reporter._engine_client.report_running_actions(reporter._running_actions)
+
+
+def setup(action_execution_reporter):
+    interval = CONF.action_heartbeat.check_interval
+    max_missed = CONF.action_heartbeat.max_missed_heartbeats
+    enabled = interval and max_missed
+    if not enabled:
+        LOG.info("Action heartbeat reporting disabled.")
+        return None
+
+    tg = threadgroup.ThreadGroup()
+
+    ctx = auth_ctx.MistralContext(
+        user=None,
+        tenant=None,
+        auth_token=None,
+        is_admin=True
+    )
+
+    tg.add_dynamic_timer(
+        action_execution_reporter.run_periodic_tasks,
+        initial_delay=None,
+        periodic_interval_max=1,
+        context=ctx
+    )
+
+    return tg
diff --git a/mistral/tests/unit/engine/test_default_engine.py b/mistral/tests/unit/engine/test_default_engine.py
index 31dedaf60..8bc514759 100644
--- a/mistral/tests/unit/engine/test_default_engine.py
+++ b/mistral/tests/unit/engine/test_default_engine.py
@@ -625,6 +625,41 @@ class DefaultEngineTest(base.DbTestCase):
         # TODO(akhmerov): Implement.
         pass
 
+    def test_report_running_actions(self):
+        wf_input = {'param1': 'Hey', 'param2': 'Hi'}
+
+        # Start workflow.
+        wf_ex = self.engine.start_workflow(
+            'wb.wf',
+            '',
+            wf_input=wf_input,
+            description='my execution',
+            task_name='task2'
+        )
+
+        with db_api.transaction():
+            wf_ex = db_api.get_workflow_execution(wf_ex.id)
+
+            task_execs = wf_ex.task_executions
+
+        self.assertEqual(1, len(task_execs))
+
+        task_ex = task_execs[0]
+
+        action_execs = db_api.get_action_executions(
+            task_execution_id=task_ex.id
+        )
+
+        task_action_ex = action_execs[0]
+
+        self.engine.report_running_actions([])
+        self.engine.report_running_actions([None, None])
+        self.engine.report_running_actions([None, task_action_ex.id])
+
+        task_action_ex = db_api.get_action_execution(task_action_ex.id)
+
+        self.assertIsNotNone(task_action_ex.last_heartbeat)
+
 
 class DefaultEngineWithTransportTest(eng_test_base.EngineTestCase):
     def test_engine_client_remote_error(self):
diff --git a/mistral/tests/unit/engine/test_with_items.py b/mistral/tests/unit/engine/test_with_items.py
index 2b8609a68..6adcfbfd7 100644
--- a/mistral/tests/unit/engine/test_with_items.py
+++ b/mistral/tests/unit/engine/test_with_items.py
@@ -17,6 +17,7 @@ import mock
 from oslo_config import cfg
 
 from mistral.actions import std_actions
+from mistral import config
 from mistral.db.v2 import api as db_api
 from mistral import exceptions as exc
 from mistral.services import workbooks as wb_service
@@ -32,7 +33,9 @@ from mistral_lib import actions as actions_base
 
 # Use the set_default method to set value otherwise in certain test cases
 # the change in value is not permanent.
-cfg.CONF.set_default('auth_enable', False, group='pecan')
+cfg.CONF.set_default('auth_enable', False, group=config.PECAN_GROUP)
+cfg.CONF.set_default('max_missed_heartbeats', 0,
+                     group=config.ACTION_HEARTBEAT_GROUP)
 
 WB = """
 ---
diff --git a/releasenotes/notes/close-stuck-running-action-executions-b67deda65d117cee.yaml b/releasenotes/notes/close-stuck-running-action-executions-b67deda65d117cee.yaml
new file mode 100644
index 000000000..d09526a3c
--- /dev/null
+++ b/releasenotes/notes/close-stuck-running-action-executions-b67deda65d117cee.yaml
@@ -0,0 +1,5 @@
+---
+features:
+  - >
+    [`blueprint action-execution-reporting <https://blueprints.launchpad.net/mistral/+spec/action-execution-reporting>`_]
+    Introduced a mechanism to close action executions that stuck in RUNNING state.