diff --git a/mistral/config.py b/mistral/config.py index 5a592004a..2a7208316 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -32,6 +32,7 @@ from osprofiler import opts as profiler from mistral import version from mistral._i18n import _ +from mistral.workflow import states # Options under default group. launch_opt = cfg.ListOpt( @@ -445,6 +446,13 @@ execution_expiration_policy_opts = [ 'The default value is 0. If it is set to 0, ' 'size of batch is total number of expired executions ' 'that is going to be deleted.') + ), + cfg.ListOpt( + 'ignored_states', + default=[], + help='The states that the expiration policy will filter ' + 'out and will not delete.' + 'Valid values are, [{}]'.format(states.TERMINAL_STATES) ) ] diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index 951ad4e13..03aafcf05 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -1393,17 +1393,18 @@ def get_superfluous_executions(max_finished_executions, limit=None, columns=(), def _get_completed_root_executions_query(columns): query = b.model_query(models.WorkflowExecution, columns=columns) - # Only WorkflowExecution that are not a child of other WorkflowExecution. + # This is an empty list by default. + ignored_states = CONF.execution_expiration_policy.ignored_states + desired_states = states.TERMINAL_STATES - set(ignored_states) + + # Only workflow executions that are not a child of + # other workflow executions. query = query.filter( models.WorkflowExecution.task_execution_id == sa.null() ) query = query.filter( - models.WorkflowExecution.state.in_( - [states.SUCCESS, - states.ERROR, - states.CANCELLED] - ) + models.WorkflowExecution.state.in_(desired_states) ) return query diff --git a/mistral/services/expiration_policy.py b/mistral/services/expiration_policy.py index a99a2ec49..a71875bfd 100644 --- a/mistral/services/expiration_policy.py +++ b/mistral/services/expiration_policy.py @@ -22,6 +22,7 @@ from oslo_service import threadgroup from mistral import context as auth_ctx from mistral.db.v2 import api as db_api +from mistral.workflow import states LOG = logging.getLogger(__name__) @@ -129,10 +130,22 @@ def run_execution_expiration_policy(self, ctx): _delete_executions(batch_size, exp_time, max_executions) +def _check_ignored_states_config(): + ignored_states = CONF.execution_expiration_policy.ignored_states + + for state in ignored_states: + if state not in states.TERMINAL_STATES: + raise ValueError( + '{} is not a terminal state. The valid states are [{}]' + .format(state, states.TERMINAL_STATES)) + + def setup(): tg = threadgroup.ThreadGroup() pt = ExecutionExpirationPolicy(CONF) + _check_ignored_states_config() + ctx = auth_ctx.MistralContext( user=None, tenant=None, diff --git a/mistral/tests/unit/services/test_expiration_policy.py b/mistral/tests/unit/services/test_expiration_policy.py index a7812f5e8..ac0cc6599 100644 --- a/mistral/tests/unit/services/test_expiration_policy.py +++ b/mistral/tests/unit/services/test_expiration_policy.py @@ -169,6 +169,42 @@ class ExpirationPolicyTest(base.DbTestCase): self.assertEqual(0, len(execs)) + def test_expiration_policy_for_executions_with_ignored_states(self): + _create_workflow_executions() + now = datetime.datetime.utcnow() + + _set_expiration_policy_config( + evaluation_interval=1, + older_than=30, + ignored_states=['SUCCESS'] + ) + + expiration_policy.run_execution_expiration_policy(self, ctx) + + execs = db_api.get_expired_executions(now) + self.assertEqual(1, len(execs)) + self.assertEqual('cancelled_not_expired', execs[0].get('id')) + + _set_expiration_policy_config( + evaluation_interval=1, + older_than=30, + ignored_states=['SUCCESS', 'CANCELLED'] + ) + + expiration_policy.run_execution_expiration_policy(self, ctx) + + execs = db_api.get_expired_executions(now) + self.assertEqual(0, len(execs)) + + def test_expiration_policy_invalid_ignored_states(self): + _set_expiration_policy_config( + evaluation_interval=1, + older_than=30, + ignored_states=['RUNNING'] + ) + + self.assertRaises(ValueError, expiration_policy.setup) + def test_deletion_of_expired_executions_with_batch_size_scenario1(self): """scenario1 @@ -331,7 +367,7 @@ class ExpirationPolicyTest(base.DbTestCase): def _set_expiration_policy_config(evaluation_interval, older_than, mfe=0, - batch_size=0): + batch_size=0, ignored_states=[]): cfg.CONF.set_default( 'evaluation_interval', evaluation_interval, @@ -352,3 +388,8 @@ def _set_expiration_policy_config(evaluation_interval, older_than, mfe=0, batch_size, group='execution_expiration_policy' ) + cfg.CONF.set_default( + 'ignored_states', + ignored_states, + group='execution_expiration_policy' + ) diff --git a/mistral/workflow/states.py b/mistral/workflow/states.py index bf9d5d64e..c02262056 100644 --- a/mistral/workflow/states.py +++ b/mistral/workflow/states.py @@ -66,6 +66,8 @@ _VALID_TRANSITIONS = { ERROR: [RUNNING] } +TERMINAL_STATES = {SUCCESS, ERROR, CANCELLED} + def is_valid(state): return state in _ALL