Merge "Extend capabilities to clean up old executions"
This commit is contained in:
commit
66d1776f1b
@ -32,6 +32,7 @@ from osprofiler import opts as profiler
|
|||||||
from mistral import version
|
from mistral import version
|
||||||
|
|
||||||
from mistral._i18n import _
|
from mistral._i18n import _
|
||||||
|
from mistral.workflow import states
|
||||||
|
|
||||||
# Options under default group.
|
# Options under default group.
|
||||||
launch_opt = cfg.ListOpt(
|
launch_opt = cfg.ListOpt(
|
||||||
@ -445,6 +446,13 @@ execution_expiration_policy_opts = [
|
|||||||
'The default value is 0. If it is set to 0, '
|
'The default value is 0. If it is set to 0, '
|
||||||
'size of batch is total number of expired executions '
|
'size of batch is total number of expired executions '
|
||||||
'that is going to be deleted.')
|
'that is going to be deleted.')
|
||||||
|
),
|
||||||
|
cfg.ListOpt(
|
||||||
|
'ignored_states',
|
||||||
|
default=[],
|
||||||
|
help='The states that the expiration policy will filter '
|
||||||
|
'out and will not delete.'
|
||||||
|
'Valid values are, [{}]'.format(states.TERMINAL_STATES)
|
||||||
)
|
)
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -1393,17 +1393,18 @@ def get_superfluous_executions(max_finished_executions, limit=None, columns=(),
|
|||||||
def _get_completed_root_executions_query(columns):
|
def _get_completed_root_executions_query(columns):
|
||||||
query = b.model_query(models.WorkflowExecution, columns=columns)
|
query = b.model_query(models.WorkflowExecution, columns=columns)
|
||||||
|
|
||||||
# Only WorkflowExecution that are not a child of other WorkflowExecution.
|
# This is an empty list by default.
|
||||||
|
ignored_states = CONF.execution_expiration_policy.ignored_states
|
||||||
|
desired_states = states.TERMINAL_STATES - set(ignored_states)
|
||||||
|
|
||||||
|
# Only workflow executions that are not a child of
|
||||||
|
# other workflow executions.
|
||||||
query = query.filter(
|
query = query.filter(
|
||||||
models.WorkflowExecution.task_execution_id == sa.null()
|
models.WorkflowExecution.task_execution_id == sa.null()
|
||||||
)
|
)
|
||||||
|
|
||||||
query = query.filter(
|
query = query.filter(
|
||||||
models.WorkflowExecution.state.in_(
|
models.WorkflowExecution.state.in_(desired_states)
|
||||||
[states.SUCCESS,
|
|
||||||
states.ERROR,
|
|
||||||
states.CANCELLED]
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
return query
|
return query
|
||||||
|
@ -22,6 +22,7 @@ from oslo_service import threadgroup
|
|||||||
|
|
||||||
from mistral import context as auth_ctx
|
from mistral import context as auth_ctx
|
||||||
from mistral.db.v2 import api as db_api
|
from mistral.db.v2 import api as db_api
|
||||||
|
from mistral.workflow import states
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -129,10 +130,22 @@ def run_execution_expiration_policy(self, ctx):
|
|||||||
_delete_executions(batch_size, exp_time, max_executions)
|
_delete_executions(batch_size, exp_time, max_executions)
|
||||||
|
|
||||||
|
|
||||||
|
def _check_ignored_states_config():
|
||||||
|
ignored_states = CONF.execution_expiration_policy.ignored_states
|
||||||
|
|
||||||
|
for state in ignored_states:
|
||||||
|
if state not in states.TERMINAL_STATES:
|
||||||
|
raise ValueError(
|
||||||
|
'{} is not a terminal state. The valid states are [{}]'
|
||||||
|
.format(state, states.TERMINAL_STATES))
|
||||||
|
|
||||||
|
|
||||||
def setup():
|
def setup():
|
||||||
tg = threadgroup.ThreadGroup()
|
tg = threadgroup.ThreadGroup()
|
||||||
pt = ExecutionExpirationPolicy(CONF)
|
pt = ExecutionExpirationPolicy(CONF)
|
||||||
|
|
||||||
|
_check_ignored_states_config()
|
||||||
|
|
||||||
ctx = auth_ctx.MistralContext(
|
ctx = auth_ctx.MistralContext(
|
||||||
user=None,
|
user=None,
|
||||||
tenant=None,
|
tenant=None,
|
||||||
|
@ -169,6 +169,42 @@ class ExpirationPolicyTest(base.DbTestCase):
|
|||||||
|
|
||||||
self.assertEqual(0, len(execs))
|
self.assertEqual(0, len(execs))
|
||||||
|
|
||||||
|
def test_expiration_policy_for_executions_with_ignored_states(self):
|
||||||
|
_create_workflow_executions()
|
||||||
|
now = datetime.datetime.utcnow()
|
||||||
|
|
||||||
|
_set_expiration_policy_config(
|
||||||
|
evaluation_interval=1,
|
||||||
|
older_than=30,
|
||||||
|
ignored_states=['SUCCESS']
|
||||||
|
)
|
||||||
|
|
||||||
|
expiration_policy.run_execution_expiration_policy(self, ctx)
|
||||||
|
|
||||||
|
execs = db_api.get_expired_executions(now)
|
||||||
|
self.assertEqual(1, len(execs))
|
||||||
|
self.assertEqual('cancelled_not_expired', execs[0].get('id'))
|
||||||
|
|
||||||
|
_set_expiration_policy_config(
|
||||||
|
evaluation_interval=1,
|
||||||
|
older_than=30,
|
||||||
|
ignored_states=['SUCCESS', 'CANCELLED']
|
||||||
|
)
|
||||||
|
|
||||||
|
expiration_policy.run_execution_expiration_policy(self, ctx)
|
||||||
|
|
||||||
|
execs = db_api.get_expired_executions(now)
|
||||||
|
self.assertEqual(0, len(execs))
|
||||||
|
|
||||||
|
def test_expiration_policy_invalid_ignored_states(self):
|
||||||
|
_set_expiration_policy_config(
|
||||||
|
evaluation_interval=1,
|
||||||
|
older_than=30,
|
||||||
|
ignored_states=['RUNNING']
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertRaises(ValueError, expiration_policy.setup)
|
||||||
|
|
||||||
def test_deletion_of_expired_executions_with_batch_size_scenario1(self):
|
def test_deletion_of_expired_executions_with_batch_size_scenario1(self):
|
||||||
"""scenario1
|
"""scenario1
|
||||||
|
|
||||||
@ -331,7 +367,7 @@ class ExpirationPolicyTest(base.DbTestCase):
|
|||||||
|
|
||||||
|
|
||||||
def _set_expiration_policy_config(evaluation_interval, older_than, mfe=0,
|
def _set_expiration_policy_config(evaluation_interval, older_than, mfe=0,
|
||||||
batch_size=0):
|
batch_size=0, ignored_states=[]):
|
||||||
cfg.CONF.set_default(
|
cfg.CONF.set_default(
|
||||||
'evaluation_interval',
|
'evaluation_interval',
|
||||||
evaluation_interval,
|
evaluation_interval,
|
||||||
@ -352,3 +388,8 @@ def _set_expiration_policy_config(evaluation_interval, older_than, mfe=0,
|
|||||||
batch_size,
|
batch_size,
|
||||||
group='execution_expiration_policy'
|
group='execution_expiration_policy'
|
||||||
)
|
)
|
||||||
|
cfg.CONF.set_default(
|
||||||
|
'ignored_states',
|
||||||
|
ignored_states,
|
||||||
|
group='execution_expiration_policy'
|
||||||
|
)
|
||||||
|
@ -66,6 +66,8 @@ _VALID_TRANSITIONS = {
|
|||||||
ERROR: [RUNNING]
|
ERROR: [RUNNING]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TERMINAL_STATES = {SUCCESS, ERROR, CANCELLED}
|
||||||
|
|
||||||
|
|
||||||
def is_valid(state):
|
def is_valid(state):
|
||||||
return state in _ALL
|
return state in _ALL
|
||||||
|
Loading…
Reference in New Issue
Block a user