Cache action definitions
Now to perform some action mistral gets its definition from the database first. It's not really optimal, because if there are a lot of similar action calls, mistral will reread the same data from db. It increases the whole execution time and the load on the database. To improve the performance it's suggested to cache read definitions and take them from the cache instead of the database in the subsequent times. Cache ttl can be configured with ``action_definition_cache_time`` option from [engine] group. The default value is 60 seconds. Change-Id: I330b7cde982821d4f0a06cdd2954499ac0b7be37
This commit is contained in:
parent
7da5b880c9
commit
1ae082794a
mistral
releasenotes/notes
@ -172,6 +172,12 @@ engine_opts = [
|
||||
' will be restored automatically. If this property is'
|
||||
' set to a negative value Mistral will never be doing '
|
||||
' this check.')
|
||||
),
|
||||
cfg.IntOpt(
|
||||
'action_definition_cache_time',
|
||||
default=60,
|
||||
help=_('A number of seconds that indicates how long action '
|
||||
'definitions should be stored in the local cache.')
|
||||
)
|
||||
]
|
||||
|
||||
|
@ -32,6 +32,7 @@ from mistral.services import security
|
||||
from mistral import utils
|
||||
from mistral.utils import wf_trace
|
||||
from mistral.workflow import data_flow
|
||||
from mistral.workflow import lookup_utils
|
||||
from mistral.workflow import states
|
||||
from mistral_lib import actions as ml_actions
|
||||
|
||||
@ -367,11 +368,11 @@ class AdHocAction(PythonAction):
|
||||
wf_ctx=None):
|
||||
self.action_spec = spec_parser.get_action_spec(action_def.spec)
|
||||
|
||||
try:
|
||||
base_action_def = db_api.get_action_definition(
|
||||
self.action_spec.get_base()
|
||||
)
|
||||
except exc.DBEntityNotFoundError:
|
||||
base_action_def = lookup_utils.find_action_definition_by_name(
|
||||
self.action_spec.get_base()
|
||||
)
|
||||
|
||||
if not base_action_def:
|
||||
raise exc.InvalidActionException(
|
||||
"Failed to find action [action_name=%s]" %
|
||||
self.action_spec.get_base()
|
||||
@ -607,10 +608,14 @@ def resolve_action_definition(action_spec_name, wf_name=None,
|
||||
|
||||
action_full_name = "%s.%s" % (wb_name, action_spec_name)
|
||||
|
||||
action_db = db_api.load_action_definition(action_full_name)
|
||||
action_db = lookup_utils.find_action_definition_by_name(
|
||||
action_full_name
|
||||
)
|
||||
|
||||
if not action_db:
|
||||
action_db = db_api.load_action_definition(action_spec_name)
|
||||
action_db = lookup_utils.find_action_definition_by_name(
|
||||
action_spec_name
|
||||
)
|
||||
|
||||
if not action_db:
|
||||
raise exc.InvalidActionException(
|
||||
|
@ -12,9 +12,13 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import time
|
||||
|
||||
import cachetools
|
||||
from oslo_config import cfg
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.services import actions as action_service
|
||||
from mistral.services import workflows as wf_service
|
||||
from mistral.tests.unit.engine import base
|
||||
from mistral.workflow import lookup_utils
|
||||
@ -80,3 +84,81 @@ class LookupUtilsTest(base.EngineTestCase):
|
||||
# Expecting that the cache size is 0 because the workflow has
|
||||
# finished and invalidated corresponding cache entry.
|
||||
self.assertEqual(0, lookup_utils.get_task_execution_cache_size())
|
||||
|
||||
def test_action_definition_cache_ttl(self):
|
||||
action = """---
|
||||
version: '2.0'
|
||||
|
||||
action1:
|
||||
base: std.echo output='Hi'
|
||||
output:
|
||||
result: $
|
||||
"""
|
||||
|
||||
wf_text = """---
|
||||
version: '2.0'
|
||||
|
||||
wf:
|
||||
tasks:
|
||||
task1:
|
||||
action: action1
|
||||
on-success: join_task
|
||||
|
||||
task2:
|
||||
action: action1
|
||||
on-success: join_task
|
||||
|
||||
join_task:
|
||||
join: all
|
||||
on-success: task4
|
||||
|
||||
task4:
|
||||
action: action1
|
||||
pause-before: true
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
# Create an action.
|
||||
db_actions = action_service.create_actions(action)
|
||||
|
||||
self.assertEqual(1, len(db_actions))
|
||||
self._assert_single_item(db_actions, name='action1')
|
||||
|
||||
# Explicitly mark the action to be deleted after the test execution.
|
||||
self.addCleanup(db_api.delete_action_definitions, name='action1')
|
||||
|
||||
# Reinitialise the cache with reduced action_definition_cache_time
|
||||
# to make the test faster.
|
||||
# Save the existing cache into a temporary variable and restore
|
||||
# the value when the test passed.
|
||||
old_cache = lookup_utils._ACTION_DEF_CACHE
|
||||
lookup_utils._ACTION_DEF_CACHE = cachetools.TTLCache(
|
||||
maxsize=1000,
|
||||
ttl=5 # 5 seconds
|
||||
)
|
||||
self.addCleanup(setattr, lookup_utils, '_ACTION_DEF_CACHE', old_cache)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('wf')
|
||||
|
||||
self.await_workflow_paused(wf_ex.id)
|
||||
|
||||
# Check that 'action1' 'echo' and 'noop' are cached.
|
||||
self.assertEqual(3, lookup_utils.get_action_definition_cache_size())
|
||||
self.assertIn('action1', lookup_utils._ACTION_DEF_CACHE)
|
||||
self.assertIn('std.noop', lookup_utils._ACTION_DEF_CACHE)
|
||||
self.assertIn('std.echo', lookup_utils._ACTION_DEF_CACHE)
|
||||
|
||||
# Wait some time until cache expires
|
||||
time.sleep(7)
|
||||
self.assertEqual(0, lookup_utils.get_action_definition_cache_size())
|
||||
|
||||
self.engine.resume_workflow(wf_ex.id)
|
||||
|
||||
self.await_workflow_success(wf_ex.id)
|
||||
|
||||
# Check all actions are cached again.
|
||||
self.assertEqual(2, lookup_utils.get_action_definition_cache_size())
|
||||
self.assertIn('action1', lookup_utils._ACTION_DEF_CACHE)
|
||||
self.assertIn('std.echo', lookup_utils._ACTION_DEF_CACHE)
|
||||
|
@ -29,13 +29,18 @@ Mostly, they are useful for doing any kind of fast lookups with in order
|
||||
to make some decision based on their state.
|
||||
"""
|
||||
|
||||
import cachetools
|
||||
import threading
|
||||
|
||||
import cachetools
|
||||
from oslo_config import cfg
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.workflow import states
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
def _create_lru_cache_for_workflow_execution(wf_ex_id):
|
||||
return cachetools.LRUCache(maxsize=500)
|
||||
|
||||
@ -49,7 +54,33 @@ _TASK_EX_CACHE = cachetools.LRUCache(
|
||||
missing=_create_lru_cache_for_workflow_execution
|
||||
)
|
||||
|
||||
_CACHE_LOCK = threading.RLock()
|
||||
_ACTION_DEF_CACHE = cachetools.TTLCache(
|
||||
maxsize=1000,
|
||||
ttl=CONF.engine.action_definition_cache_time # 60 seconds by default
|
||||
)
|
||||
|
||||
_TASK_EX_CACHE_LOCK = threading.RLock()
|
||||
_ACTION_DEF_CACHE_LOCK = threading.RLock()
|
||||
|
||||
|
||||
def find_action_definition_by_name(action_name):
|
||||
"""Find action definition name.
|
||||
|
||||
:param action_name: Action name.
|
||||
:return: Action definition (possibly a cached value).
|
||||
"""
|
||||
with _ACTION_DEF_CACHE_LOCK:
|
||||
action_definition = _ACTION_DEF_CACHE.get(action_name)
|
||||
|
||||
if action_definition:
|
||||
return action_definition
|
||||
|
||||
action_definition = db_api.load_action_definition(action_name)
|
||||
|
||||
with _ACTION_DEF_CACHE_LOCK:
|
||||
_ACTION_DEF_CACHE[action_name] = action_definition
|
||||
|
||||
return action_definition
|
||||
|
||||
|
||||
def find_task_executions_by_name(wf_ex_id, task_name):
|
||||
@ -59,7 +90,7 @@ def find_task_executions_by_name(wf_ex_id, task_name):
|
||||
:param task_name: Task name.
|
||||
:return: Task executions (possibly a cached value).
|
||||
"""
|
||||
with _CACHE_LOCK:
|
||||
with _TASK_EX_CACHE_LOCK:
|
||||
t_execs = _TASK_EX_CACHE[wf_ex_id].get(task_name)
|
||||
|
||||
if t_execs:
|
||||
@ -78,7 +109,7 @@ def find_task_executions_by_name(wf_ex_id, task_name):
|
||||
)
|
||||
|
||||
if all_finished:
|
||||
with _CACHE_LOCK:
|
||||
with _TASK_EX_CACHE_LOCK:
|
||||
_TASK_EX_CACHE[wf_ex_id][task_name] = t_execs
|
||||
|
||||
return t_execs
|
||||
@ -124,12 +155,19 @@ def get_task_execution_cache_size():
|
||||
return len(_TASK_EX_CACHE)
|
||||
|
||||
|
||||
def get_action_definition_cache_size():
|
||||
return len(_ACTION_DEF_CACHE)
|
||||
|
||||
|
||||
def invalidate_cached_task_executions(wf_ex_id):
|
||||
with _CACHE_LOCK:
|
||||
with _TASK_EX_CACHE_LOCK:
|
||||
if wf_ex_id in _TASK_EX_CACHE:
|
||||
del _TASK_EX_CACHE[wf_ex_id]
|
||||
|
||||
|
||||
def clear_caches():
|
||||
with _CACHE_LOCK:
|
||||
with _TASK_EX_CACHE_LOCK:
|
||||
_TASK_EX_CACHE.clear()
|
||||
|
||||
with _ACTION_DEF_CACHE_LOCK:
|
||||
_ACTION_DEF_CACHE.clear()
|
||||
|
@ -0,0 +1,9 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Enable caching of action definitions in local memory. Now, instead of
|
||||
downloading the definitions from the database every time, mistral engine
|
||||
will store them in a local cache. This should reduce the number of
|
||||
database requests and improve the whole performance of the the system.
|
||||
Cache ttl can be configured with ``action_definition_cache_time`` option
|
||||
from [engine] group. The default value is 60 seconds.
|
Loading…
x
Reference in New Issue
Block a user