Use the new action context in MistralHTTPAction

This move allows us to remove the previous method for injecting
context information into actions. It was only used by the
MistralHTTPAction.

Related-Bug: #1718353
Co-Authored-By: Renat Akhmerov <renat.akhmerov@gmail.com>
Change-Id: Ie5c39d35e5848accb24f1cd15e88a4a2dd4b69c0
This commit is contained in:
Renat Akhmerov 2017-09-20 13:51:49 +07:00 committed by Dougal Matthews
parent dd4a4bd440
commit f9457b8cc6
5 changed files with 98 additions and 288 deletions

View File

@ -237,44 +237,19 @@ class HTTPAction(actions.Action):
class MistralHTTPAction(HTTPAction):
def __init__(self,
action_context,
url,
method="GET",
params=None,
body=None,
headers=None,
cookies=None,
auth=None,
timeout=None,
allow_redirects=None,
proxies=None,
verify=None):
def run(self, context):
self.headers = self.headers or {}
actx = action_context
headers = headers or {}
headers.update({
'Mistral-Workflow-Name': actx.get('workflow_name'),
'Mistral-Workflow-Execution-Id': actx.get('workflow_execution_id'),
'Mistral-Task-Id': actx.get('task_id'),
'Mistral-Action-Execution-Id': actx.get('action_execution_id'),
'Mistral-Callback-URL': actx.get('callback_url'),
exec_ctx = context.execution
self.headers.update({
'Mistral-Workflow-Name': exec_ctx.workflow_name,
'Mistral-Workflow-Execution-Id': exec_ctx.workflow_execution_id,
'Mistral-Task-Id': exec_ctx.task_id,
'Mistral-Action-Execution-Id': exec_ctx.action_execution_id,
'Mistral-Callback-URL': exec_ctx.callback_url,
})
super(MistralHTTPAction, self).__init__(
url,
method,
params,
body,
headers,
cookies,
auth,
timeout,
allow_redirects,
proxies,
verify,
)
super(MistralHTTPAction, self).run(context)
def is_sync(self):
return False

View File

@ -177,11 +177,6 @@ class Action(object):
# state within the current session.
self.task_ex.action_executions.append(self.action_ex)
def _inject_action_ctx_for_validating(self, input_dict):
if a_m.has_action_context(
self.action_def.action_class, self.action_def.attributes):
input_dict.update(a_m.get_empty_action_context())
@profiler.trace('action-log-result', hide_args=True)
def _log_result(self, prev_state, result):
state = self.action_ex.state
@ -239,8 +234,6 @@ class PythonAction(Action):
# DB object is created.
action_ex_id = utils.generate_unicode_uuid()
self._insert_action_context(action_ex_id, input_dict)
self._create_action_execution(
self._prepare_input(input_dict),
self._prepare_runtime_context(index, safe_rerun),
@ -271,8 +264,6 @@ class PythonAction(Action):
# DB object is created.
action_ex_id = utils.generate_unicode_uuid()
self._insert_action_context(action_ex_id, input_dict, save=save)
if save:
self._create_action_execution(
input_dict,
@ -306,8 +297,6 @@ class PythonAction(Action):
return a.is_sync()
def validate_input(self, input_dict):
if self.action_def.action_class:
self._inject_action_ctx_for_validating(input_dict)
# NOTE(kong): Don't validate action input if action initialization
# method contains ** argument.
@ -362,24 +351,6 @@ class PythonAction(Action):
"""
return {'index': index, 'safe_rerun': safe_rerun}
def _insert_action_context(self, action_ex_id, input_dict, save=True):
"""Template method to prepare action context.
It inserts the action context in the input if required
runtime context.
"""
# we need to push action context to all actions. It's related to
# https://blueprints.launchpad.net/mistral/+spec/mistral-custom-actions-api
has_action_context = a_m.has_action_context(
self.action_def.action_class,
self.action_def.attributes or {}
)
if has_action_context:
input_dict.update(
a_m.get_action_context(self.task_ex, action_ex_id, save=save)
)
class AdHocAction(PythonAction):
"""Ad-hoc action."""

View File

@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
from oslo_log import log as logging
from stevedore import extension
@ -145,57 +143,3 @@ def get_action_class(action_full_name):
action_db.action_class,
action_db.attributes
)
def get_action_context(task_ex, action_ex_id, save=True):
if task_ex:
return {
_ACTION_CTX_PARAM: {
'workflow_name': task_ex.workflow_name,
'workflow_execution_id': task_ex.workflow_execution_id,
'task_id': task_ex.id,
'task_name': task_ex.name,
'task_tags': task_ex.tags,
'action_execution_id': action_ex_id,
'callback_url': '/v2/action_executions/%s' % action_ex_id
}
}
elif save:
return {
_ACTION_CTX_PARAM: {
'workflow_name': None,
'workflow_execution_id': None,
'task_id': None,
'task_name': None,
'task_tags': None,
'action_execution_id': action_ex_id,
'callback_url': '/v2/action_executions/%s' % action_ex_id
}
}
return {
_ACTION_CTX_PARAM: {
'workflow_name': None,
'workflow_execution_id': None,
'task_id': None,
'task_name': None,
'task_tags': None,
'action_execution_id': None,
'callback_url': None
}
}
def get_empty_action_context():
return {_ACTION_CTX_PARAM: {}}
def _has_argument(action, attributes, argument_name):
action_cls = action_factory.construct_action_class(action, attributes)
arg_spec = inspect.getargspec(action_cls.__init__)
return argument_name in arg_spec.args
def has_action_context(action, attributes):
return _has_argument(action, attributes, _ACTION_CTX_PARAM)

View File

@ -1,5 +1,3 @@
# Copyright 2014 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@ -13,184 +11,68 @@
# limitations under the License.
import mock
from oslo_config import cfg
import requests
from mistral.actions import std_actions
from mistral.db.v2 import api as db_api
from mistral.services import workbooks as wb_service
from mistral.services import workflows as wf_service
from mistral.tests.unit import base as test_base
from mistral.tests.unit.engine import base
from mistral.workflow import states
from mistral_lib import actions as actions_base
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
WORKBOOK = """
WF = """
---
version: '2.0'
name: wb
workflows:
wf1:
type: direct
wf:
tasks:
task1:
action: std.mistral_http
input:
url: https://wiki.openstack.org/wiki/mistral
publish:
result: <% task(task1).result %>
action: my_action
"""
class MyAction(actions_base.Action):
def __init__(self):
pass
def run(self, context):
pass
class ActionContextTest(base.EngineTestCase):
def setUp(self):
super(ActionContextTest, self).setUp()
@mock.patch.object(
requests, 'request',
mock.MagicMock(return_value=test_base.FakeHTTPResponse('', 200, 'OK')))
@mock.patch.object(
std_actions.MistralHTTPAction, 'is_sync',
mock.MagicMock(return_value=True))
def test_action_context(self):
wb_service.create_workbook_v2(WORKBOOK)
test_base.register_action_class('my_action', MyAction)
wf_ex = self.engine.start_workflow('wb.wf1')
@mock.patch.object(MyAction, 'run', return_value=None)
def test_context(self, mocked_run):
wf_service.create_workflows(WF)
# Start workflow.
wf_ex = self.engine.start_workflow('wf', '', {})
self.await_workflow_success(wf_ex.id)
self.assertEqual(1, len(mocked_run.call_args_list))
action_context = mocked_run.call_args[0][0]
exec_context = action_context.execution
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertEqual(exec_context.workflow_execution_id, wf_ex.id)
task_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
action_ex = self._assert_single_item(task_ex.executions)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
a_ex = task1.action_executions[0]
headers = {
'Mistral-Workflow-Name': wf_ex.workflow_name,
'Mistral-Workflow-Execution-Id': wf_ex.id,
'Mistral-Task-Id': task_ex.id,
'Mistral-Action-Execution-Id': action_ex.id,
'Mistral-Callback-URL': '/v2/action_executions/%s' % action_ex.id
}
self.assertEqual(exec_context.task_id, task1.id)
requests.request.assert_called_with(
'GET',
'https://wiki.openstack.org/wiki/mistral',
params=None,
data=None,
headers=headers,
cookies=None,
auth=None,
timeout=None,
allow_redirects=None,
proxies=None,
verify=None
)
self.assertEqual(exec_context.workflow_name, wf_ex.name)
@mock.patch.object(
requests, 'request',
mock.MagicMock(return_value=test_base.FakeHTTPResponse('', 200, 'OK')))
def test_single_async_saved_action_context(self):
action_ex = self.engine.start_action(
'std.mistral_http',
{'url': 'https://wiki.openstack.org/wiki/mistral'},
save_result=True
)
callback_url = "/v2/action_executions/{}".format(a_ex.id)
action_context = {
'action_execution_id': action_ex.id,
'callback_url': '/v2/action_executions/%s' % action_ex.id,
'task_id': None,
'task_name': None,
'task_tags': None,
'workflow_name': None,
'workflow_execution_id': None
}
self.assertIn('action_context', action_ex.input)
self.assertEqual(action_context, action_ex.input['action_context'])
@mock.patch.object(
requests, 'request',
mock.MagicMock(return_value=test_base.FakeHTTPResponse('', 200, 'OK')))
def test_single_async_action_context(self):
action_ex = self.engine.start_action(
'std.mistral_http',
{'url': 'https://wiki.openstack.org/wiki/mistral'},
save_result=False
)
action_context = {
'action_execution_id': action_ex.id,
'callback_url': '/v2/action_executions/%s' % action_ex.id,
'task_id': None,
'task_name': None,
'task_tags': None,
'workflow_name': None,
'workflow_execution_id': None
}
self.assertIn('action_context', action_ex.input)
self.assertEqual(action_context, action_ex.input['action_context'])
@mock.patch.object(
requests, 'request',
mock.MagicMock(return_value=test_base.FakeHTTPResponse('', 200, 'OK')))
@mock.patch.object(
std_actions.MistralHTTPAction, 'is_sync',
mock.MagicMock(return_value=True))
def test_single_sync_saved_action_context(self):
action_ex = self.engine.start_action(
'std.mistral_http',
{'url': 'https://wiki.openstack.org/wiki/mistral'},
save_result=True
)
action_context = {
'action_execution_id': action_ex.id,
'callback_url': '/v2/action_executions/%s' % action_ex.id,
'task_id': None,
'task_name': None,
'task_tags': None,
'workflow_name': None,
'workflow_execution_id': None
}
self.assertIn('action_context', action_ex.input)
self.assertEqual(action_context, action_ex.input['action_context'])
@mock.patch.object(
requests, 'request',
mock.MagicMock(return_value=test_base.FakeHTTPResponse('', 200, 'OK')))
@mock.patch.object(
std_actions.MistralHTTPAction, 'is_sync',
mock.MagicMock(return_value=True))
def test_single_sync_action_context(self):
action_ex = self.engine.start_action(
'std.mistral_http',
{'url': 'https://wiki.openstack.org/wiki/mistral'},
save_result=False
)
action_context = {
'action_execution_id': None,
'callback_url': None,
'task_id': None,
'task_name': None,
'task_tags': None,
'workflow_name': None,
'workflow_execution_id': None
}
self.assertIn('action_context', action_ex.input)
self.assertEqual(action_context, action_ex.input['action_context'])
self.assertEqual(exec_context.callback_url, callback_url)
self.assertEqual(exec_context.action_execution_id, a_ex.id)

View File

@ -214,9 +214,14 @@ class AdhocActionsTest(base.EngineTestCase):
def test_run_adhoc_action_with_env(self):
wf_ex = self.engine.start_workflow(
'my_wb.wf4', '', {'str1': 'a'}, env={'foo': 'bar'})
'my_wb.wf4',
'',
{'str1': 'a'},
env={'foo': 'bar'}
)
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -228,10 +233,10 @@ class AdhocActionsTest(base.EngineTestCase):
)
def test_run_nested_adhoc_with_output(self):
wf_ex = self.engine.start_workflow(
'my_wb.wf5', '')
wf_ex = self.engine.start_workflow('my_wb.wf5', '')
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -243,9 +248,10 @@ class AdhocActionsTest(base.EngineTestCase):
)
def test_missing_adhoc_action_definition(self):
wf_ex = self.engine.start_workflow(
'my_wb.wf6', '')
wf_ex = self.engine.start_workflow('my_wb.wf6', '')
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -257,16 +263,48 @@ class AdhocActionsTest(base.EngineTestCase):
self.assertEqual(states.ERROR, task1.state)
def test_nested_missing_adhoc_action_definition(self):
wf_ex = self.engine.start_workflow(
'my_wb.wf7', '')
wf_ex = self.engine.start_workflow('my_wb.wf7', '')
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks,
name='nested_missing_action')
task1 = self._assert_single_item(
tasks,
name='nested_missing_action'
)
self.assertEqual(states.ERROR, task1.state)
def test_adhoc_async_action(self):
wb_text = """---
version: '2.0'
name: my_wb1
actions:
my_action:
input:
- my_param
base: std.mistral_http
base-input:
url: http://google.com/<% $.my_param %>
method: GET
workflows:
my_wf:
tasks:
task1:
action: my_action my_param="asdfasdf"
"""
wb_service.create_workbook_v2(wb_text)
wf_ex = self.engine.start_workflow('my_wb1.my_wf')
self.await_workflow_running(wf_ex.id)