Merge "[Trusts] Fix deleting trust"
This commit is contained in:
commit
3de1918c30
@ -504,6 +504,10 @@ def get_event_trigger(id, insecure=False):
|
||||
return IMPL.get_event_trigger(id, insecure)
|
||||
|
||||
|
||||
def load_event_trigger(id, insecure=False):
|
||||
return IMPL.load_event_trigger(id, insecure)
|
||||
|
||||
|
||||
def get_event_triggers(insecure=False, limit=None, marker=None, sort_keys=None,
|
||||
sort_dirs=None, fields=None, **kwargs):
|
||||
return IMPL.get_event_triggers(
|
||||
|
@ -1506,6 +1506,11 @@ def get_event_trigger(id, insecure=False, session=None):
|
||||
return event_trigger
|
||||
|
||||
|
||||
@b.session_aware()
|
||||
def load_event_trigger(id, insecure=False, session=None):
|
||||
return _get_event_trigger(id, insecure)
|
||||
|
||||
|
||||
@b.session_aware()
|
||||
def get_event_triggers(insecure=False, session=None, **kwargs):
|
||||
return _get_collection_sorted_by_time(
|
||||
|
@ -27,6 +27,7 @@ from mistral import exceptions as exc
|
||||
from mistral.lang import parser as spec_parser
|
||||
from mistral.rpc import clients as rpc
|
||||
from mistral.services import scheduler
|
||||
from mistral.services import triggers
|
||||
from mistral.services import workflows as wf_service
|
||||
from mistral import utils
|
||||
from mistral.utils import merge_dicts
|
||||
@ -294,6 +295,8 @@ class Workflow(object):
|
||||
# lookup cache anymore.
|
||||
lookup_utils.invalidate_cached_task_executions(self.wf_ex.id)
|
||||
|
||||
triggers.on_workflow_complete(self.wf_ex)
|
||||
|
||||
if recursive and self.wf_ex.task_execution_id:
|
||||
parent_task_ex = db_api.get_task_execution(
|
||||
self.wf_ex.task_execution_id
|
||||
|
@ -14,6 +14,7 @@
|
||||
# under the License.
|
||||
|
||||
from collections import defaultdict
|
||||
import json
|
||||
import os
|
||||
import threading
|
||||
|
||||
@ -237,12 +238,23 @@ class DefaultEventEngine(base.EventEngine):
|
||||
ctx = security.create_context(t['trust_id'], t['project_id'])
|
||||
auth_ctx.set_ctx(ctx)
|
||||
|
||||
description = {
|
||||
"description": (
|
||||
"Workflow execution created by event"
|
||||
" trigger '(%s)'." % t['id']
|
||||
),
|
||||
"triggered_by": {
|
||||
"type": "event_trigger",
|
||||
"id": t['id'],
|
||||
"name": t['name']
|
||||
}
|
||||
}
|
||||
|
||||
try:
|
||||
self.engine_client.start_workflow(
|
||||
t['workflow_id'],
|
||||
t['workflow_input'],
|
||||
description="Workflow execution created by event "
|
||||
"trigger %s." % t['id'],
|
||||
description=json.dumps(description),
|
||||
**workflow_params
|
||||
)
|
||||
except Exception as e:
|
||||
@ -390,4 +402,6 @@ class DefaultEventEngine(base.EventEngine):
|
||||
|
||||
return
|
||||
|
||||
security.delete_trust(trigger['trust_id'])
|
||||
|
||||
self._add_event_listener(trigger['exchange'], trigger['topic'], events)
|
||||
|
@ -12,6 +12,8 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import periodic_task
|
||||
@ -59,12 +61,23 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks):
|
||||
trigger.name
|
||||
)
|
||||
|
||||
description = {
|
||||
"description": (
|
||||
"Workflow execution created by cron"
|
||||
" trigger '(%s)'." % trigger.id
|
||||
),
|
||||
"triggered_by": {
|
||||
"type": "cron_trigger",
|
||||
"id": trigger.id,
|
||||
"name": trigger.name,
|
||||
}
|
||||
}
|
||||
|
||||
rpc.get_engine_client().start_workflow(
|
||||
trigger.workflow.name,
|
||||
trigger.workflow.namespace,
|
||||
trigger.workflow_input,
|
||||
description="Workflow execution created "
|
||||
"by cron trigger '(%s)'." % trigger.id,
|
||||
description=json.dumps(description),
|
||||
**trigger.workflow_params
|
||||
)
|
||||
except Exception:
|
||||
@ -89,7 +102,8 @@ def advance_cron_trigger(t):
|
||||
if t.remaining_executions == 0:
|
||||
modified_count = triggers.delete_cron_trigger(
|
||||
t.name,
|
||||
trust_id=t.trust_id
|
||||
trust_id=t.trust_id,
|
||||
delete_trust=False
|
||||
)
|
||||
else: # if remaining execution = None or > 0.
|
||||
next_time = triggers.get_next_execution_time(
|
||||
|
@ -80,7 +80,12 @@ def create_context(trust_id, project_id):
|
||||
)
|
||||
|
||||
|
||||
def delete_trust(trust_id):
|
||||
def delete_trust(trust_id=None):
|
||||
if not trust_id:
|
||||
# Try to retrieve trust from context.
|
||||
if auth_ctx.has_ctx():
|
||||
trust_id = auth_ctx.ctx().trust_id
|
||||
|
||||
if not trust_id:
|
||||
return
|
||||
|
||||
|
@ -14,8 +14,11 @@
|
||||
|
||||
import croniter
|
||||
import datetime
|
||||
import json
|
||||
import six
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import utils as eng_utils
|
||||
from mistral import exceptions as exc
|
||||
@ -24,6 +27,9 @@ from mistral.rpc import clients as rpc
|
||||
from mistral.services import security
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_next_execution_time(pattern, start_time):
|
||||
return croniter.croniter(pattern, start_time).get_next(
|
||||
datetime.datetime
|
||||
@ -130,13 +136,14 @@ def create_cron_trigger(name, workflow_name, workflow_input,
|
||||
return trig
|
||||
|
||||
|
||||
def delete_cron_trigger(name, trust_id=None):
|
||||
def delete_cron_trigger(name, trust_id=None, delete_trust=True):
|
||||
if not trust_id:
|
||||
trigger = db_api.get_cron_trigger(name)
|
||||
trust_id = trigger.trust_id
|
||||
|
||||
modified_count = db_api.delete_cron_trigger(name)
|
||||
if modified_count:
|
||||
|
||||
if modified_count and delete_trust:
|
||||
# Delete trust only together with deleting trigger.
|
||||
security.delete_trust(trust_id)
|
||||
|
||||
@ -217,3 +224,29 @@ def update_event_trigger(id, values):
|
||||
rpc.get_event_engine_client().update_event_trigger(trig.to_dict())
|
||||
|
||||
return trig
|
||||
|
||||
|
||||
def on_workflow_complete(wf_ex):
|
||||
if wf_ex.task_execution_id:
|
||||
return
|
||||
|
||||
try:
|
||||
description = json.loads(wf_ex.description)
|
||||
except ValueError as e:
|
||||
LOG.debug(str(e))
|
||||
return
|
||||
|
||||
if not isinstance(description, dict):
|
||||
return
|
||||
|
||||
triggered = description.get('triggered_by')
|
||||
|
||||
if not triggered:
|
||||
return
|
||||
|
||||
if triggered['type'] == 'cron_trigger':
|
||||
if not db_api.load_cron_trigger(triggered['name']):
|
||||
security.delete_trust()
|
||||
elif triggered['type'] == 'event_trigger':
|
||||
if not db_api.load_event_trigger(triggered['id'], True):
|
||||
security.delete_trust()
|
||||
|
@ -234,7 +234,7 @@ class TriggerServiceV2Test(base.DbTestCase):
|
||||
mock.MagicMock(side_effect=new_advance_cron_trigger)
|
||||
)
|
||||
@mock.patch.object(security, 'delete_trust')
|
||||
def test_create_delete_trust_in_trigger(self, create_ctx, delete_trust):
|
||||
def test_create_delete_trust_in_trigger(self, delete_trust, create_ctx):
|
||||
create_ctx.return_value = self.ctx
|
||||
cfg.CONF.set_default('auth_enable', True, group='pecan')
|
||||
trigger_thread = periodic.setup()
|
||||
@ -255,10 +255,8 @@ class TriggerServiceV2Test(base.DbTestCase):
|
||||
datetime.datetime(2010, 8, 25)
|
||||
)
|
||||
|
||||
self._await(
|
||||
lambda: delete_trust.call_count == 1, timeout=10
|
||||
)
|
||||
self.assertEqual('my_trust_id', delete_trust.mock_calls[0][1][0])
|
||||
eventlet.sleep(1)
|
||||
self.assertEqual(0, delete_trust.call_count)
|
||||
|
||||
def test_get_trigger_in_correct_orders(self):
|
||||
t1_name = 'trigger-%s' % utils.generate_unicode_uuid()
|
||||
|
Loading…
x
Reference in New Issue
Block a user