Extend capabilities to clean up old executions
* Added a configuration option to the expiration policy to filter out workflow states. Closes-Bug: #1796627 Change-Id: Ife49e6da1d7d52a3f50f1628d808d4c65a22cad9
This commit is contained in:
parent
1c7e242975
commit
6b862e625e
@ -32,6 +32,7 @@ from osprofiler import opts as profiler
|
||||
from mistral import version
|
||||
|
||||
from mistral._i18n import _
|
||||
from mistral.workflow import states
|
||||
|
||||
# Options under default group.
|
||||
launch_opt = cfg.ListOpt(
|
||||
@ -445,6 +446,13 @@ execution_expiration_policy_opts = [
|
||||
'The default value is 0. If it is set to 0, '
|
||||
'size of batch is total number of expired executions '
|
||||
'that is going to be deleted.')
|
||||
),
|
||||
cfg.ListOpt(
|
||||
'ignored_states',
|
||||
default=[],
|
||||
help='The states that the expiration policy will filter '
|
||||
'out and will not delete.'
|
||||
'Valid values are, [{}]'.format(states.TERMINAL_STATES)
|
||||
)
|
||||
]
|
||||
|
||||
|
@ -1393,17 +1393,18 @@ def get_superfluous_executions(max_finished_executions, limit=None, columns=(),
|
||||
def _get_completed_root_executions_query(columns):
|
||||
query = b.model_query(models.WorkflowExecution, columns=columns)
|
||||
|
||||
# Only WorkflowExecution that are not a child of other WorkflowExecution.
|
||||
# This is an empty list by default.
|
||||
ignored_states = CONF.execution_expiration_policy.ignored_states
|
||||
desired_states = states.TERMINAL_STATES - set(ignored_states)
|
||||
|
||||
# Only workflow executions that are not a child of
|
||||
# other workflow executions.
|
||||
query = query.filter(
|
||||
models.WorkflowExecution.task_execution_id == sa.null()
|
||||
)
|
||||
|
||||
query = query.filter(
|
||||
models.WorkflowExecution.state.in_(
|
||||
[states.SUCCESS,
|
||||
states.ERROR,
|
||||
states.CANCELLED]
|
||||
)
|
||||
models.WorkflowExecution.state.in_(desired_states)
|
||||
)
|
||||
|
||||
return query
|
||||
|
@ -22,6 +22,7 @@ from oslo_service import threadgroup
|
||||
|
||||
from mistral import context as auth_ctx
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.workflow import states
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -129,10 +130,22 @@ def run_execution_expiration_policy(self, ctx):
|
||||
_delete_executions(batch_size, exp_time, max_executions)
|
||||
|
||||
|
||||
def _check_ignored_states_config():
|
||||
ignored_states = CONF.execution_expiration_policy.ignored_states
|
||||
|
||||
for state in ignored_states:
|
||||
if state not in states.TERMINAL_STATES:
|
||||
raise ValueError(
|
||||
'{} is not a terminal state. The valid states are [{}]'
|
||||
.format(state, states.TERMINAL_STATES))
|
||||
|
||||
|
||||
def setup():
|
||||
tg = threadgroup.ThreadGroup()
|
||||
pt = ExecutionExpirationPolicy(CONF)
|
||||
|
||||
_check_ignored_states_config()
|
||||
|
||||
ctx = auth_ctx.MistralContext(
|
||||
user=None,
|
||||
tenant=None,
|
||||
|
@ -169,6 +169,42 @@ class ExpirationPolicyTest(base.DbTestCase):
|
||||
|
||||
self.assertEqual(0, len(execs))
|
||||
|
||||
def test_expiration_policy_for_executions_with_ignored_states(self):
|
||||
_create_workflow_executions()
|
||||
now = datetime.datetime.utcnow()
|
||||
|
||||
_set_expiration_policy_config(
|
||||
evaluation_interval=1,
|
||||
older_than=30,
|
||||
ignored_states=['SUCCESS']
|
||||
)
|
||||
|
||||
expiration_policy.run_execution_expiration_policy(self, ctx)
|
||||
|
||||
execs = db_api.get_expired_executions(now)
|
||||
self.assertEqual(1, len(execs))
|
||||
self.assertEqual('cancelled_not_expired', execs[0].get('id'))
|
||||
|
||||
_set_expiration_policy_config(
|
||||
evaluation_interval=1,
|
||||
older_than=30,
|
||||
ignored_states=['SUCCESS', 'CANCELLED']
|
||||
)
|
||||
|
||||
expiration_policy.run_execution_expiration_policy(self, ctx)
|
||||
|
||||
execs = db_api.get_expired_executions(now)
|
||||
self.assertEqual(0, len(execs))
|
||||
|
||||
def test_expiration_policy_invalid_ignored_states(self):
|
||||
_set_expiration_policy_config(
|
||||
evaluation_interval=1,
|
||||
older_than=30,
|
||||
ignored_states=['RUNNING']
|
||||
)
|
||||
|
||||
self.assertRaises(ValueError, expiration_policy.setup)
|
||||
|
||||
def test_deletion_of_expired_executions_with_batch_size_scenario1(self):
|
||||
"""scenario1
|
||||
|
||||
@ -331,7 +367,7 @@ class ExpirationPolicyTest(base.DbTestCase):
|
||||
|
||||
|
||||
def _set_expiration_policy_config(evaluation_interval, older_than, mfe=0,
|
||||
batch_size=0):
|
||||
batch_size=0, ignored_states=[]):
|
||||
cfg.CONF.set_default(
|
||||
'evaluation_interval',
|
||||
evaluation_interval,
|
||||
@ -352,3 +388,8 @@ def _set_expiration_policy_config(evaluation_interval, older_than, mfe=0,
|
||||
batch_size,
|
||||
group='execution_expiration_policy'
|
||||
)
|
||||
cfg.CONF.set_default(
|
||||
'ignored_states',
|
||||
ignored_states,
|
||||
group='execution_expiration_policy'
|
||||
)
|
||||
|
@ -66,6 +66,8 @@ _VALID_TRANSITIONS = {
|
||||
ERROR: [RUNNING]
|
||||
}
|
||||
|
||||
TERMINAL_STATES = {SUCCESS, ERROR, CANCELLED}
|
||||
|
||||
|
||||
def is_valid(state):
|
||||
return state in _ALL
|
||||
|
Loading…
Reference in New Issue
Block a user