diff --git a/cinder/tests/unit/volume/flows/test_manage_volume_flow.py b/cinder/tests/unit/volume/flows/test_manage_volume_flow.py index b133e950289..712b8bbd1a8 100644 --- a/cinder/tests/unit/volume/flows/test_manage_volume_flow.py +++ b/cinder/tests/unit/volume/flows/test_manage_volume_flow.py @@ -15,6 +15,7 @@ import inspect from unittest import mock import taskflow.engines +from taskflow.types import failure as ft from cinder import context from cinder import exception @@ -64,6 +65,21 @@ class ManageVolumeFlowTestCase(test.TestCase): create_what.pop('volume_id') task.execute(self.ctxt, **create_what) + def test_cast_manage_existing_revert(self): + volume = fake_volume.fake_volume_obj(self.ctxt) + volume.save = mock.MagicMock() + + # Fake objects assert specs + task = manage_existing.ManageCastTask( + fake_volume_api.FakeSchedulerRpcAPI({}, self), + fake_volume_api.FakeDb()) + flow_failures = [mock.MagicMock()] + task.revert(self.ctxt, {}, flow_failures, volume) + + # Check that volume status is updated and saved + self.assertEqual(volume.status, 'error_managing') + volume.save.assert_called_once() + def test_create_db_entry_task_with_multiattach(self): fake_volume_type = fake_volume.fake_volume_type_obj( @@ -86,6 +102,32 @@ class ManageVolumeFlowTestCase(test.TestCase): result = task.execute(self.ctxt, **spec) self.assertTrue(result['volume_properties']['multiattach']) + def test_revert_manage_existing(self): + fake_db = fake_volume_api.FakeDb() + fake_db.volume_destroy = mock.MagicMock() + + self.ctxt.elevated = mock.Mock() + + task = manage_existing.EntryCreateTask(fake_db) + task.revert(self.ctxt, {'volume_id': fakes.VOLUME_ID}) + + # Check DB entry task is destroyed + fake_db.volume_destroy.assert_called_once_with( + self.ctxt.elevated.return_value, + fakes.VOLUME_ID + ) + + def test_revert_manage_existing_with_ft_failure(self): + fake_db = fake_volume_api.FakeDb() + fake_db.volume_destroy = mock.MagicMock() + + mock_failure = mock.Mock(spec=ft.Failure) + task = manage_existing.EntryCreateTask(fake_db) + task.revert(self.ctxt, mock_failure) + + # Check DB entry task is not destroyed + fake_db.volume_destroy.assert_not_called() + @staticmethod def _stub_volume_object_get(self): volume = {