[Event-triggers] Allow public triggers
* Allowed creating via API * Added the corresponding processing in the event engine * Corresponding test Closes-Bug: #1704111 Change-Id: I73a1d10fe684f1ec962e42e9700064d06cb0bcbe
This commit is contained in:
parent
7fad3e7b18
commit
e780ffb06d
@ -58,6 +58,7 @@
|
||||
"workflows:update": "rule:admin_or_owner",
|
||||
|
||||
"event_triggers:create": "rule:admin_or_owner",
|
||||
"event_triggers:create:public": "rule:admin_only",
|
||||
"event_triggers:delete": "rule:admin_or_owner",
|
||||
"event_triggers:get": "rule:admin_or_owner",
|
||||
"event_triggers:list": "rule:admin_or_owner",
|
||||
|
@ -61,6 +61,9 @@ class EventTriggersController(rest.RestController):
|
||||
CREATE_MANDATORY
|
||||
)
|
||||
|
||||
if values.get('scope') == 'public':
|
||||
acl.enforce('event_triggers:create:public', auth_ctx.ctx())
|
||||
|
||||
LOG.info('Create event trigger: %s', values)
|
||||
|
||||
db_model = triggers.create_event_trigger(
|
||||
@ -69,6 +72,7 @@ class EventTriggersController(rest.RestController):
|
||||
values.get('topic'),
|
||||
values.get('event'),
|
||||
values.get('workflow_id'),
|
||||
values.get('scope'),
|
||||
workflow_input=values.get('workflow_input'),
|
||||
workflow_params=values.get('workflow_params'),
|
||||
)
|
||||
|
@ -271,12 +271,26 @@ class DefaultEventEngine(base.EventEngine):
|
||||
# There may be more projects registered the same event.
|
||||
project_ids = [t['project_id'] for t in triggers]
|
||||
|
||||
any_public = any(
|
||||
[t['scope'] == 'public' for t in triggers]
|
||||
)
|
||||
|
||||
# Skip the event doesn't belong to any event trigger owner.
|
||||
if (CONF.pecan.auth_enable and
|
||||
if (not any_public and CONF.pecan.auth_enable and
|
||||
context.get('project_id', '') not in project_ids):
|
||||
self.event_queue.task_done()
|
||||
continue
|
||||
|
||||
# Need to choose what trigger(s) should be called exactly.
|
||||
triggers_to_call = []
|
||||
for t in triggers:
|
||||
project_trigger = (
|
||||
t['project_id'] == context.get('project_id')
|
||||
)
|
||||
public_trigger = t['scope'] == 'public'
|
||||
if project_trigger or public_trigger:
|
||||
triggers_to_call.append(t)
|
||||
|
||||
LOG.debug('Start to handle event: %s, %d trigger(s) '
|
||||
'registered.', event_type, len(triggers))
|
||||
|
||||
@ -285,7 +299,7 @@ class DefaultEventEngine(base.EventEngine):
|
||||
event
|
||||
)
|
||||
|
||||
self._start_workflow(triggers, event_params)
|
||||
self._start_workflow(triggers_to_call, event_params)
|
||||
|
||||
self.event_queue.task_done()
|
||||
|
||||
|
@ -147,7 +147,8 @@ def delete_cron_trigger(name, trust_id=None):
|
||||
|
||||
|
||||
def create_event_trigger(name, exchange, topic, event, workflow_id,
|
||||
workflow_input=None, workflow_params=None):
|
||||
scope='private', workflow_input=None,
|
||||
workflow_params=None):
|
||||
with db_api.transaction():
|
||||
wf_def = db_api.get_workflow_definition_by_id(workflow_id)
|
||||
|
||||
@ -172,6 +173,7 @@ def create_event_trigger(name, exchange, topic, event, workflow_id,
|
||||
'exchange': exchange,
|
||||
'topic': topic,
|
||||
'event': event,
|
||||
'scope': scope,
|
||||
}
|
||||
|
||||
security.add_trust_id(values)
|
||||
|
@ -19,6 +19,7 @@ import mock
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import triggers
|
||||
from mistral.tests.unit.api import base
|
||||
from mistral.tests.unit import base as unit_base
|
||||
|
||||
@ -108,6 +109,21 @@ class TestEventTriggerController(base.APITest):
|
||||
client.create_event_trigger.call_args[0][1]
|
||||
)
|
||||
|
||||
@mock.patch.object(db_api, "get_workflow_definition_by_id", MOCK_WF)
|
||||
@mock.patch.object(db_api, "get_workflow_definition", MOCK_WF)
|
||||
@mock.patch.object(triggers, "create_event_trigger")
|
||||
def test_post_public(self, create_trigger):
|
||||
trigger = copy.deepcopy(TRIGGER)
|
||||
trigger['scope'] = 'public'
|
||||
trigger.pop('id')
|
||||
|
||||
resp = self.app.post_json('/v2/event_triggers', trigger)
|
||||
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
self.assertTrue(create_trigger.called)
|
||||
self.assertEqual('public', create_trigger.call_args[0][5])
|
||||
|
||||
def test_post_no_workflow_id(self):
|
||||
CREATE_TRIGGER = copy.deepcopy(TRIGGER)
|
||||
CREATE_TRIGGER.pop('id')
|
||||
|
@ -13,11 +13,13 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import copy
|
||||
import time
|
||||
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
|
||||
from mistral import context as auth_context
|
||||
from mistral.db.v2.sqlalchemy import api as db_api
|
||||
from mistral.event_engine import default_event_engine as evt_eng
|
||||
from mistral.rpc import clients as rpc
|
||||
@ -92,9 +94,67 @@ class EventEngineTest(base.DbTestCase):
|
||||
)
|
||||
self.assertEqual(1, len(e_engine.exchange_topic_listener_map))
|
||||
|
||||
@mock.patch('mistral.messaging.start_listener')
|
||||
@mock.patch.object(rpc, 'get_engine_client', mock.Mock())
|
||||
def test_event_engine_public_trigger(self, mock_start):
|
||||
t = copy.deepcopy(EVENT_TRIGGER)
|
||||
|
||||
# Create public trigger as an admin
|
||||
self.ctx = base.get_context(default=False, admin=True)
|
||||
auth_context.set_ctx(self.ctx)
|
||||
|
||||
t['scope'] = 'public'
|
||||
t['project_id'] = self.ctx.tenant
|
||||
trigger = db_api.create_event_trigger(t)
|
||||
|
||||
# Switch to the user.
|
||||
self.ctx = base.get_context(default=True)
|
||||
auth_context.set_ctx(self.ctx)
|
||||
|
||||
e_engine = evt_eng.DefaultEventEngine()
|
||||
|
||||
self.addCleanup(e_engine.handler_tg.stop)
|
||||
|
||||
event = {
|
||||
'event_type': EVENT_TYPE,
|
||||
'payload': {},
|
||||
'publisher': 'fake_publisher',
|
||||
'timestamp': '',
|
||||
'context': {
|
||||
'project_id': '%s' % self.ctx.project_id,
|
||||
'user_id': 'fake_user'
|
||||
},
|
||||
}
|
||||
|
||||
# Moreover, assert that trigger.project_id != event.project_id
|
||||
self.assertNotEqual(
|
||||
trigger.project_id, event['context']['project_id']
|
||||
)
|
||||
|
||||
with mock.patch.object(e_engine, 'engine_client') as client_mock:
|
||||
e_engine.event_queue.put(event)
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
self.assertEqual(1, client_mock.start_workflow.call_count)
|
||||
|
||||
args, kwargs = client_mock.start_workflow.call_args
|
||||
|
||||
self.assertEqual((EVENT_TRIGGER['workflow_id'], {}), args)
|
||||
self.assertDictEqual(
|
||||
{
|
||||
'service': 'fake_publisher',
|
||||
'project_id': '%s' % self.ctx.project_id,
|
||||
'user_id': 'fake_user',
|
||||
'timestamp': ''
|
||||
},
|
||||
kwargs['event_params']
|
||||
)
|
||||
|
||||
@mock.patch('mistral.messaging.start_listener')
|
||||
@mock.patch.object(rpc, 'get_engine_client', mock.Mock())
|
||||
def test_process_event_queue(self, mock_start):
|
||||
EVENT_TRIGGER['project_id'] = self.ctx.project_id
|
||||
db_api.create_event_trigger(EVENT_TRIGGER)
|
||||
|
||||
e_engine = evt_eng.DefaultEventEngine()
|
||||
@ -106,7 +166,10 @@ class EventEngineTest(base.DbTestCase):
|
||||
'payload': {},
|
||||
'publisher': 'fake_publisher',
|
||||
'timestamp': '',
|
||||
'context': {'project_id': 'fake_project', 'user_id': 'fake_user'},
|
||||
'context': {
|
||||
'project_id': '%s' % self.ctx.project_id,
|
||||
'user_id': 'fake_user'
|
||||
},
|
||||
}
|
||||
|
||||
with mock.patch.object(e_engine, 'engine_client') as client_mock:
|
||||
@ -122,7 +185,7 @@ class EventEngineTest(base.DbTestCase):
|
||||
self.assertDictEqual(
|
||||
{
|
||||
'service': 'fake_publisher',
|
||||
'project_id': 'fake_project',
|
||||
'project_id': '%s' % self.ctx.project_id,
|
||||
'user_id': 'fake_user',
|
||||
'timestamp': ''
|
||||
},
|
||||
|
Loading…
Reference in New Issue
Block a user