Functional tests for workflow sharing

Implements: blueprint resource-sharing-cli
Change-Id: I6ef7033b5b095da4a2e67088e62dd2fdb817204a
This commit is contained in:
Lingxian Kong 2016-02-19 23:22:03 +13:00
parent 5c04327ddd
commit 9e5f946797
4 changed files with 100 additions and 0 deletions

View File

@ -94,6 +94,16 @@ class MistralCLIAuth(base.ClientTestBase):
'mistral %s' % mistral_url_op, action, flags, params,
fail_ok)
def get_project_id(self, project='admin'):
project_name = credentials(project)['tenant_name']
admin_clients = self._get_clients()
projects = self.parser.listing(
admin_clients.openstack('project show', params=project_name)
)
return [o['Value'] for o in projects if o['Field'] == 'id'][0]
class MistralCLIAltAuth(base.ClientTestBase):

View File

@ -117,6 +117,20 @@ class MistralClientTestBase(base.MistralCLIAuth, base.MistralCLIAltAuth):
return wf
def workflow_member_create(self, wf_id):
cmd_param = (
'%s workflow %s' % (wf_id, self.get_project_id("demo"))
)
member = self.mistral_admin("member-create", params=cmd_param)
self.addCleanup(
self.mistral_admin,
'member-delete',
params=cmd_param
)
return member
def action_create(self, act_def, admin=True, scope='private'):
params = '{0}'.format(act_def)

View File

@ -180,6 +180,76 @@ class WorkflowIsolationCLITests(base_v2.MistralClientTestBase):
)
class WorkflowSharingCLITests(base_v2.MistralClientTestBase):
def setUp(self):
super(WorkflowSharingCLITests, self).setUp()
self.wf = self.workflow_create(self.wf_def, admin=True)
def _update_shared_workflow(self, new_status='accepted'):
member = self.workflow_member_create(self.wf[0]["ID"])
status = self.get_value_of_field(member, 'Status')
self.assertEqual('pending', status)
cmd_param = '%s workflow --status %s' % (self.wf[0]["ID"], new_status)
member = self.mistral_alt_user("member-update", params=cmd_param)
status = self.get_value_of_field(member, 'Status')
self.assertEqual(new_status, status)
def test_list_accepted_shared_workflow(self):
wfs = self.mistral_alt_user("workflow-list")
self.assertNotIn(self.wf[0]["ID"], [w["ID"] for w in wfs])
self._update_shared_workflow(new_status='accepted')
alt_wfs = self.mistral_alt_user("workflow-list")
self.assertIn(self.wf[0]["ID"], [w["ID"] for w in alt_wfs])
self.assertIn(
self.get_project_id("admin"),
[w["Project ID"] for w in alt_wfs]
)
def test_list_rejected_shared_workflow(self):
self._update_shared_workflow(new_status='rejected')
alt_wfs = self.mistral_alt_user("workflow-list")
self.assertNotIn(self.wf[0]["ID"], [w["ID"] for w in alt_wfs])
def test_create_execution_using_shared_workflow(self):
self._update_shared_workflow(new_status='accepted')
execution = self.execution_create(self.wf[0]["ID"], admin=False)
wf_name = self.get_value_of_field(execution, 'Workflow name')
self.assertEqual(self.wf[0]["Name"], wf_name)
def test_create_contrigger_using_shared_workflow(self):
self._update_shared_workflow(new_status='accepted')
trigger = self.cron_trigger_create(
"test_trigger",
self.wf[0]["ID"],
"{}",
"5 * * * *",
admin=False
)
wf_name = self.get_value_of_field(trigger, 'Workflow')
self.assertEqual(self.wf[0]["Name"], wf_name)
# Admin project can not delete the shared workflow, because it is used
# in a cron-trigger of another project.
self.assertRaises(
exceptions.CommandFailed,
self.mistral_admin,
'workflow-delete',
params=self.wf[0]["ID"]
)
class ActionIsolationCLITests(base_v2.MistralClientTestBase):
def test_actions_name_uniqueness(self):

View File

@ -88,6 +88,12 @@ openstack.workflow_engine.v2 =
workflow_engine_service_list = mistralclient.commands.v2.services:List
resource_member_list = mistralclient.commands.v2.members:List
resource_member_show = mistralclient.commands.v2.members:Get
resource_member_create = mistralclient.commands.v2.members:Create
resource_member_delete = mistralclient.commands.v2.members:Delete
resource_member_update = mistralclient.commands.v2.members:Update
[nosetests]
cover-package = mistralclient