diff --git a/etc/event_definitions.yml.sample b/etc/event_definitions.yml.sample
new file mode 100644
index 000000000..83626cbed
--- /dev/null
+++ b/etc/event_definitions.yml.sample
@@ -0,0 +1,6 @@
+- event_types:
+    - compute.instance.create.*
+  properties:
+    resource_id: <% $.payload.instance_id %>
+    project_id: <% $.context.project_id %>
+    user_id: <% $.context.user_id %>
diff --git a/mistral/config.py b/mistral/config.py
index 013c9bb02..c2e998a35 100644
--- a/mistral/config.py
+++ b/mistral/config.py
@@ -191,6 +191,11 @@ event_engine_opts = [
         default='mistral_event_engine',
         help='The message topic that the event engine listens on.'
     ),
+    cfg.StrOpt(
+        'event_definitions_cfg_file',
+        default='/etc/mistral/event_definitions.yaml',
+        help='Configuration file for event definitions.'
+    ),
 ]
 
 execution_expiration_policy_opts = [
diff --git a/mistral/messaging.py b/mistral/messaging.py
index 3dbad34c5..4707505e9 100644
--- a/mistral/messaging.py
+++ b/mistral/messaging.py
@@ -26,6 +26,7 @@ from oslo_messaging.notify import dispatcher
 from oslo_messaging.notify import listener
 from oslo_messaging import target
 from oslo_messaging import transport
+from oslo_utils import timeutils
 import six
 
 LOG = logging.getLogger(__name__)
@@ -52,12 +53,16 @@ def handle_event(self, ctxt, publisher_id, event_type, payload, metadata):
                   'payload: %s, metadata: %s.',
                   publisher_id, event_type, payload, metadata)
 
-        self.event_engine.process_notification_event(
-            ctxt,
-            event_type,
-            payload,
-            metadata
-        )
+        notification = {
+            'event_type': event_type,
+            'payload': payload,
+            'publisher': publisher_id,
+            'timestamp': metadata.get('timestamp',
+                                      ctxt.get('timestamp', timeutils.utcnow())),
+            'context': ctxt
+        }
+
+        self.event_engine.process_notification_event(notification)
 
         return dispatcher.NotificationResult.HANDLED
diff --git a/mistral/services/event_engine.py b/mistral/services/event_engine.py
index 9d073f601..aef425c42 100644
--- a/mistral/services/event_engine.py
+++ b/mistral/services/event_engine.py
@@ -13,27 +13,115 @@
 #    under the License.
 
 from collections import defaultdict
+import os
 import threading
 
 from oslo_config import cfg
 from oslo_log import log as logging
 from oslo_service import threadgroup
+from oslo_utils import fnmatch
 import six
+import yaml
 
 from mistral import context as auth_ctx
 from mistral import coordination
 from mistral.db.v2 import api as db_api
+from mistral import exceptions
+from mistral import expressions
 from mistral import messaging as mistral_messaging
 from mistral.services import security
 
 
 LOG = logging.getLogger(__name__)
 
 CONF = cfg.CONF
 
-# Event queue event constants.
-EVENT_CONTEXT = 'context'
-EVENT_TYPE = 'type'
-EVENT_PAYLOAD = 'payload'
-EVENT_METADATA = 'metadata'
+DEFAULT_PROPERTIES = {
+    'service': '<% $.publisher %>',
+    'project_id': '<% $.context.project_id %>',
+    'user_id': '<% $.context.user_id %>',
+    'timestamp': '<% $.timestamp %>'
+}
+
+
+class EventDefinition(object):
+    def __init__(self, definition_cfg):
+        self.cfg = definition_cfg
+
+        try:
+            self.event_types = self.cfg['event_types']
+            self.properties = self.cfg['properties']
+        except KeyError as err:
+            raise exceptions.MistralException(
+                "Required field %s not specified" % err.args[0]
+            )
+
+        if isinstance(self.event_types, six.string_types):
+            self.event_types = [self.event_types]
+
+    def match_type(self, event_type):
+        for t in self.event_types:
+            if fnmatch.fnmatch(event_type, t):
+                return True
+
+        return False
+
+    def convert(self, event):
+        return expressions.evaluate_recursively(self.properties, event)
+
+
+class NotificationsConverter(object):
+    def __init__(self):
+        config_file = CONF.event_engine.event_definitions_cfg_file
+        definition_cfg = []
+
+        if os.path.exists(config_file):
+            with open(config_file) as cf:
+                config = cf.read()
+
+            try:
+                definition_cfg = yaml.safe_load(config)
+            except yaml.YAMLError as err:
+                if hasattr(err, 'problem_mark'):
+                    mark = err.problem_mark
+                    errmsg = (
+                        "Invalid YAML syntax in Definitions file "
+                        "%(file)s at line: %(line)s, column: %(column)s."
+                        % dict(file=config_file,
+                               line=mark.line + 1,
+                               column=mark.column + 1)
+                    )
+                else:
+                    errmsg = (
+                        "YAML error reading Definitions file %s" %
+                        CONF.event_engine.event_definitions_cfg_file
+                    )
+
+                LOG.error(errmsg)
+
+                raise exceptions.MistralError(
+                    'Invalid event definition configuration file. %s' %
+                    config_file
+                )
+
+        self.definitions = [EventDefinition(event_def)
+                            for event_def in reversed(definition_cfg)]
+
+    def get_event_definition(self, event_type):
+        for d in self.definitions:
+            if d.match_type(event_type):
+                return d
+
+        return None
+
+    def convert(self, event_type, event):
+        edef = self.get_event_definition(event_type)
+
+        if edef is None:
+            LOG.debug('No event definition found for type: %s, use default '
+                      'settings instead.', event_type)
+
+            return expressions.evaluate_recursively(DEFAULT_PROPERTIES, event)
+
+        return edef.convert(event)
 
 
 class EventEngine(coordination.Service):
@@ -55,6 +143,10 @@ class EventEngine(coordination.Service):
 
         self.lock = threading.Lock()
 
+        LOG.debug('Loading notification definitions.')
+
+        self.notification_converter = NotificationsConverter()
+
         self._start_handler()
         self._start_listeners()
 
@@ -131,15 +223,13 @@ class EventEngine(coordination.Service):
             exchange, topic = ex_t
             self._add_event_listener(exchange, topic, events)
 
-    def _start_workflow(self, triggers, payload, metadata):
+    def _start_workflow(self, triggers, event_params):
         """Start workflows defined in event triggers."""
         for t in triggers:
             LOG.info('Start to process event trigger: %s', t['id'])
 
             workflow_params = t.get('workflow_params', {})
-            workflow_params.update(
-                {'event_payload': payload, 'event_metadata': metadata}
-            )
+            workflow_params.update({'event_params': event_params})
 
             # Setup context before schedule triggers.
             ctx = security.create_context(t['trust_id'], t['project_id'])
@@ -167,10 +257,8 @@ class EventEngine(coordination.Service):
         while True:
             event = self.event_queue.get()
 
-            context = event.get(EVENT_CONTEXT)
-            event_type = event.get(EVENT_TYPE)
-            payload = event.get(EVENT_PAYLOAD)
-            metadata = event.get(EVENT_METADATA)
+            context = event.get('context')
+            event_type = event.get('event_type')
 
             # NOTE(kong): Use lock here to protect event_triggers_map variable
             # from being updated outside the thread.
@@ -190,7 +278,12 @@ class EventEngine(coordination.Service):
                 LOG.debug('Start to handle event: %s, %d trigger(s) '
                           'registered.', event_type, len(triggers))
 
-                self._start_workflow(triggers, payload, metadata)
+                event_params = self.notification_converter.convert(
+                    event_type,
+                    event
+                )
+
+                self._start_workflow(triggers, event_params)
 
             self.event_queue.task_done()
 
@@ -200,22 +293,14 @@ class EventEngine(coordination.Service):
 
         self.handler_tg.add_thread(self._process_event_queue)
 
-    def process_notification_event(self, context, event_type, payload,
-                                    metadata):
+    def process_notification_event(self, notification):
         """Callback funtion by event handler.
 
         Just put notification into a queue.
         """
-        event = {
-            EVENT_CONTEXT: context,
-            EVENT_TYPE: event_type,
-            EVENT_PAYLOAD: payload,
-            EVENT_METADATA: metadata
-        }
+        LOG.debug("Putting notification event to event queue.")
 
-        LOG.debug("Adding notification event to event queue: %s", event)
-
-        self.event_queue.put(event)
+        self.event_queue.put(notification)
 
     def create_event_trigger(self, trigger, events):
         """An endpoint method for creating event trigger.
diff --git a/mistral/tests/unit/services/test_event_engine.py b/mistral/tests/unit/services/test_event_engine.py
index b335d2885..3a3f6cac2 100644
--- a/mistral/tests/unit/services/test_event_engine.py
+++ b/mistral/tests/unit/services/test_event_engine.py
@@ -35,7 +35,7 @@ my_wf:
 """
 
 EXCHANGE_TOPIC = ('openstack', 'notification')
-EVENT = 'compute.instance.create.start'
+EVENT_TYPE = 'compute.instance.create.start'
 
 EVENT_TRIGGER = {
     'name': 'trigger1',
@@ -44,7 +44,7 @@ EVENT_TRIGGER = {
     'workflow_params': {},
     'exchange': 'openstack',
     'topic': 'notification',
-    'event': EVENT,
+    'event': EVENT_TYPE,
 }
 
 cfg.CONF.set_default('auth_enable', False, group='pecan')
@@ -74,30 +74,31 @@ class EventEngineTest(base.DbTestCase):
 
         self.assertEqual(1, len(e_engine.exchange_topic_events_map))
         self.assertEqual(
-            EVENT,
+            EVENT_TYPE,
             list(e_engine.exchange_topic_events_map[EXCHANGE_TOPIC])[0]
         )
 
         self.assertEqual(1, len(e_engine.event_triggers_map))
-        self.assertEqual(1, len(e_engine.event_triggers_map[EVENT]))
+        self.assertEqual(1, len(e_engine.event_triggers_map[EVENT_TYPE]))
         self._assert_dict_contains_subset(
             trigger.to_dict(),
-            e_engine.event_triggers_map[EVENT][0]
+            e_engine.event_triggers_map[EVENT_TYPE][0]
         )
         self.assertEqual(1, len(e_engine.exchange_topic_listener_map))
 
     @mock.patch('mistral.messaging.start_listener')
     def test_process_event_queue(self, mock_start):
-        trigger = db_api.create_event_trigger(EVENT_TRIGGER)
+        db_api.create_event_trigger(EVENT_TRIGGER)
 
         client = mock.MagicMock()
         e_engine = event_engine.EventEngine(client)
         self.addCleanup(e_engine.handler_tg.stop)
 
         event = {
-            event_engine.EVENT_CONTEXT: {},
-            event_engine.EVENT_TYPE: EVENT,
-            event_engine.EVENT_PAYLOAD: {},
-            event_engine.EVENT_METADATA: {}
+            'event_type': EVENT_TYPE,
+            'payload': {},
+            'publisher': 'fake_publisher',
+            'timestamp': '',
+            'context': {'project_id': 'fake_project', 'user_id': 'fake_user'},
         }
 
         with mock.patch.object(e_engine,
                               'engine_client') as client_mock:
@@ -111,10 +112,71 @@ class EventEngineTest(base.DbTestCase):
             self.assertEqual((EVENT_TRIGGER['workflow_id'], {}), args)
             self.assertDictEqual(
                 {
-                    'description': 'Workflow execution created by event '
-                                   'trigger %s.' % trigger.id,
-                    'event_payload': {},
-                    'event_metadata': {}
+                    'service': 'fake_publisher',
+                    'project_id': 'fake_project',
+                    'user_id': 'fake_user',
+                    'timestamp': ''
                 },
-                kwargs
+                kwargs['event_params']
             )
+
+
+class NotificationsConverterTest(base.BaseTest):
+    def test_convert(self):
+        definition_cfg = [
+            {
+                'event_types': EVENT_TYPE,
+                'properties': {'resource_id': '<% $.payload.instance_id %>'}
+            }
+        ]
+
+        converter = event_engine.NotificationsConverter()
+        converter.definitions = [event_engine.EventDefinition(event_def)
+                                 for event_def in reversed(definition_cfg)]
+
+        notification = {
+            'event_type': EVENT_TYPE,
+            'payload': {'instance_id': '12345'},
+            'publisher': 'fake_publisher',
+            'timestamp': '',
+            'context': {'project_id': 'fake_project', 'user_id': 'fake_user'}
+        }
+
+        event = converter.convert(EVENT_TYPE, notification)
+
+        self.assertDictEqual(
+            {'resource_id': '12345'},
+            event
+        )
+
+    def test_convert_event_type_not_defined(self):
+        definition_cfg = [
+            {
+                'event_types': EVENT_TYPE,
+                'properties': {'resource_id': '<% $.payload.instance_id %>'}
+            }
+        ]
+
+        converter = event_engine.NotificationsConverter()
+        converter.definitions = [event_engine.EventDefinition(event_def)
+                                 for event_def in reversed(definition_cfg)]
+
+        notification = {
+            'event_type': 'fake_event',
+            'payload': {'instance_id': '12345'},
+            'publisher': 'fake_publisher',
+            'timestamp': '',
+            'context': {'project_id': 'fake_project', 'user_id': 'fake_user'}
+        }
+
+        event = converter.convert('fake_event', notification)
+
+        self.assertDictEqual(
+            {
+                'service': 'fake_publisher',
+                'project_id': 'fake_project',
+                'user_id': 'fake_user',
+                'timestamp': ''
+            },
+            event
+        )
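
Below is a minimal usage sketch (not part of the patch) showing how the pieces fit together: a notification dict shaped like the one handle_event() in mistral/messaging.py now builds is reduced to event params by NotificationsConverter and then handed to triggered workflows. The definition mirrors etc/event_definitions.yml.sample; the payload, publisher, and context values are invented for illustration, and the snippet assumes it runs in an environment where Mistral's config options are already registered (as in the unit tests above).

# Illustrative sketch only -- not part of the diff.
from mistral.services import event_engine

# Equivalent to the entry in etc/event_definitions.yml.sample.
definition_cfg = [{
    'event_types': ['compute.instance.create.*'],
    'properties': {
        'resource_id': '<% $.payload.instance_id %>',
        'project_id': '<% $.context.project_id %>',
        'user_id': '<% $.context.user_id %>',
    },
}]

converter = event_engine.NotificationsConverter()
# Install the definitions directly instead of reading the config file,
# the same way NotificationsConverterTest does above.
converter.definitions = [event_engine.EventDefinition(d)
                         for d in reversed(definition_cfg)]

# Shape of the dict assembled by handle_event() in mistral/messaging.py;
# all values here are made up for the example.
notification = {
    'event_type': 'compute.instance.create.end',
    'payload': {'instance_id': 'abc-123'},
    'publisher': 'compute.node-1',
    'timestamp': '2016-01-01T00:00:00.000000',
    'context': {'project_id': 'demo', 'user_id': 'alice'},
}

event_params = converter.convert(notification['event_type'], notification)
# event_params == {'resource_id': 'abc-123',
#                  'project_id': 'demo',
#                  'user_id': 'alice'}
# EventEngine._start_workflow() then passes this dict to each matching
# trigger's workflow as workflow_params['event_params'].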