From c73edcbe1d37b257c02446fa9a04d1b70515db9d Mon Sep 17 00:00:00 2001 From: Lingxian Kong Date: Wed, 17 Aug 2016 23:39:33 +1200 Subject: [PATCH] Add event configuration for event trigger When using event trigger feature, some of the notification data may be useful to end users in their workflows. However, the whole data can not be visible to end users due to security reasons. This patch will give operators a chance to define what information will be available to end users. If the definition is not found, a 'safe' default setting will be used. Change-Id: I8b6e9cbe318011d2d11a2bb91aaff4d37222c7eb Implements: blueprint event-notification-trigger --- etc/event_definitions.yml.sample | 6 + mistral/config.py | 5 + mistral/messaging.py | 17 ++- mistral/services/event_engine.py | 135 ++++++++++++++---- .../tests/unit/services/test_event_engine.py | 92 ++++++++++-- 5 files changed, 209 insertions(+), 46 deletions(-) create mode 100644 etc/event_definitions.yml.sample diff --git a/etc/event_definitions.yml.sample b/etc/event_definitions.yml.sample new file mode 100644 index 000000000..83626cbed --- /dev/null +++ b/etc/event_definitions.yml.sample @@ -0,0 +1,6 @@ +- event_types: + - compute.instance.create.* + properties: + resource_id: <% $.payload.instance_id %> + project_id: <% $.context.project_id %> + user_id: <% $.context.user_id %> diff --git a/mistral/config.py b/mistral/config.py index 013c9bb02..c2e998a35 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -191,6 +191,11 @@ event_engine_opts = [ default='mistral_event_engine', help='The message topic that the event engine listens on.' ), + cfg.StrOpt( + 'event_definitions_cfg_file', + default='/etc/mistral/event_definitions.yaml', + help='Configuration file for event definitions.' + ), ] execution_expiration_policy_opts = [ diff --git a/mistral/messaging.py b/mistral/messaging.py index 3dbad34c5..4707505e9 100644 --- a/mistral/messaging.py +++ b/mistral/messaging.py @@ -26,6 +26,7 @@ from oslo_messaging.notify import dispatcher from oslo_messaging.notify import listener from oslo_messaging import target from oslo_messaging import transport +from oslo_utils import timeutils import six LOG = logging.getLogger(__name__) @@ -52,12 +53,16 @@ def handle_event(self, ctxt, publisher_id, event_type, payload, metadata): 'payload: %s, metadata: %s.', publisher_id, event_type, payload, metadata) - self.event_engine.process_notification_event( - ctxt, - event_type, - payload, - metadata - ) + notification = { + 'event_type': event_type, + 'payload': payload, + 'publisher': publisher_id, + 'timestamp': metadata.get('timestamp', + ctxt.get('timestamp', timeutils.utcnow())), + 'context': ctxt + } + + self.event_engine.process_notification_event(notification) return dispatcher.NotificationResult.HANDLED diff --git a/mistral/services/event_engine.py b/mistral/services/event_engine.py index 9d073f601..aef425c42 100644 --- a/mistral/services/event_engine.py +++ b/mistral/services/event_engine.py @@ -13,27 +13,115 @@ # under the License. from collections import defaultdict +import os import threading from oslo_config import cfg from oslo_log import log as logging from oslo_service import threadgroup +from oslo_utils import fnmatch import six +import yaml from mistral import context as auth_ctx from mistral import coordination from mistral.db.v2 import api as db_api +from mistral import exceptions +from mistral import expressions from mistral import messaging as mistral_messaging from mistral.services import security LOG = logging.getLogger(__name__) CONF = cfg.CONF -# Event queue event constants. -EVENT_CONTEXT = 'context' -EVENT_TYPE = 'type' -EVENT_PAYLOAD = 'payload' -EVENT_METADATA = 'metadata' +DEFAULT_PROPERTIES = { + 'service': '<% $.publisher %>', + 'project_id': '<% $.context.project_id %>', + 'user_id': '<% $.context.user_id %>', + 'timestamp': '<% $.timestamp %>' +} + + +class EventDefinition(object): + def __init__(self, definition_cfg): + self.cfg = definition_cfg + + try: + self.event_types = self.cfg['event_types'] + self.properties = self.cfg['properties'] + except KeyError as err: + raise exceptions.MistralException( + "Required field %s not specified" % err.args[0] + ) + + if isinstance(self.event_types, six.string_types): + self.event_types = [self.event_types] + + def match_type(self, event_type): + for t in self.event_types: + if fnmatch.fnmatch(event_type, t): + return True + + return False + + def convert(self, event): + return expressions.evaluate_recursively(self.properties, event) + + +class NotificationsConverter(object): + def __init__(self): + config_file = CONF.event_engine.event_definitions_cfg_file + definition_cfg = [] + + if os.path.exists(config_file): + with open(config_file) as cf: + config = cf.read() + + try: + definition_cfg = yaml.safe_load(config) + except yaml.YAMLError as err: + if hasattr(err, 'problem_mark'): + mark = err.problem_mark + errmsg = ( + "Invalid YAML syntax in Definitions file " + "%(file)s at line: %(line)s, column: %(column)s." + % dict(file=config_file, + line=mark.line + 1, + column=mark.column + 1) + ) + else: + errmsg = ( + "YAML error reading Definitions file %s" % + CONF.event_engine.event_definitions_cfg_file + ) + + LOG.error(errmsg) + + raise exceptions.MistralError( + 'Invalid event definition configuration file. %s' % + config_file + ) + + self.definitions = [EventDefinition(event_def) + for event_def in reversed(definition_cfg)] + + def get_event_definition(self, event_type): + for d in self.definitions: + if d.match_type(event_type): + return d + + return None + + def convert(self, event_type, event): + edef = self.get_event_definition(event_type) + + if edef is None: + LOG.debug('No event definition found for type: %s, use default ' + 'settings instead.', event_type) + + return expressions.evaluate_recursively(DEFAULT_PROPERTIES, event) + + return edef.convert(event) class EventEngine(coordination.Service): @@ -55,6 +143,10 @@ class EventEngine(coordination.Service): self.lock = threading.Lock() + LOG.debug('Loading notification definitions.') + + self.notification_converter = NotificationsConverter() + self._start_handler() self._start_listeners() @@ -131,15 +223,13 @@ class EventEngine(coordination.Service): exchange, topic = ex_t self._add_event_listener(exchange, topic, events) - def _start_workflow(self, triggers, payload, metadata): + def _start_workflow(self, triggers, event_params): """Start workflows defined in event triggers.""" for t in triggers: LOG.info('Start to process event trigger: %s', t['id']) workflow_params = t.get('workflow_params', {}) - workflow_params.update( - {'event_payload': payload, 'event_metadata': metadata} - ) + workflow_params.update({'event_params': event_params}) # Setup context before schedule triggers. ctx = security.create_context(t['trust_id'], t['project_id']) @@ -167,10 +257,8 @@ class EventEngine(coordination.Service): while True: event = self.event_queue.get() - context = event.get(EVENT_CONTEXT) - event_type = event.get(EVENT_TYPE) - payload = event.get(EVENT_PAYLOAD) - metadata = event.get(EVENT_METADATA) + context = event.get('context') + event_type = event.get('event_type') # NOTE(kong): Use lock here to protect event_triggers_map variable # from being updated outside the thread. @@ -190,7 +278,12 @@ class EventEngine(coordination.Service): LOG.debug('Start to handle event: %s, %d trigger(s) ' 'registered.', event_type, len(triggers)) - self._start_workflow(triggers, payload, metadata) + event_params = self.notification_converter.convert( + event_type, + event + ) + + self._start_workflow(triggers, event_params) self.event_queue.task_done() @@ -200,22 +293,14 @@ class EventEngine(coordination.Service): self.handler_tg.add_thread(self._process_event_queue) - def process_notification_event(self, context, event_type, payload, - metadata): + def process_notification_event(self, notification): """Callback funtion by event handler. Just put notification into a queue. """ - event = { - EVENT_CONTEXT: context, - EVENT_TYPE: event_type, - EVENT_PAYLOAD: payload, - EVENT_METADATA: metadata - } + LOG.debug("Putting notification event to event queue.") - LOG.debug("Adding notification event to event queue: %s", event) - - self.event_queue.put(event) + self.event_queue.put(notification) def create_event_trigger(self, trigger, events): """An endpoint method for creating event trigger. diff --git a/mistral/tests/unit/services/test_event_engine.py b/mistral/tests/unit/services/test_event_engine.py index b335d2885..3a3f6cac2 100644 --- a/mistral/tests/unit/services/test_event_engine.py +++ b/mistral/tests/unit/services/test_event_engine.py @@ -35,7 +35,7 @@ my_wf: """ EXCHANGE_TOPIC = ('openstack', 'notification') -EVENT = 'compute.instance.create.start' +EVENT_TYPE = 'compute.instance.create.start' EVENT_TRIGGER = { 'name': 'trigger1', @@ -44,7 +44,7 @@ EVENT_TRIGGER = { 'workflow_params': {}, 'exchange': 'openstack', 'topic': 'notification', - 'event': EVENT, + 'event': EVENT_TYPE, } cfg.CONF.set_default('auth_enable', False, group='pecan') @@ -74,30 +74,31 @@ class EventEngineTest(base.DbTestCase): self.assertEqual(1, len(e_engine.exchange_topic_events_map)) self.assertEqual( - EVENT, + EVENT_TYPE, list(e_engine.exchange_topic_events_map[EXCHANGE_TOPIC])[0] ) self.assertEqual(1, len(e_engine.event_triggers_map)) - self.assertEqual(1, len(e_engine.event_triggers_map[EVENT])) + self.assertEqual(1, len(e_engine.event_triggers_map[EVENT_TYPE])) self._assert_dict_contains_subset( trigger.to_dict(), - e_engine.event_triggers_map[EVENT][0] + e_engine.event_triggers_map[EVENT_TYPE][0] ) self.assertEqual(1, len(e_engine.exchange_topic_listener_map)) @mock.patch('mistral.messaging.start_listener') def test_process_event_queue(self, mock_start): - trigger = db_api.create_event_trigger(EVENT_TRIGGER) + db_api.create_event_trigger(EVENT_TRIGGER) client = mock.MagicMock() e_engine = event_engine.EventEngine(client) self.addCleanup(e_engine.handler_tg.stop) event = { - event_engine.EVENT_CONTEXT: {}, - event_engine.EVENT_TYPE: EVENT, - event_engine.EVENT_PAYLOAD: {}, - event_engine.EVENT_METADATA: {} + 'event_type': EVENT_TYPE, + 'payload': {}, + 'publisher': 'fake_publisher', + 'timestamp': '', + 'context': {'project_id': 'fake_project', 'user_id': 'fake_user'}, } with mock.patch.object(e_engine, 'engine_client') as client_mock: @@ -111,10 +112,71 @@ class EventEngineTest(base.DbTestCase): self.assertEqual((EVENT_TRIGGER['workflow_id'], {}), args) self.assertDictEqual( { - 'description': 'Workflow execution created by event ' - 'trigger %s.' % trigger.id, - 'event_payload': {}, - 'event_metadata': {} + 'service': 'fake_publisher', + 'project_id': 'fake_project', + 'user_id': 'fake_user', + 'timestamp': '' }, - kwargs + kwargs['event_params'] ) + + +class NotificationsConverterTest(base.BaseTest): + def test_convert(self): + definition_cfg = [ + { + 'event_types': EVENT_TYPE, + 'properties': {'resource_id': '<% $.payload.instance_id %>'} + } + ] + + converter = event_engine.NotificationsConverter() + converter.definitions = [event_engine.EventDefinition(event_def) + for event_def in reversed(definition_cfg)] + + notification = { + 'event_type': EVENT_TYPE, + 'payload': {'instance_id': '12345'}, + 'publisher': 'fake_publisher', + 'timestamp': '', + 'context': {'project_id': 'fake_project', 'user_id': 'fake_user'} + } + + event = converter.convert(EVENT_TYPE, notification) + + self.assertDictEqual( + {'resource_id': '12345'}, + event + ) + + def test_convert_event_type_not_defined(self): + definition_cfg = [ + { + 'event_types': EVENT_TYPE, + 'properties': {'resource_id': '<% $.payload.instance_id %>'} + } + ] + + converter = event_engine.NotificationsConverter() + converter.definitions = [event_engine.EventDefinition(event_def) + for event_def in reversed(definition_cfg)] + + notification = { + 'event_type': 'fake_event', + 'payload': {'instance_id': '12345'}, + 'publisher': 'fake_publisher', + 'timestamp': '', + 'context': {'project_id': 'fake_project', 'user_id': 'fake_user'} + } + + event = converter.convert('fake_event', notification) + + self.assertDictEqual( + { + 'service': 'fake_publisher', + 'project_id': 'fake_project', + 'user_id': 'fake_user', + 'timestamp': '' + }, + event + )