Add event configuration for event trigger

When using event trigger feature, some of the notification data may be
useful to end users in their workflows. However, the whole data can not
be visible to end users due to security reasons.

This patch will give operators a chance to define what information will
be available to end users. If the definition is not found, a 'safe'
default setting will be used.

Change-Id: I8b6e9cbe318011d2d11a2bb91aaff4d37222c7eb
Implements: blueprint event-notification-trigger
This commit is contained in:
Lingxian Kong 2016-08-17 23:39:33 +12:00
parent 471ce2d043
commit c73edcbe1d
5 changed files with 209 additions and 46 deletions

View File

@ -0,0 +1,6 @@
- event_types:
- compute.instance.create.*
properties:
resource_id: <% $.payload.instance_id %>
project_id: <% $.context.project_id %>
user_id: <% $.context.user_id %>

View File

@ -191,6 +191,11 @@ event_engine_opts = [
default='mistral_event_engine', default='mistral_event_engine',
help='The message topic that the event engine listens on.' help='The message topic that the event engine listens on.'
), ),
cfg.StrOpt(
'event_definitions_cfg_file',
default='/etc/mistral/event_definitions.yaml',
help='Configuration file for event definitions.'
),
] ]
execution_expiration_policy_opts = [ execution_expiration_policy_opts = [

View File

@ -26,6 +26,7 @@ from oslo_messaging.notify import dispatcher
from oslo_messaging.notify import listener from oslo_messaging.notify import listener
from oslo_messaging import target from oslo_messaging import target
from oslo_messaging import transport from oslo_messaging import transport
from oslo_utils import timeutils
import six import six
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -52,12 +53,16 @@ def handle_event(self, ctxt, publisher_id, event_type, payload, metadata):
'payload: %s, metadata: %s.', publisher_id, event_type, payload, 'payload: %s, metadata: %s.', publisher_id, event_type, payload,
metadata) metadata)
self.event_engine.process_notification_event( notification = {
ctxt, 'event_type': event_type,
event_type, 'payload': payload,
payload, 'publisher': publisher_id,
metadata 'timestamp': metadata.get('timestamp',
) ctxt.get('timestamp', timeutils.utcnow())),
'context': ctxt
}
self.event_engine.process_notification_event(notification)
return dispatcher.NotificationResult.HANDLED return dispatcher.NotificationResult.HANDLED

View File

@ -13,27 +13,115 @@
# under the License. # under the License.
from collections import defaultdict from collections import defaultdict
import os
import threading import threading
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from oslo_service import threadgroup from oslo_service import threadgroup
from oslo_utils import fnmatch
import six import six
import yaml
from mistral import context as auth_ctx from mistral import context as auth_ctx
from mistral import coordination from mistral import coordination
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral import exceptions
from mistral import expressions
from mistral import messaging as mistral_messaging from mistral import messaging as mistral_messaging
from mistral.services import security from mistral.services import security
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
CONF = cfg.CONF CONF = cfg.CONF
# Event queue event constants. DEFAULT_PROPERTIES = {
EVENT_CONTEXT = 'context' 'service': '<% $.publisher %>',
EVENT_TYPE = 'type' 'project_id': '<% $.context.project_id %>',
EVENT_PAYLOAD = 'payload' 'user_id': '<% $.context.user_id %>',
EVENT_METADATA = 'metadata' 'timestamp': '<% $.timestamp %>'
}
class EventDefinition(object):
def __init__(self, definition_cfg):
self.cfg = definition_cfg
try:
self.event_types = self.cfg['event_types']
self.properties = self.cfg['properties']
except KeyError as err:
raise exceptions.MistralException(
"Required field %s not specified" % err.args[0]
)
if isinstance(self.event_types, six.string_types):
self.event_types = [self.event_types]
def match_type(self, event_type):
for t in self.event_types:
if fnmatch.fnmatch(event_type, t):
return True
return False
def convert(self, event):
return expressions.evaluate_recursively(self.properties, event)
class NotificationsConverter(object):
def __init__(self):
config_file = CONF.event_engine.event_definitions_cfg_file
definition_cfg = []
if os.path.exists(config_file):
with open(config_file) as cf:
config = cf.read()
try:
definition_cfg = yaml.safe_load(config)
except yaml.YAMLError as err:
if hasattr(err, 'problem_mark'):
mark = err.problem_mark
errmsg = (
"Invalid YAML syntax in Definitions file "
"%(file)s at line: %(line)s, column: %(column)s."
% dict(file=config_file,
line=mark.line + 1,
column=mark.column + 1)
)
else:
errmsg = (
"YAML error reading Definitions file %s" %
CONF.event_engine.event_definitions_cfg_file
)
LOG.error(errmsg)
raise exceptions.MistralError(
'Invalid event definition configuration file. %s' %
config_file
)
self.definitions = [EventDefinition(event_def)
for event_def in reversed(definition_cfg)]
def get_event_definition(self, event_type):
for d in self.definitions:
if d.match_type(event_type):
return d
return None
def convert(self, event_type, event):
edef = self.get_event_definition(event_type)
if edef is None:
LOG.debug('No event definition found for type: %s, use default '
'settings instead.', event_type)
return expressions.evaluate_recursively(DEFAULT_PROPERTIES, event)
return edef.convert(event)
class EventEngine(coordination.Service): class EventEngine(coordination.Service):
@ -55,6 +143,10 @@ class EventEngine(coordination.Service):
self.lock = threading.Lock() self.lock = threading.Lock()
LOG.debug('Loading notification definitions.')
self.notification_converter = NotificationsConverter()
self._start_handler() self._start_handler()
self._start_listeners() self._start_listeners()
@ -131,15 +223,13 @@ class EventEngine(coordination.Service):
exchange, topic = ex_t exchange, topic = ex_t
self._add_event_listener(exchange, topic, events) self._add_event_listener(exchange, topic, events)
def _start_workflow(self, triggers, payload, metadata): def _start_workflow(self, triggers, event_params):
"""Start workflows defined in event triggers.""" """Start workflows defined in event triggers."""
for t in triggers: for t in triggers:
LOG.info('Start to process event trigger: %s', t['id']) LOG.info('Start to process event trigger: %s', t['id'])
workflow_params = t.get('workflow_params', {}) workflow_params = t.get('workflow_params', {})
workflow_params.update( workflow_params.update({'event_params': event_params})
{'event_payload': payload, 'event_metadata': metadata}
)
# Setup context before schedule triggers. # Setup context before schedule triggers.
ctx = security.create_context(t['trust_id'], t['project_id']) ctx = security.create_context(t['trust_id'], t['project_id'])
@ -167,10 +257,8 @@ class EventEngine(coordination.Service):
while True: while True:
event = self.event_queue.get() event = self.event_queue.get()
context = event.get(EVENT_CONTEXT) context = event.get('context')
event_type = event.get(EVENT_TYPE) event_type = event.get('event_type')
payload = event.get(EVENT_PAYLOAD)
metadata = event.get(EVENT_METADATA)
# NOTE(kong): Use lock here to protect event_triggers_map variable # NOTE(kong): Use lock here to protect event_triggers_map variable
# from being updated outside the thread. # from being updated outside the thread.
@ -190,7 +278,12 @@ class EventEngine(coordination.Service):
LOG.debug('Start to handle event: %s, %d trigger(s) ' LOG.debug('Start to handle event: %s, %d trigger(s) '
'registered.', event_type, len(triggers)) 'registered.', event_type, len(triggers))
self._start_workflow(triggers, payload, metadata) event_params = self.notification_converter.convert(
event_type,
event
)
self._start_workflow(triggers, event_params)
self.event_queue.task_done() self.event_queue.task_done()
@ -200,22 +293,14 @@ class EventEngine(coordination.Service):
self.handler_tg.add_thread(self._process_event_queue) self.handler_tg.add_thread(self._process_event_queue)
def process_notification_event(self, context, event_type, payload, def process_notification_event(self, notification):
metadata):
"""Callback funtion by event handler. """Callback funtion by event handler.
Just put notification into a queue. Just put notification into a queue.
""" """
event = { LOG.debug("Putting notification event to event queue.")
EVENT_CONTEXT: context,
EVENT_TYPE: event_type,
EVENT_PAYLOAD: payload,
EVENT_METADATA: metadata
}
LOG.debug("Adding notification event to event queue: %s", event) self.event_queue.put(notification)
self.event_queue.put(event)
def create_event_trigger(self, trigger, events): def create_event_trigger(self, trigger, events):
"""An endpoint method for creating event trigger. """An endpoint method for creating event trigger.

View File

@ -35,7 +35,7 @@ my_wf:
""" """
EXCHANGE_TOPIC = ('openstack', 'notification') EXCHANGE_TOPIC = ('openstack', 'notification')
EVENT = 'compute.instance.create.start' EVENT_TYPE = 'compute.instance.create.start'
EVENT_TRIGGER = { EVENT_TRIGGER = {
'name': 'trigger1', 'name': 'trigger1',
@ -44,7 +44,7 @@ EVENT_TRIGGER = {
'workflow_params': {}, 'workflow_params': {},
'exchange': 'openstack', 'exchange': 'openstack',
'topic': 'notification', 'topic': 'notification',
'event': EVENT, 'event': EVENT_TYPE,
} }
cfg.CONF.set_default('auth_enable', False, group='pecan') cfg.CONF.set_default('auth_enable', False, group='pecan')
@ -74,30 +74,31 @@ class EventEngineTest(base.DbTestCase):
self.assertEqual(1, len(e_engine.exchange_topic_events_map)) self.assertEqual(1, len(e_engine.exchange_topic_events_map))
self.assertEqual( self.assertEqual(
EVENT, EVENT_TYPE,
list(e_engine.exchange_topic_events_map[EXCHANGE_TOPIC])[0] list(e_engine.exchange_topic_events_map[EXCHANGE_TOPIC])[0]
) )
self.assertEqual(1, len(e_engine.event_triggers_map)) self.assertEqual(1, len(e_engine.event_triggers_map))
self.assertEqual(1, len(e_engine.event_triggers_map[EVENT])) self.assertEqual(1, len(e_engine.event_triggers_map[EVENT_TYPE]))
self._assert_dict_contains_subset( self._assert_dict_contains_subset(
trigger.to_dict(), trigger.to_dict(),
e_engine.event_triggers_map[EVENT][0] e_engine.event_triggers_map[EVENT_TYPE][0]
) )
self.assertEqual(1, len(e_engine.exchange_topic_listener_map)) self.assertEqual(1, len(e_engine.exchange_topic_listener_map))
@mock.patch('mistral.messaging.start_listener') @mock.patch('mistral.messaging.start_listener')
def test_process_event_queue(self, mock_start): def test_process_event_queue(self, mock_start):
trigger = db_api.create_event_trigger(EVENT_TRIGGER) db_api.create_event_trigger(EVENT_TRIGGER)
client = mock.MagicMock() client = mock.MagicMock()
e_engine = event_engine.EventEngine(client) e_engine = event_engine.EventEngine(client)
self.addCleanup(e_engine.handler_tg.stop) self.addCleanup(e_engine.handler_tg.stop)
event = { event = {
event_engine.EVENT_CONTEXT: {}, 'event_type': EVENT_TYPE,
event_engine.EVENT_TYPE: EVENT, 'payload': {},
event_engine.EVENT_PAYLOAD: {}, 'publisher': 'fake_publisher',
event_engine.EVENT_METADATA: {} 'timestamp': '',
'context': {'project_id': 'fake_project', 'user_id': 'fake_user'},
} }
with mock.patch.object(e_engine, 'engine_client') as client_mock: with mock.patch.object(e_engine, 'engine_client') as client_mock:
@ -111,10 +112,71 @@ class EventEngineTest(base.DbTestCase):
self.assertEqual((EVENT_TRIGGER['workflow_id'], {}), args) self.assertEqual((EVENT_TRIGGER['workflow_id'], {}), args)
self.assertDictEqual( self.assertDictEqual(
{ {
'description': 'Workflow execution created by event ' 'service': 'fake_publisher',
'trigger %s.' % trigger.id, 'project_id': 'fake_project',
'event_payload': {}, 'user_id': 'fake_user',
'event_metadata': {} 'timestamp': ''
}, },
kwargs kwargs['event_params']
)
class NotificationsConverterTest(base.BaseTest):
def test_convert(self):
definition_cfg = [
{
'event_types': EVENT_TYPE,
'properties': {'resource_id': '<% $.payload.instance_id %>'}
}
]
converter = event_engine.NotificationsConverter()
converter.definitions = [event_engine.EventDefinition(event_def)
for event_def in reversed(definition_cfg)]
notification = {
'event_type': EVENT_TYPE,
'payload': {'instance_id': '12345'},
'publisher': 'fake_publisher',
'timestamp': '',
'context': {'project_id': 'fake_project', 'user_id': 'fake_user'}
}
event = converter.convert(EVENT_TYPE, notification)
self.assertDictEqual(
{'resource_id': '12345'},
event
)
def test_convert_event_type_not_defined(self):
definition_cfg = [
{
'event_types': EVENT_TYPE,
'properties': {'resource_id': '<% $.payload.instance_id %>'}
}
]
converter = event_engine.NotificationsConverter()
converter.definitions = [event_engine.EventDefinition(event_def)
for event_def in reversed(definition_cfg)]
notification = {
'event_type': 'fake_event',
'payload': {'instance_id': '12345'},
'publisher': 'fake_publisher',
'timestamp': '',
'context': {'project_id': 'fake_project', 'user_id': 'fake_user'}
}
event = converter.convert('fake_event', notification)
self.assertDictEqual(
{
'service': 'fake_publisher',
'project_id': 'fake_project',
'user_id': 'fake_user',
'timestamp': ''
},
event
) )