Implement notification of execution events

Introduce execution events and notification server and plugins for
publishing these events for consumers. Event notification is defined per
workflow execution and can be configured to notify on all the events or
only for specific events.

Change-Id: I9820bdc4792a374dad9ad5310f84cd7aaddab8ca
Implements: blueprint mistral-execution-event-subscription
This commit is contained in:
Winson Chan 2017-05-24 22:03:31 +00:00 committed by Dougal Matthews
parent ce454f5f26
commit 3f48e24dc4
27 changed files with 1975 additions and 31 deletions

View File

@ -2,6 +2,7 @@
# Copyright 2015 - StackStorm, Inc.
# Copyright 2015 Huawei Technologies Co., Ltd.
# Copyright 2016 - Brocade Communications Systems, Inc.
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -258,8 +259,8 @@ class ExecutionsController(rest.RestController):
result_exec_dict.get('workflow_namespace', ''),
exec_id,
result_exec_dict.get('input'),
result_exec_dict.get('description', ''),
**result_exec_dict.get('params') or {}
description=result_exec_dict.get('description', ''),
**result_exec_dict.get('params', {})
)
return resources.Execution.from_dict(result)

View File

@ -1,4 +1,5 @@
# Copyright 2013 - Mirantis, Inc.
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -284,7 +285,8 @@ class Execution(resource.Resource):
@classmethod
def sample(cls):
return cls(id='123e4567-e89b-12d3-a456-426655440000',
return cls(
id='123e4567-e89b-12d3-a456-426655440000',
workflow_name='flow',
workflow_namespace='some_namespace',
workflow_id='123e4567-e89b-12d3-a456-426655441111',
@ -293,9 +295,29 @@ class Execution(resource.Resource):
state='SUCCESS',
input={},
output={},
params={'env': {'k1': 'abc', 'k2': 123}},
params={
'env': {'k1': 'abc', 'k2': 123},
'notify': [
{
'type': 'webhook',
'url': 'http://endpoint/of/webhook',
'headers': {
'Content-Type': 'application/json',
'X-Auth-Token': '123456789'
}
},
{
'type': 'queue',
'topic': 'failover_queue',
'backend': 'rabbitmq',
'host': '127.0.0.1',
'port': 5432
}
]
},
created_at='1970-01-01T00:00:00.000000',
updated_at='1970-01-01T00:00:00.000000')
updated_at='1970-01-01T00:00:00.000000'
)
class Executions(resource.ResourceList):

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python
# Copyright 2016 - Brocade Communications Systems, Inc.
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -46,6 +47,7 @@ from mistral import config
from mistral.engine import engine_server
from mistral.event_engine import event_engine_server
from mistral.executors import executor_server
from mistral.notifiers import notification_server
from mistral.rpc import base as rpc
from mistral import version
@ -93,6 +95,10 @@ def launch_event_engine():
launch_thread(event_engine_server.get_oslo_service())
def launch_notifier():
launch_thread(notification_server.get_oslo_service())
def launch_api():
server = api_service.WSGIService('mistral_api')
launch_process(server, workers=server.workers)
@ -118,7 +124,8 @@ LAUNCH_OPTIONS = {
'api': launch_api,
'engine': launch_engine,
'executor': launch_executor,
'event-engine': launch_event_engine
'event-engine': launch_event_engine,
'notifier': launch_notifier
}

View File

@ -1,5 +1,6 @@
# Copyright 2013 - Mirantis, Inc.
# Copyright 2016 - Brocade Communications Systems, Inc.
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -300,6 +301,37 @@ event_engine_opts = [
),
]
notifier_opts = [
cfg.StrOpt(
'type',
choices=['local', 'remote'],
default='remote',
help=(
'Type of notifier. Use local to run the notifier within the '
'engine server. Use remote if the notifier is launched as '
'a separate server to process events.'
)
),
cfg.StrOpt(
'host',
default='0.0.0.0',
help=_('Name of the notifier node. This can be an opaque '
'identifier. It is not necessarily a hostname, '
'FQDN, or IP address.')
),
cfg.StrOpt(
'topic',
default='mistral_notifier',
help=_('The message topic that the notifier server listens on.')
),
cfg.ListOpt(
'notify',
item_type=eval,
bounds=True,
help=_('List of publishers to publish notification.')
)
]
execution_expiration_policy_opts = [
cfg.IntOpt(
'evaluation_interval',
@ -425,6 +457,7 @@ EXECUTOR_GROUP = 'executor'
SCHEDULER_GROUP = 'scheduler'
CRON_TRIGGER_GROUP = 'cron_trigger'
EVENT_ENGINE_GROUP = 'event_engine'
NOTIFIER_GROUP = 'notifier'
PECAN_GROUP = 'pecan'
COORDINATION_GROUP = 'coordination'
EXECUTION_EXPIRATION_POLICY_GROUP = 'execution_expiration_policy'
@ -450,6 +483,7 @@ CONF.register_opts(
group=EXECUTION_EXPIRATION_POLICY_GROUP
)
CONF.register_opts(event_engine_opts, group=EVENT_ENGINE_GROUP)
CONF.register_opts(notifier_opts, group=NOTIFIER_GROUP)
CONF.register_opts(pecan_opts, group=PECAN_GROUP)
CONF.register_opts(coordination_opts, group=COORDINATION_GROUP)
CONF.register_opts(profiler_opts, group=PROFILER_GROUP)
@ -494,6 +528,7 @@ def list_opts():
(EVENT_ENGINE_GROUP, event_engine_opts),
(SCHEDULER_GROUP, scheduler_opts),
(CRON_TRIGGER_GROUP, cron_trigger_opts),
(NOTIFIER_GROUP, notifier_opts),
(PECAN_GROUP, pecan_opts),
(COORDINATION_GROUP, coordination_opts),
(EXECUTION_EXPIRATION_POLICY_GROUP, execution_expiration_policy_opts),

View File

@ -1,5 +1,6 @@
# Copyright 2016 - Nokia Networks.
# Copyright 2016 - Brocade Communications Systems, Inc.
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -558,6 +559,9 @@ class WorkflowAction(Action):
wf_params['env'] = parent_wf_ex.params['env']
wf_params['evaluate_env'] = parent_wf_ex.params.get('evaluate_env')
if 'notify' in parent_wf_ex.params:
wf_params['notify'] = parent_wf_ex.params['notify']
for k, v in list(input_dict.items()):
if k not in wf_spec.get_input():
wf_params[k] = v

View File

@ -1,6 +1,7 @@
# Copyright 2013 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
# Copyright 2016 - Brocade Communications Systems, Inc.
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -14,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from osprofiler import profiler
from mistral.db import utils as db_utils
@ -42,6 +44,12 @@ class DefaultEngine(base.Engine):
if wf_namespace:
params['namespace'] = wf_namespace
if cfg.CONF.notifier.notify:
if 'notify' not in params or not params['notify']:
params['notify'] = []
params['notify'].extend(cfg.CONF.notifier.notify)
try:
with db_api.transaction():
wf_ex = wf_handler.start_workflow(

View File

@ -1,5 +1,6 @@
# Copyright 2016 - Nokia Networks.
# Copyright 2016 - Brocade Communications Systems, Inc.
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -15,6 +16,7 @@
import abc
import copy
from oslo_config import cfg
from oslo_log import log as logging
from osprofiler import profiler
import six
@ -25,6 +27,8 @@ from mistral.engine import dispatcher
from mistral.engine import policies
from mistral import exceptions as exc
from mistral import expressions as expr
from mistral.notifiers import base as notif
from mistral.notifiers import notification_events as events
from mistral import utils
from mistral.utils import wf_trace
from mistral.workflow import base as wf_base
@ -57,6 +61,23 @@ class Task(object):
self.created = False
self.state_changed = False
def notify(self, old_task_state, new_task_state):
publishers = self.wf_ex.params.get('notify')
if not publishers and not isinstance(publishers, list):
return
notifier = notif.get_notifier(cfg.CONF.notifier.type)
event = events.identify_task_event(old_task_state, new_task_state)
notifier.notify(
self.task_ex.id,
self.task_ex.to_dict(),
event,
self.task_ex.updated_at,
publishers
)
def is_completed(self):
return self.task_ex and states.is_completed(self.task_ex.state)
@ -177,8 +198,15 @@ class Task(object):
assert self.task_ex
# Record the current task state.
old_task_state = self.task_ex.state
# Ignore if task already completed.
if self.is_completed():
# Publish task event again so subscribers know
# task completed state is being processed again.
self.notify(old_task_state, self.task_ex.state)
return
# If we were unable to change the task state it means that it was
@ -205,6 +233,9 @@ class Task(object):
# If workflow is paused we shouldn't schedule new commands
# and mark task as processed.
if states.is_paused(self.wf_ex.state):
# Publish task event even if the workflow is paused.
self.notify(old_task_state, self.task_ex.state)
return
wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)
@ -216,6 +247,9 @@ class Task(object):
# upon its completion.
self.task_ex.processed = True
# Publish task event.
self.notify(old_task_state, self.task_ex.state)
dispatcher.dispatch_workflow_commands(self.wf_ex, cmds)
@profiler.trace('task-update')
@ -230,8 +264,15 @@ class Task(object):
assert self.task_ex
# Record the current task state.
old_task_state = self.task_ex.state
# Ignore if task already completed.
if states.is_completed(self.task_ex.state):
# Publish task event again so subscribers know
# task completed state is being processed again.
self.notify(old_task_state, self.task_ex.state)
return
# Update only if state transition is valid.
@ -247,6 +288,9 @@ class Task(object):
self.set_state(state, state_info)
# Publish event.
self.notify(old_task_state, self.task_ex.state)
def _before_task_start(self):
policies_spec = self.task_spec.get_policies()
@ -340,6 +384,9 @@ class RegularTask(Task):
self._create_task_execution()
# Publish event.
self.notify(None, self.task_ex.state)
LOG.debug(
'Starting task [workflow=%s, task=%s, init_state=%s]',
self.wf_ex.name,
@ -367,8 +414,14 @@ class RegularTask(Task):
'Rerunning succeeded tasks is not supported.'
)
# Record the current task state.
old_task_state = self.task_ex.state
self.set_state(states.RUNNING, None, processed=False)
# Publish event.
self.notify(old_task_state, self.task_ex.state)
self._update_inbound_context()
self._update_triggered_by()
self._reset_actions()

View File

@ -1,5 +1,6 @@
# Copyright 2016 - Nokia Networks.
# Copyright 2016 - Brocade Communications Systems, Inc.
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -26,6 +27,8 @@ from mistral.engine import dispatcher
from mistral.engine import utils as engine_utils
from mistral import exceptions as exc
from mistral.lang import parser as spec_parser
from mistral.notifiers import base as notif
from mistral.notifiers import notification_events as events
from mistral.services import triggers
from mistral.services import workflows as wf_service
from mistral import utils
@ -61,6 +64,22 @@ class Workflow(object):
else:
self.wf_spec = None
def notify(self, event):
publishers = self.wf_ex.params.get('notify')
if not publishers and not isinstance(publishers, list):
return
notifier = notif.get_notifier(cfg.CONF.notifier.type)
notifier.notify(
self.wf_ex.id,
self.wf_ex.to_dict(),
event,
self.wf_ex.updated_at,
publishers
)
@profiler.trace('workflow-start')
def start(self, wf_def, wf_ex_id, input_dict, desc='', params=None):
"""Start workflow.
@ -100,6 +119,9 @@ class Workflow(object):
self.set_state(states.RUNNING)
# Publish event as soon as state is set to running.
self.notify(events.WORKFLOW_LAUNCHED)
wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)
dispatcher.dispatch_workflow_commands(
@ -113,7 +135,6 @@ class Workflow(object):
:param state: New workflow state.
:param msg: Additional explaining message.
"""
assert self.wf_ex
if state == states.SUCCESS:
@ -137,6 +158,9 @@ class Workflow(object):
# Set the state of this workflow to paused.
self.set_state(states.PAUSED, state_info=msg)
# Publish event.
self.notify(events.WORKFLOW_PAUSED)
# If workflow execution is a subworkflow,
# schedule update to the task execution.
if self.wf_ex.task_execution_id:
@ -144,8 +168,6 @@ class Workflow(object):
from mistral.engine import task_handler
task_handler.schedule_on_action_update(self.wf_ex)
return
def resume(self, env=None):
"""Resume workflow.
@ -158,6 +180,9 @@ class Workflow(object):
self.set_state(states.RUNNING)
# Publish event.
self.notify(events.WORKFLOW_RESUMED)
wf_ctrl = wf_base.get_controller(self.wf_ex)
# Calculate commands to process next.
@ -403,6 +428,9 @@ class Workflow(object):
# Set workflow execution to success until after output is evaluated.
self.set_state(states.SUCCESS, msg)
# Publish event.
self.notify(events.WORKFLOW_SUCCEEDED)
if self.wf_ex.task_execution_id:
self._send_result_to_parent_workflow()
@ -448,6 +476,9 @@ class Workflow(object):
self.wf_ex.output = merge_dicts({'result': msg}, output_on_error)
# Publish event.
self.notify(events.WORKFLOW_FAILED)
if self.wf_ex.task_execution_id:
self._send_result_to_parent_workflow()
@ -466,6 +497,9 @@ class Workflow(object):
self.wf_ex.output = {'result': msg}
# Publish event.
self.notify(events.WORKFLOW_CANCELLED)
if self.wf_ex.task_execution_id:
self._send_result_to_parent_workflow()

View File

81
mistral/notifiers/base.py Normal file
View File

@ -0,0 +1,81 @@
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
from oslo_log import log as logging
from stevedore import driver
LOG = logging.getLogger(__name__)
_NOTIFIERS = {}
_NOTIFICATION_PUBLISHERS = {}
def cleanup():
global _NOTIFIERS
global _NOTIFICATION_PUBLISHERS
_NOTIFIERS = {}
_NOTIFICATION_PUBLISHERS = {}
def get_notifier(notifier_name):
global _NOTIFIERS
if not _NOTIFIERS.get(notifier_name):
mgr = driver.DriverManager(
'mistral.notifiers',
notifier_name,
invoke_on_load=True
)
_NOTIFIERS[notifier_name] = mgr.driver
return _NOTIFIERS[notifier_name]
def get_notification_publisher(publisher_name):
global _NOTIFICATION_PUBLISHERS
if not _NOTIFICATION_PUBLISHERS.get(publisher_name):
mgr = driver.DriverManager(
'mistral.notification.publishers',
publisher_name,
invoke_on_load=True
)
_NOTIFICATION_PUBLISHERS[publisher_name] = mgr.driver
return _NOTIFICATION_PUBLISHERS[publisher_name]
@six.add_metaclass(abc.ABCMeta)
class Notifier(object):
"""Notifier interface."""
@abc.abstractmethod
def notify(self, ex_id, data, event, timestamp, **kwargs):
raise NotImplementedError()
@six.add_metaclass(abc.ABCMeta)
class NotificationPublisher(object):
"""Notifier plugin interface."""
@abc.abstractmethod
def publish(self, ex_id, data, event, timestamp, **kwargs):
raise NotImplementedError()

View File

@ -0,0 +1,44 @@
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from oslo_log import log as logging
from mistral.notifiers import base
LOG = logging.getLogger(__name__)
class DefaultNotifier(base.Notifier):
"""Local notifier that process notification request."""
def notify(self, ex_id, data, event, timestamp, publishers):
for entry in publishers:
params = copy.deepcopy(entry)
publisher_name = params.pop('type', None)
if not publisher_name:
LOG.error('Notification publisher type is not specified.')
continue
try:
publisher = base.get_notification_publisher(publisher_name)
publisher.publish(ex_id, data, event, timestamp, **params)
except Exception:
LOG.exception(
'Unable to process event for publisher "%s".',
publisher_name
)

View File

@ -0,0 +1,82 @@
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.workflow import states
WORKFLOW_LAUNCHED = 'WORKFLOW_LAUNCHED'
WORKFLOW_SUCCEEDED = 'WORKFLOW_SUCCEEDED'
WORKFLOW_FAILED = 'WORKFLOW_FAILED'
WORKFLOW_CANCELLED = 'WORKFLOW_CANCELLED'
WORKFLOW_PAUSED = 'WORKFLOW_PAUSED'
WORKFLOW_RESUMED = 'WORKFLOW_RESUMED'
WORKFLOWS = [
WORKFLOW_LAUNCHED,
WORKFLOW_SUCCEEDED,
WORKFLOW_FAILED,
WORKFLOW_CANCELLED,
WORKFLOW_PAUSED,
WORKFLOW_RESUMED
]
TASK_LAUNCHED = 'TASK_LAUNCHED'
TASK_SUCCEEDED = 'TASK_SUCCEEDED'
TASK_FAILED = 'TASK_FAILED'
TASK_CANCELLED = 'TASK_CANCELLED'
TASK_PAUSED = 'TASK_PAUSED'
TASK_RESUMED = 'TASK_RESUMED'
TASKS = [
TASK_LAUNCHED,
TASK_SUCCEEDED,
TASK_FAILED,
TASK_CANCELLED,
TASK_PAUSED,
TASK_RESUMED
]
EVENTS = WORKFLOWS + TASKS
TASK_STATE_TRANSITION_MAP = {
states.RUNNING: {
'ANY': TASK_LAUNCHED,
'IDLE': TASK_RESUMED,
'PAUSED': TASK_RESUMED,
'WAITING': TASK_RESUMED
},
states.SUCCESS: {'ANY': TASK_SUCCEEDED},
states.ERROR: {'ANY': TASK_FAILED},
states.CANCELLED: {'ANY': TASK_CANCELLED},
states.PAUSED: {'ANY': TASK_PAUSED}
}
def identify_task_event(old_task_state, new_task_state):
event_options = (
TASK_STATE_TRANSITION_MAP[new_task_state]
if new_task_state in TASK_STATE_TRANSITION_MAP
else {}
)
if not event_options:
return None
event = (
event_options[old_task_state]
if old_task_state and old_task_state in event_options
else event_options['ANY']
)
return event

View File

@ -0,0 +1,93 @@
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from mistral import config as cfg
from mistral.notifiers import default_notifier as notif
from mistral.rpc import base as rpc
from mistral.service import base as service_base
from mistral import utils
from mistral.utils import profiler as profiler_utils
LOG = logging.getLogger(__name__)
class NotificationServer(service_base.MistralService):
def __init__(self, notifier, setup_profiler=True):
super(NotificationServer, self).__init__(
'notifier_group',
setup_profiler
)
self.notifier = notifier
self._rpc_server = None
def start(self):
super(NotificationServer, self).start()
if self._setup_profiler:
profiler_utils.setup('mistral-notifier', cfg.CONF.notifier.host)
# Initialize and start RPC server.
self._rpc_server = rpc.get_rpc_server_driver()(cfg.CONF.notifier)
self._rpc_server.register_endpoint(self)
self._rpc_server.run(executor='threading')
self._notify_started('Notification server started.')
def stop(self, graceful=False):
super(NotificationServer, self).stop(graceful)
if self._rpc_server:
self._rpc_server.stop(graceful)
def notify(self, rpc_ctx, ex_id, data, event, timestamp, publishers):
"""Receives calls over RPC to notify on notification server.
:param rpc_ctx: RPC request context dictionary.
:param ex_id: Workflow, task, or action execution id.
:param data: Dictionary to include in the notification message.
:param event: Event being notified on.
:param timestamp: Datetime when this event occurred.
:param publishers: The list of publishers to send the notification.
"""
LOG.info(
"Received RPC request 'notify'[ex_id=%s, event=%s, "
"timestamp=%s, data=%s, publishers=%s]",
ex_id,
event,
timestamp,
data,
utils.cut(publishers)
)
self.notifier.notify(
ex_id,
data,
event,
timestamp,
publishers
)
def get_oslo_service(setup_profiler=True):
return NotificationServer(
notif.DefaultNotifier(),
setup_profiler=setup_profiler
)

View File

View File

@ -0,0 +1,31 @@
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from mistral.notifiers import base
LOG = logging.getLogger(__name__)
class NoopPublisher(base.NotificationPublisher):
def publish(self, ex_id, data, event, timestamp, **kwargs):
LOG.info(
'The event %s for %s is published by the '
'noop notification publisher.',
event,
ex_id
)

View File

@ -0,0 +1,36 @@
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import requests
from six.moves import http_client
from oslo_log import log as logging
from mistral.notifiers import base
LOG = logging.getLogger(__name__)
class WebhookPublisher(base.NotificationPublisher):
def publish(self, ex_id, data, event, timestamp, **kwargs):
url = kwargs.get('url')
headers = kwargs.get('headers', {})
resp = requests.post(url, data=json.dumps(data), headers=headers)
if resp.status_code not in [http_client.OK, http_client.CREATED]:
raise Exception(resp.text)

View File

@ -0,0 +1,30 @@
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from mistral.rpc import base as rpc_base
from mistral.rpc import clients as rpc_clients
LOG = logging.getLogger(__name__)
class RemoteNotifier(rpc_clients.NotifierClient):
"""Notifier that passes notification request to a remote notifier."""
def __init__(self):
self.topic = cfg.CONF.notifier.topic
self._client = rpc_base.get_rpc_client_driver()(cfg.CONF.notifier)

View File

@ -1,6 +1,7 @@
# Copyright 2014 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
# Copyright 2017 - Brocade Communications Systems, Inc.
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -15,6 +16,7 @@
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from osprofiler import profiler
import threading
@ -22,9 +24,12 @@ from mistral import context as auth_ctx
from mistral.engine import base as eng
from mistral.event_engine import base as evt_eng
from mistral.executors import base as exe
from mistral.notifiers import base as notif
from mistral.rpc import base
LOG = logging.getLogger(__name__)
_ENGINE_CLIENT = None
_ENGINE_CLIENT_LOCK = threading.Lock()
@ -34,6 +39,9 @@ _EXECUTOR_CLIENT_LOCK = threading.Lock()
_EVENT_ENGINE_CLIENT = None
_EVENT_ENGINE_CLIENT_LOCK = threading.Lock()
_NOTIFIER_CLIENT = None
_NOTIFIER_CLIENT_LOCK = threading.Lock()
def cleanup():
"""Clean all the RPC clients.
@ -46,15 +54,17 @@ def cleanup():
global _ENGINE_CLIENT
global _EXECUTOR_CLIENT
global _EVENT_ENGINE_CLIENT
global _NOTIFIER_CLIENT
_ENGINE_CLIENT = None
_EXECUTOR_CLIENT = None
_EVENT_ENGINE_CLIENT = None
_NOTIFIER_CLIENT = None
def get_engine_client():
global _ENGINE_CLIENT
global _EVENT_ENGINE_CLIENT_LOCK
global _ENGINE_CLIENT_LOCK
with _ENGINE_CLIENT_LOCK:
if not _ENGINE_CLIENT:
@ -85,6 +95,17 @@ def get_event_engine_client():
return _EVENT_ENGINE_CLIENT
def get_notifier_client():
global _NOTIFIER_CLIENT
global _NOTIFIER_CLIENT_LOCK
with _NOTIFIER_CLIENT_LOCK:
if not _NOTIFIER_CLIENT:
_NOTIFIER_CLIENT = NotifierClient(cfg.CONF.notifier)
return _NOTIFIER_CLIENT
class EngineClient(eng.Engine):
"""RPC Engine client."""
@ -379,3 +400,25 @@ class EventEngineClient(evt_eng.EventEngine):
'update_event_trigger',
trigger=trigger,
)
class NotifierClient(notif.Notifier):
"""RPC Notifier client."""
def __init__(self, rpc_conf_dict):
"""Constructs an RPC client for the Notifier service."""
self._client = base.get_rpc_client_driver()(rpc_conf_dict)
def notify(self, ex_id, data, event, timestamp, publishers):
try:
return self._client.async_call(
auth_ctx.ctx(),
'notify',
ex_id=ex_id,
data=data,
event=event,
timestamp=timestamp,
publishers=publishers
)
except Exception:
LOG.exception('Unable to send notification.')

View File

@ -2,6 +2,7 @@
# Copyright 2015 - StackStorm, Inc.
# Copyright 2015 Huawei Technologies Co., Ltd.
# Copyright 2016 - Brocade Communications Systems, Inc.
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -505,13 +506,15 @@ class TestExecutionsController(base.APITest):
load_wf_ex_func.assert_not_called()
kwargs = json.loads(expected_json['params'])
kwargs['description'] = expected_json['description']
start_wf_func.assert_called_once_with(
expected_json['workflow_id'],
'',
None,
json.loads(expected_json['input']),
expected_json['description'],
**json.loads(expected_json['params'])
**kwargs
)
@mock.patch.object(rpc_clients.EngineClient, 'start_workflow')
@ -540,13 +543,15 @@ class TestExecutionsController(base.APITest):
load_wf_ex_func.assert_called_once_with(expected_json['id'])
kwargs = json.loads(expected_json['params'])
kwargs['description'] = expected_json['description']
start_wf_func.assert_called_once_with(
expected_json['workflow_id'],
'',
expected_json['id'],
json.loads(expected_json['input']),
expected_json['description'],
**json.loads(expected_json['params'])
**kwargs
)
@mock.patch.object(rpc_clients.EngineClient, 'start_workflow')
@ -600,7 +605,7 @@ class TestExecutionsController(base.APITest):
'',
exec_dict['id'],
json.loads(exec_dict['input']),
exec_dict['description'],
description=exec_dict['description'],
**json.loads(exec_dict['params'])
)
@ -629,7 +634,7 @@ class TestExecutionsController(base.APITest):
'',
'',
json.loads(exec_dict['input']),
exec_dict['description'],
description=exec_dict['description'],
**json.loads(exec_dict['params'])
)
@ -659,7 +664,7 @@ class TestExecutionsController(base.APITest):
'',
exec_dict['id'],
json.loads(exec_dict['input']),
exec_dict['description'],
description=exec_dict['description'],
**json.loads(exec_dict['params'])
)

View File

@ -1,6 +1,7 @@
# Copyright 2014 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
# Copyright 2016 - Brocade Communications Systems, Inc.
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -24,6 +25,7 @@ from mistral.db.v2 import api as db_api
from mistral.engine import engine_server
from mistral.executors import base as exe
from mistral.executors import executor_server
from mistral.notifiers import notification_server as notif_server
from mistral.rpc import base as rpc_base
from mistral.rpc import clients as rpc_clients
from mistral.tests.unit import base
@ -76,6 +78,18 @@ class EngineTestCase(base.DbTestCase):
self.threads.append(eventlet.spawn(launch_service, exe_svc))
self.addCleanup(exe_svc.stop, True)
# Start remote notifier.
if cfg.CONF.notifier.type == 'remote':
LOG.info("Starting remote notifier threads...")
self.notifier_client = rpc_clients.get_notifier_client()
notif_svc = notif_server.get_oslo_service(setup_profiler=False)
self.notifier = notif_svc.notifier
self.threads.append(eventlet.spawn(launch_service, notif_svc))
self.addCleanup(notif_svc.stop, True)
# Start engine.
LOG.info("Starting engine threads...")
@ -95,6 +109,9 @@ class EngineTestCase(base.DbTestCase):
if cfg.CONF.executor.type == 'remote':
exe_svc.wait_started()
if cfg.CONF.notifier.type == 'remote':
notif_svc.wait_started()
eng_svc.wait_started()
def kill_threads(self):

View File

@ -1,4 +1,5 @@
# Copyright 2017 - Brocade Communications Systems, Inc.
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -13,10 +14,11 @@
# limitations under the License.
from oslo_log import log as logging
from stevedore import exception as sd_exc
from mistral.executors import base as exe
from mistral.executors import default_executor as d_exe
from mistral.executors import remote_executor as r_exe
from mistral.executors import default_executor as d
from mistral.executors import remote_executor as r
from mistral.tests.unit.executors import base
@ -32,9 +34,12 @@ class PluginTestCase(base.ExecutorTestCase):
def test_get_local_executor(self):
executor = exe.get_executor('local')
self.assertIsInstance(executor, d_exe.DefaultExecutor)
self.assertIsInstance(executor, d.DefaultExecutor)
def test_get_remote_executor(self):
executor = exe.get_executor('remote')
self.assertIsInstance(executor, r_exe.RemoteExecutor)
self.assertIsInstance(executor, r.RemoteExecutor)
def test_get_bad_executor(self):
self.assertRaises(sd_exc.NoMatches, exe.get_executor, 'foobar')

View File

View File

@ -0,0 +1,47 @@
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from mistral.tests.unit.engine import base as engine_test_base
LOG = logging.getLogger(__name__)
class NotifierTestCase(engine_test_base.EngineTestCase):
def await_workflow_success(self, wf_ex_id, post_delay=1):
# Override the original wait method to add a delay to allow enough
# time for the notification events to get processed.
super(NotifierTestCase, self).await_workflow_success(wf_ex_id)
self._sleep(post_delay)
def await_workflow_error(self, wf_ex_id, post_delay=1):
# Override the original wait method to add a delay to allow enough
# time for the notification events to get processed.
super(NotifierTestCase, self).await_workflow_error(wf_ex_id)
self._sleep(post_delay)
def await_workflow_paused(self, wf_ex_id, post_delay=1):
# Override the original wait method to add a delay to allow enough
# time for the notification events to get processed.
super(NotifierTestCase, self).await_workflow_paused(wf_ex_id)
self._sleep(post_delay)
def await_workflow_cancelled(self, wf_ex_id, post_delay=1):
# Override the original wait method to add a delay to allow enough
# time for the notification events to get processed.
super(NotifierTestCase, self).await_workflow_cancelled(wf_ex_id)
self._sleep(post_delay)

View File

@ -0,0 +1,221 @@
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_config import cfg
from stevedore import exception as sd_exc
from mistral.db.v2 import api as db_api
from mistral.notifiers import base as notif
from mistral.notifiers import default_notifier as d_notif
from mistral.notifiers import notification_events as events
from mistral.notifiers import remote_notifier as r_notif
from mistral.services import workflows as wf_svc
from mistral.tests.unit.notifiers import base
from mistral.workflow import states
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
EVENT_LOGS = []
def publisher_process(ex_id, data, event, timestamp, **kwargs):
EVENT_LOGS.append((ex_id, event))
def notifier_process(ex_id, data, event, timestamp, publishers):
EVENT_LOGS.append((ex_id, event))
class ServerPluginTestCase(base.NotifierTestCase):
def tearDown(self):
notif.cleanup()
super(ServerPluginTestCase, self).tearDown()
def test_get_bad_notifier(self):
self.assertRaises(sd_exc.NoMatches, notif.get_notifier, 'foobar')
@mock.patch.object(
r_notif.RemoteNotifier,
'notify',
mock.MagicMock(return_value=None)
)
class LocalNotifServerTestCase(base.NotifierTestCase):
@classmethod
def setUpClass(cls):
super(LocalNotifServerTestCase, cls).setUpClass()
cfg.CONF.set_default('type', 'local', group='notifier')
@classmethod
def tearDownClass(cls):
cfg.CONF.set_default('type', 'remote', group='notifier')
super(LocalNotifServerTestCase, cls).tearDownClass()
def setUp(self):
super(LocalNotifServerTestCase, self).setUp()
self.publisher = notif.get_notification_publisher('webhook')
self.publisher.publish = mock.MagicMock(side_effect=publisher_process)
self.publisher.publish.reset_mock()
del EVENT_LOGS[:]
def tearDown(self):
notif.cleanup()
super(LocalNotifServerTestCase, self).tearDown()
def test_get_notifier(self):
notifier = notif.get_notifier(cfg.CONF.notifier.type)
self.assertEqual('local', cfg.CONF.notifier.type)
self.assertIsInstance(notifier, d_notif.DefaultNotifier)
def test_notify(self):
wf_def = """
version: '2.0'
wf:
tasks:
t1:
action: std.noop
on-success:
- t2
t2:
action: std.noop
"""
wf_svc.create_workflows(wf_def)
notif_options = [{'type': 'webhook'}]
wf_ex = self.engine.start_workflow(
'wf',
'',
wf_input={},
notify=notif_options
)
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_exs = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertEqual(2, len(task_exs))
t1_ex = self._assert_single_item(task_exs, name='t1')
t2_ex = self._assert_single_item(task_exs, name='t2')
self.assertEqual(states.SUCCESS, t1_ex.state)
self.assertIsNone(t1_ex.state_info)
self.assertEqual(states.SUCCESS, t2_ex.state)
self.assertIsNone(t2_ex.state_info)
expected_order = [
(wf_ex.id, events.WORKFLOW_LAUNCHED),
(t1_ex.id, events.TASK_LAUNCHED),
(t1_ex.id, events.TASK_SUCCEEDED),
(t2_ex.id, events.TASK_LAUNCHED),
(t2_ex.id, events.TASK_SUCCEEDED),
(wf_ex.id, events.WORKFLOW_SUCCEEDED)
]
self.assertFalse(r_notif.RemoteNotifier.notify.called)
self.assertListEqual(expected_order, EVENT_LOGS)
@mock.patch.object(
r_notif.RemoteNotifier,
'notify',
mock.MagicMock(side_effect=notifier_process)
)
class RemoteNotifServerTestCase(base.NotifierTestCase):
@classmethod
def setUpClass(cls):
super(RemoteNotifServerTestCase, cls).setUpClass()
cfg.CONF.set_default('type', 'remote', group='notifier')
def setUp(self):
super(RemoteNotifServerTestCase, self).setUp()
del EVENT_LOGS[:]
def tearDown(self):
notif.cleanup()
super(RemoteNotifServerTestCase, self).tearDown()
def test_get_notifier(self):
notifier = notif.get_notifier(cfg.CONF.notifier.type)
self.assertEqual('remote', cfg.CONF.notifier.type)
self.assertIsInstance(notifier, r_notif.RemoteNotifier)
def test_notify(self):
wf_def = """
version: '2.0'
wf:
tasks:
t1:
action: std.noop
on-success:
- t2
t2:
action: std.noop
"""
wf_svc.create_workflows(wf_def)
notif_options = [{'type': 'foobar'}]
wf_ex = self.engine.start_workflow(
'wf',
'',
wf_input={},
notify=notif_options
)
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_exs = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertEqual(2, len(task_exs))
t1_ex = self._assert_single_item(task_exs, name='t1')
t2_ex = self._assert_single_item(task_exs, name='t2')
self.assertEqual(states.SUCCESS, t1_ex.state)
self.assertIsNone(t1_ex.state_info)
self.assertEqual(states.SUCCESS, t2_ex.state)
self.assertIsNone(t2_ex.state_info)
expected_order = [
(wf_ex.id, events.WORKFLOW_LAUNCHED),
(t1_ex.id, events.TASK_LAUNCHED),
(t1_ex.id, events.TASK_SUCCEEDED),
(t2_ex.id, events.TASK_LAUNCHED),
(t2_ex.id, events.TASK_SUCCEEDED),
(wf_ex.id, events.WORKFLOW_SUCCEEDED)
]
self.assertTrue(r_notif.RemoteNotifier.notify.called)
self.assertListEqual(expected_order, EVENT_LOGS)

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,5 @@
# Copyright 2017 - Brocade Communications Systems, Inc.
# Copyright 2018 - Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -47,7 +48,7 @@ class ServiceLauncherTest(base.DbTestCase):
api_workers = api_server.workers
self._await(lambda: len(svr_proc_mgr.children.keys()) == api_workers)
self._await(lambda: len(svr_thrd_mgr.services.services) == 3)
self._await(lambda: len(svr_thrd_mgr.services.services) == 4)
def test_launch_process(self):
eventlet.spawn(launch.launch_any, ['api'])

View File

@ -75,6 +75,14 @@ mistral.executors =
local = mistral.executors.default_executor:DefaultExecutor
remote = mistral.executors.remote_executor:RemoteExecutor
mistral.notifiers =
local = mistral.notifiers.default_notifier:DefaultNotifier
remote = mistral.notifiers.remote_notifier:RemoteNotifier
mistral.notification.publishers =
webhook = mistral.notifiers.publishers.webhook:WebhookPublisher
noop = mistral.notifiers.publishers.noop:NoopPublisher
mistral.expression.functions =
# json_pp was deprecated in Queens and will be removed in the S cycle
json_pp = mistral.utils.expression_utils:json_pp_