Merge "Add test coverage for manage_existing API revert"

This commit is contained in:
Zuul 2020-04-14 18:41:54 +00:00 committed by Gerrit Code Review
commit 312387b81a

View File

@ -15,6 +15,7 @@ import inspect
from unittest import mock from unittest import mock
import taskflow.engines import taskflow.engines
from taskflow.types import failure as ft
from cinder import context from cinder import context
from cinder import exception from cinder import exception
@ -64,6 +65,21 @@ class ManageVolumeFlowTestCase(test.TestCase):
create_what.pop('volume_id') create_what.pop('volume_id')
task.execute(self.ctxt, **create_what) task.execute(self.ctxt, **create_what)
def test_cast_manage_existing_revert(self):
volume = fake_volume.fake_volume_obj(self.ctxt)
volume.save = mock.MagicMock()
# Fake objects assert specs
task = manage_existing.ManageCastTask(
fake_volume_api.FakeSchedulerRpcAPI({}, self),
fake_volume_api.FakeDb())
flow_failures = [mock.MagicMock()]
task.revert(self.ctxt, {}, flow_failures, volume)
# Check that volume status is updated and saved
self.assertEqual(volume.status, 'error_managing')
volume.save.assert_called_once()
def test_create_db_entry_task_with_multiattach(self): def test_create_db_entry_task_with_multiattach(self):
fake_volume_type = fake_volume.fake_volume_type_obj( fake_volume_type = fake_volume.fake_volume_type_obj(
@ -86,6 +102,32 @@ class ManageVolumeFlowTestCase(test.TestCase):
result = task.execute(self.ctxt, **spec) result = task.execute(self.ctxt, **spec)
self.assertTrue(result['volume_properties']['multiattach']) self.assertTrue(result['volume_properties']['multiattach'])
def test_revert_manage_existing(self):
fake_db = fake_volume_api.FakeDb()
fake_db.volume_destroy = mock.MagicMock()
self.ctxt.elevated = mock.Mock()
task = manage_existing.EntryCreateTask(fake_db)
task.revert(self.ctxt, {'volume_id': fakes.VOLUME_ID})
# Check DB entry task is destroyed
fake_db.volume_destroy.assert_called_once_with(
self.ctxt.elevated.return_value,
fakes.VOLUME_ID
)
def test_revert_manage_existing_with_ft_failure(self):
fake_db = fake_volume_api.FakeDb()
fake_db.volume_destroy = mock.MagicMock()
mock_failure = mock.Mock(spec=ft.Failure)
task = manage_existing.EntryCreateTask(fake_db)
task.revert(self.ctxt, mock_failure)
# Check DB entry task is not destroyed
fake_db.volume_destroy.assert_not_called()
@staticmethod @staticmethod
def _stub_volume_object_get(self): def _stub_volume_object_get(self):
volume = { volume = {