Add parameters to force failures in nop action

In order to test the different code paths for action execution
it is very useful to be able to make the actions fail in the different
execution stages.

This patch adds three new options `fail_pre_condition`, `fail_execute`
and `fail_post_condition`. Setting any of them to True makes the action
to fail in the specified step.

Change-Id: Ied8c0bb767d9bb6bdfb9209365857a3b4d606b40
Signed-off-by: Alfredo Moralejo <amoralej@redhat.com>
This commit is contained in:
Alfredo Moralejo
2025-07-24 19:04:43 +02:00
parent 1a9f17748e
commit 1fb89aeac3
3 changed files with 94 additions and 3 deletions

View File

@@ -0,0 +1,14 @@
---
features:
- |
Three new parameters have been added to the ``nop`` action:
* ``fail_pre_condition``: When setting it to `true` the action
fails on the pre_condition execution.
* ``fail_execute``: When setting it to `true` the action fails
on the execute step.
* ``fail_post_condition``: When setting it to `true` the action
fails on the post_condition execution.

View File

@@ -20,6 +20,7 @@
from oslo_log import log
from watcher.applier.actions import base
from watcher.common import exception
LOG = log.getLogger(__name__)
@@ -45,6 +46,18 @@ class Nop(base.BaseAction):
'properties': {
'message': {
'type': ['string', 'null']
},
'fail_pre_condition': {
'type': 'boolean',
'default': False
},
'fail_execute': {
'type': 'boolean',
'default': False
},
'fail_post_condition': {
'type': 'boolean',
'default': False
}
},
'required': ['message'],
@@ -55,8 +68,13 @@ class Nop(base.BaseAction):
def message(self):
return self.input_parameters.get(self.MESSAGE)
def debug_message(self, message):
return ("Executing action NOP message: %s ", message)
def execute(self):
LOG.debug("Executing action NOP message: %s ", self.message)
LOG.debug(self.debug_message(self.message))
if self.input_parameters.get('fail_execute'):
return False
return True
def revert(self):
@@ -64,10 +82,12 @@ class Nop(base.BaseAction):
return True
def pre_condition(self):
pass
if self.input_parameters.get('fail_pre_condition'):
raise exception.WatcherException("Failed in pre_condition")
def post_condition(self):
pass
if self.input_parameters.get('fail_post_condition'):
raise exception.WatcherException("Failed in post_condition")
def get_description(self):
"""Description of the action"""

View File

@@ -91,6 +91,63 @@ class TestTaskFlowActionContainer(base.DbTestCase):
self.engine.context, action.uuid)
self.assertEqual(obj_action.state, objects.action.State.FAILED)
def test_execute_with_failed_execute(self):
action_plan = obj_utils.create_test_action_plan(
self.context, audit_id=self.audit.id,
strategy_id=self.strategy.id,
state=objects.action_plan.State.ONGOING)
action = obj_utils.create_test_action(
self.context, action_plan_id=action_plan.id,
state=objects.action.State.PENDING,
action_type='nop',
input_parameters={'message': 'hello World',
'fail_execute': True})
action_container = tflow.TaskFlowActionContainer(
db_action=action,
engine=self.engine)
action_container.execute()
obj_action = objects.Action.get_by_uuid(
self.engine.context, action.uuid)
self.assertEqual(obj_action.state, objects.action.State.FAILED)
def test_pre_execute_with_failed_pre_condition(self):
action_plan = obj_utils.create_test_action_plan(
self.context, audit_id=self.audit.id,
strategy_id=self.strategy.id,
state=objects.action_plan.State.ONGOING)
action = obj_utils.create_test_action(
self.context, action_plan_id=action_plan.id,
state=objects.action.State.PENDING,
action_type='nop',
input_parameters={'message': 'hello World',
'fail_pre_condition': True})
action_container = tflow.TaskFlowActionContainer(
db_action=action,
engine=self.engine)
action_container.pre_execute()
obj_action = objects.Action.get_by_uuid(
self.engine.context, action.uuid)
self.assertEqual(obj_action.state, objects.action.State.FAILED)
def test_post_execute_with_failed_post_condition(self):
action_plan = obj_utils.create_test_action_plan(
self.context, audit_id=self.audit.id,
strategy_id=self.strategy.id,
state=objects.action_plan.State.ONGOING)
action = obj_utils.create_test_action(
self.context, action_plan_id=action_plan.id,
state=objects.action.State.ONGOING,
action_type='nop',
input_parameters={'message': 'hello World',
'fail_post_condition': True})
action_container = tflow.TaskFlowActionContainer(
db_action=action,
engine=self.engine)
action_container.post_execute()
obj_action = objects.Action.get_by_uuid(
self.engine.context, action.uuid)
self.assertEqual(obj_action.state, objects.action.State.FAILED)
@mock.patch('eventlet.spawn')
def test_execute_with_cancel_action_plan(self, mock_eventlet_spawn):
action_plan = obj_utils.create_test_action_plan(