Merge "Really make the cron trigger execution interval configurable"
This commit is contained in:
commit
0727c35bc1
@ -34,66 +34,75 @@ CONF = cfg.CONF
|
||||
_periodic_tasks = {}
|
||||
|
||||
|
||||
class MistralPeriodicTasks(periodic_task.PeriodicTasks):
|
||||
@periodic_task.periodic_task(spacing=CONF.cron_trigger.execution_interval,
|
||||
run_immediately=True)
|
||||
def process_cron_triggers_v2(self, ctx):
|
||||
LOG.debug("Processing cron triggers...")
|
||||
def process_cron_triggers_v2(self, ctx):
|
||||
LOG.debug("Processing cron triggers...")
|
||||
|
||||
for trigger in triggers.get_next_cron_triggers():
|
||||
LOG.debug("Processing cron trigger: %s", trigger)
|
||||
for trigger in triggers.get_next_cron_triggers():
|
||||
LOG.debug("Processing cron trigger: %s", trigger)
|
||||
|
||||
try:
|
||||
# Setup admin context before schedule triggers.
|
||||
ctx = security.create_context(
|
||||
trigger.trust_id,
|
||||
trigger.project_id
|
||||
try:
|
||||
# Setup admin context before schedule triggers.
|
||||
ctx = security.create_context(
|
||||
trigger.trust_id,
|
||||
trigger.project_id
|
||||
)
|
||||
|
||||
auth_ctx.set_ctx(ctx)
|
||||
|
||||
LOG.debug("Cron trigger security context: %s", ctx)
|
||||
|
||||
# Try to advance the cron trigger next_execution_time and
|
||||
# remaining_executions if relevant.
|
||||
modified = advance_cron_trigger(trigger)
|
||||
|
||||
# If cron trigger was not already modified by another engine.
|
||||
if modified:
|
||||
LOG.debug(
|
||||
"Starting workflow '%s' by cron trigger '%s'",
|
||||
trigger.workflow.name,
|
||||
trigger.name
|
||||
)
|
||||
|
||||
auth_ctx.set_ctx(ctx)
|
||||
|
||||
LOG.debug("Cron trigger security context: %s", ctx)
|
||||
|
||||
# Try to advance the cron trigger next_execution_time and
|
||||
# remaining_executions if relevant.
|
||||
modified = advance_cron_trigger(trigger)
|
||||
|
||||
# If cron trigger was not already modified by another engine.
|
||||
if modified:
|
||||
LOG.debug(
|
||||
"Starting workflow '%s' by cron trigger '%s'",
|
||||
trigger.workflow.name,
|
||||
trigger.name
|
||||
)
|
||||
|
||||
description = {
|
||||
"description": (
|
||||
"Workflow execution created by cron"
|
||||
" trigger '(%s)'." % trigger.id
|
||||
),
|
||||
"triggered_by": {
|
||||
"type": "cron_trigger",
|
||||
"id": trigger.id,
|
||||
"name": trigger.name,
|
||||
}
|
||||
description = {
|
||||
"description": (
|
||||
"Workflow execution created by cron"
|
||||
" trigger '(%s)'." % trigger.id
|
||||
),
|
||||
"triggered_by": {
|
||||
"type": "cron_trigger",
|
||||
"id": trigger.id,
|
||||
"name": trigger.name,
|
||||
}
|
||||
}
|
||||
|
||||
rpc.get_engine_client().start_workflow(
|
||||
trigger.workflow.name,
|
||||
trigger.workflow.namespace,
|
||||
None,
|
||||
trigger.workflow_input,
|
||||
description=json.dumps(description),
|
||||
**trigger.workflow_params
|
||||
)
|
||||
except Exception:
|
||||
# Log and continue to next cron trigger.
|
||||
LOG.exception(
|
||||
"Failed to process cron trigger %s",
|
||||
str(trigger)
|
||||
rpc.get_engine_client().start_workflow(
|
||||
trigger.workflow.name,
|
||||
trigger.workflow.namespace,
|
||||
None,
|
||||
trigger.workflow_input,
|
||||
description=json.dumps(description),
|
||||
**trigger.workflow_params
|
||||
)
|
||||
finally:
|
||||
auth_ctx.set_ctx(None)
|
||||
except Exception:
|
||||
# Log and continue to next cron trigger.
|
||||
LOG.exception(
|
||||
"Failed to process cron trigger %s",
|
||||
str(trigger)
|
||||
)
|
||||
finally:
|
||||
auth_ctx.set_ctx(None)
|
||||
|
||||
|
||||
class MistralPeriodicTasks(periodic_task.PeriodicTasks):
|
||||
|
||||
def __init__(self, conf):
|
||||
super(MistralPeriodicTasks, self).__init__(conf)
|
||||
|
||||
periodic_task_ = periodic_task.periodic_task(
|
||||
spacing=CONF.cron_trigger.execution_interval,
|
||||
run_immediately=True,
|
||||
)
|
||||
self.add_periodic_task(periodic_task_(process_cron_triggers_v2))
|
||||
|
||||
|
||||
def advance_cron_trigger(t):
|
||||
|
@ -68,7 +68,7 @@ class ProcessCronTriggerTest(base.EngineTestCase):
|
||||
next_trigger = triggers.get_next_cron_triggers()[0]
|
||||
next_execution_time_before = next_trigger.next_execution_time
|
||||
|
||||
periodic.MistralPeriodicTasks(cfg.CONF).process_cron_triggers_v2(None)
|
||||
periodic.process_cron_triggers_v2(None, None)
|
||||
|
||||
start_wf_mock = get_engine_client_mock.return_value.start_workflow
|
||||
|
||||
@ -121,7 +121,7 @@ class ProcessCronTriggerTest(base.EngineTestCase):
|
||||
next_trigger = next_triggers[0]
|
||||
next_execution_time_before = next_trigger.next_execution_time
|
||||
|
||||
periodic.MistralPeriodicTasks(cfg.CONF).process_cron_triggers_v2(None)
|
||||
periodic.process_cron_triggers_v2(None, None)
|
||||
|
||||
next_triggers = triggers.get_next_cron_triggers()
|
||||
|
||||
@ -167,7 +167,7 @@ class ProcessCronTriggerTest(base.EngineTestCase):
|
||||
cron_trigger.next_execution_time
|
||||
)
|
||||
|
||||
periodic.MistralPeriodicTasks(cfg.CONF).process_cron_triggers_v2(None)
|
||||
periodic.process_cron_triggers_v2(None, None)
|
||||
|
||||
# After process_triggers context is set to None, need to reset it.
|
||||
auth_ctx.set_ctx(self.ctx)
|
||||
|
Loading…
Reference in New Issue
Block a user