From 2d10d8a38d64a7e0b237236acf8aaeaf515f1395 Mon Sep 17 00:00:00 2001 From: Yuriy Nesenenko Date: Fri, 4 Dec 2015 16:40:39 +0200 Subject: [PATCH] Small refactoring in test_admin_actions.py This patch splits tests to reduce duplication code. Also it adds the method tearDown(). This method will only be called if the setUp() succeeds, regardless of the outcome of the test method. Change-Id: I8df68f2cd4331cb7f4993d864417a045908888db --- .../unit/api/contrib/test_admin_actions.py | 1031 ++++++++--------- 1 file changed, 492 insertions(+), 539 deletions(-) diff --git a/cinder/tests/unit/api/contrib/test_admin_actions.py b/cinder/tests/unit/api/contrib/test_admin_actions.py index 60b6c3bd9fb..5a005685c45 100644 --- a/cinder/tests/unit/api/contrib/test_admin_actions.py +++ b/cinder/tests/unit/api/contrib/test_admin_actions.py @@ -45,7 +45,27 @@ def app(): return mapper -class AdminActionsTest(test.TestCase): +class BaseAdminTest(test.TestCase): + def setUp(self): + super(BaseAdminTest, self).setUp() + self.volume_api = volume_api.API() + # admin context + self.ctx = context.RequestContext('admin', 'fake', True) + + def _create_volume(self, context, updates=None): + db_volume = {'status': 'available', + 'host': 'test', + 'availability_zone': 'fake_zone', + 'attach_status': 'detached'} + if updates: + db_volume.update(updates) + + volume = objects.Volume(context=context, **db_volume) + volume.create() + return volume + + +class AdminActionsTest(BaseAdminTest): def setUp(self): super(AdminActionsTest, self).setUp() @@ -57,7 +77,6 @@ class AdminActionsTest(test.TestCase): group='oslo_concurrency') self.flags(rpc_backend='cinder.openstack.common.rpc.impl_fake') - self.volume_api = volume_api.API() cast_as_call.mock_cast_as_call(self.volume_api.volume_rpcapi.client) cast_as_call.mock_cast_as_call(self.volume_api.scheduler_rpcapi.client) @@ -89,18 +108,6 @@ class AdminActionsTest(test.TestCase): resp = req.get_response(app()) return resp - def _create_volume(self, context, updates=None): - db_volume = {'status': 'available', - 'host': 'test', - 'availability_zone': 'fake_zone', - 'attach_status': 'detached'} - if updates: - db_volume.update(updates) - - volume = objects.Volume(context=context, **db_volume) - volume.create() - return volume - def test_valid_updates(self): vac = admin_actions.VolumeAdminController() @@ -120,68 +127,63 @@ class AdminActionsTest(test.TestCase): vac.validate_update({'migration_status': 'starting'}) def test_reset_attach_status(self): - ctx = context.RequestContext('admin', 'fake', True) - volume = db.volume_create(ctx, {'attach_status': 'detached'}) + volume = db.volume_create(self.ctx, {'attach_status': 'detached'}) - resp = self._issue_volume_reset(ctx, + resp = self._issue_volume_reset(self.ctx, volume, {'attach_status': 'attached'}) self.assertEqual(202, resp.status_int) - volume = db.volume_get(ctx, volume['id']) + volume = db.volume_get(self.ctx, volume['id']) self.assertEqual('attached', volume['attach_status']) def test_reset_attach_invalid_status(self): - ctx = context.RequestContext('admin', 'fake', True) - volume = db.volume_create(ctx, {'attach_status': 'detached'}) + volume = db.volume_create(self.ctx, {'attach_status': 'detached'}) - resp = self._issue_volume_reset(ctx, + resp = self._issue_volume_reset(self.ctx, volume, {'attach_status': 'bogus-status'}) self.assertEqual(400, resp.status_int) - volume = db.volume_get(ctx, volume['id']) + volume = db.volume_get(self.ctx, volume['id']) self.assertEqual('detached', volume['attach_status']) def test_reset_migration_invalid_status(self): - ctx = context.RequestContext('admin', 'fake', True) - volume = db.volume_create(ctx, {'migration_status': None}) + volume = db.volume_create(self.ctx, {'migration_status': None}) - resp = self._issue_volume_reset(ctx, + resp = self._issue_volume_reset(self.ctx, volume, {'migration_status': 'bogus-status'}) self.assertEqual(400, resp.status_int) - volume = db.volume_get(ctx, volume['id']) + volume = db.volume_get(self.ctx, volume['id']) self.assertIsNone(volume['migration_status']) def test_reset_migration_status(self): - ctx = context.RequestContext('admin', 'fake', True) - volume = db.volume_create(ctx, {'migration_status': None}) + volume = db.volume_create(self.ctx, {'migration_status': None}) - resp = self._issue_volume_reset(ctx, + resp = self._issue_volume_reset(self.ctx, volume, {'migration_status': 'migrating'}) self.assertEqual(202, resp.status_int) - volume = db.volume_get(ctx, volume['id']) + volume = db.volume_get(self.ctx, volume['id']) self.assertEqual('migrating', volume['migration_status']) def test_reset_status_as_admin(self): - ctx = context.RequestContext('admin', 'fake', True) - volume = db.volume_create(ctx, {'status': 'available'}) + volume = db.volume_create(self.ctx, {'status': 'available'}) - resp = self._issue_volume_reset(ctx, + resp = self._issue_volume_reset(self.ctx, volume, {'status': 'error'}) self.assertEqual(202, resp.status_int) - volume = db.volume_get(ctx, volume['id']) + volume = db.volume_get(self.ctx, volume['id']) self.assertEqual('error', volume['status']) def test_reset_status_as_non_admin(self): ctx = context.RequestContext('fake', 'fake') - volume = db.volume_create(context.get_admin_context(), + volume = db.volume_create(self.ctx, {'status': 'error', 'size': 1}) resp = self._issue_volume_reset(ctx, @@ -190,22 +192,21 @@ class AdminActionsTest(test.TestCase): # request is not authorized self.assertEqual(403, resp.status_int) - volume = db.volume_get(context.get_admin_context(), volume['id']) + volume = db.volume_get(self.ctx, volume['id']) # status is still 'error' self.assertEqual('error', volume['status']) def test_backup_reset_status_as_admin(self): - ctx = context.RequestContext('admin', 'fake', True) - volume = db.volume_create(ctx, {'status': 'available', - 'user_id': 'user', - 'project_id': 'project'}) - backup = db.backup_create(ctx, {'status': 'available', - 'size': 1, - 'volume_id': volume['id'], - 'user_id': 'user', - 'project_id': 'project'}) + volume = db.volume_create(self.ctx, {'status': 'available', + 'user_id': 'user', + 'project_id': 'project'}) + backup = db.backup_create(self.ctx, {'status': 'available', + 'size': 1, + 'volume_id': volume['id'], + 'user_id': 'user', + 'project_id': 'project'}) - resp = self._issue_backup_reset(ctx, + resp = self._issue_backup_reset(self.ctx, backup, {'status': 'error'}) @@ -223,42 +224,42 @@ class AdminActionsTest(test.TestCase): self.assertEqual(403, resp.status_int) def test_backup_reset_status(self): - ctx = context.RequestContext('admin', 'fake', True) - volume = db.volume_create(ctx, {'status': 'available', 'host': 'test', - 'provider_location': '', 'size': 1}) - backup = db.backup_create(ctx, {'status': 'available', - 'volume_id': volume['id'], - 'user_id': 'user', - 'project_id': 'project'}) + volume = db.volume_create(self.ctx, + {'status': 'available', 'host': 'test', + 'provider_location': '', 'size': 1}) + backup = db.backup_create(self.ctx, {'status': 'available', + 'volume_id': volume['id'], + 'user_id': 'user', + 'project_id': 'project'}) - resp = self._issue_backup_reset(ctx, + resp = self._issue_backup_reset(self.ctx, backup, {'status': 'error'}) self.assertEqual(202, resp.status_int) def test_invalid_status_for_backup(self): - ctx = context.RequestContext('admin', 'fake', True) - volume = db.volume_create(ctx, {'status': 'available', 'host': 'test', - 'provider_location': '', 'size': 1}) - backup = db.backup_create(ctx, {'status': 'available', - 'volume_id': volume['id']}) - resp = self._issue_backup_reset(ctx, + volume = db.volume_create(self.ctx, + {'status': 'available', 'host': 'test', + 'provider_location': '', 'size': 1}) + backup = db.backup_create(self.ctx, {'status': 'available', + 'volume_id': volume['id']}) + resp = self._issue_backup_reset(self.ctx, backup, {'status': 'restoring'}) self.assertEqual(400, resp.status_int) def test_backup_reset_status_with_invalid_backup(self): - ctx = context.RequestContext('admin', 'fake', True) - volume = db.volume_create(ctx, {'status': 'available', 'host': 'test', - 'provider_location': '', 'size': 1}) - backup = db.backup_create(ctx, {'status': 'available', - 'volume_id': volume['id'], - 'user_id': 'user', - 'project_id': 'project'}) + volume = db.volume_create(self.ctx, + {'status': 'available', 'host': 'test', + 'provider_location': '', 'size': 1}) + backup = db.backup_create(self.ctx, {'status': 'available', + 'volume_id': volume['id'], + 'user_id': 'user', + 'project_id': 'project'}) backup['id'] = 'fake_id' - resp = self._issue_backup_reset(ctx, + resp = self._issue_backup_reset(self.ctx, backup, {'status': 'error'}) @@ -266,130 +267,126 @@ class AdminActionsTest(test.TestCase): self.assertEqual(404, resp.status_int) def test_malformed_reset_status_body(self): - ctx = context.RequestContext('admin', 'fake', True) - volume = db.volume_create(ctx, {'status': 'available', 'size': 1}) + volume = db.volume_create(self.ctx, {'status': 'available', 'size': 1}) - resp = self._issue_volume_reset(ctx, + resp = self._issue_volume_reset(self.ctx, volume, {'x-status': 'bad'}) self.assertEqual(400, resp.status_int) - volume = db.volume_get(ctx, volume['id']) + volume = db.volume_get(self.ctx, volume['id']) self.assertEqual('available', volume['status']) def test_invalid_status_for_volume(self): - ctx = context.RequestContext('admin', 'fake', True) - volume = db.volume_create(ctx, {'status': 'available', 'size': 1}) - resp = self._issue_volume_reset(ctx, + volume = db.volume_create(self.ctx, {'status': 'available', 'size': 1}) + resp = self._issue_volume_reset(self.ctx, volume, {'status': 'invalid'}) self.assertEqual(400, resp.status_int) - volume = db.volume_get(ctx, volume['id']) + volume = db.volume_get(self.ctx, volume['id']) self.assertEqual('available', volume['status']) def test_reset_status_for_missing_volume(self): - ctx = context.RequestContext('admin', 'fake', True) req = webob.Request.blank('/v2/fake/volumes/%s/action' % 'missing-volume-id') req.method = 'POST' req.headers['content-type'] = 'application/json' body = {'os-reset_status': {'status': 'available'}} req.body = jsonutils.dump_as_bytes(body) - req.environ['cinder.context'] = ctx + req.environ['cinder.context'] = self.ctx resp = req.get_response(app()) self.assertEqual(404, resp.status_int) - self.assertRaises(exception.NotFound, db.volume_get, ctx, + self.assertRaises(exception.NotFound, db.volume_get, self.ctx, 'missing-volume-id') def test_reset_attached_status(self): - ctx = context.RequestContext('admin', 'fake', True) - volume = db.volume_create(ctx, {'status': 'available', 'host': 'test', - 'provider_location': '', 'size': 1, - 'attach_status': 'attached'}) + volume = db.volume_create(self.ctx, + {'status': 'available', 'host': 'test', + 'provider_location': '', 'size': 1, + 'attach_status': 'attached'}) - resp = self._issue_volume_reset(ctx, + resp = self._issue_volume_reset(self.ctx, volume, {'status': 'available', 'attach_status': 'detached'}) self.assertEqual(202, resp.status_int) - volume = db.volume_get(ctx, volume['id']) + volume = db.volume_get(self.ctx, volume['id']) self.assertEqual('detached', volume['attach_status']) self.assertEqual('available', volume['status']) def test_invalid_reset_attached_status(self): - ctx = context.RequestContext('admin', 'fake', True) - volume = db.volume_create(ctx, {'status': 'available', 'host': 'test', - 'provider_location': '', 'size': 1, - 'attach_status': 'detached'}) - resp = self._issue_volume_reset(ctx, + volume = db.volume_create(self.ctx, + {'status': 'available', 'host': 'test', + 'provider_location': '', 'size': 1, + 'attach_status': 'detached'}) + resp = self._issue_volume_reset(self.ctx, volume, {'status': 'available', 'attach_status': 'invalid'}) self.assertEqual(400, resp.status_int) - volume = db.volume_get(ctx, volume['id']) + volume = db.volume_get(self.ctx, volume['id']) self.assertEqual('available', volume['status']) self.assertEqual('detached', volume['attach_status']) def test_snapshot_reset_status(self): - ctx = context.RequestContext('admin', 'fake', True) - volume = db.volume_create(ctx, {'status': 'available', 'host': 'test', - 'provider_location': '', 'size': 1, - 'availability_zone': 'test', - 'attach_status': 'detached'}) + volume = db.volume_create(self.ctx, + {'status': 'available', 'host': 'test', + 'provider_location': '', 'size': 1, + 'availability_zone': 'test', + 'attach_status': 'detached'}) kwargs = { 'volume_id': volume['id'], 'cgsnapshot_id': None, - 'user_id': ctx.user_id, - 'project_id': ctx.project_id, + 'user_id': self.ctx.user_id, + 'project_id': self.ctx.project_id, 'status': 'error_deleting', 'progress': '0%', 'volume_size': volume['size'], 'metadata': {} } - snapshot = objects.Snapshot(context=ctx, **kwargs) + snapshot = objects.Snapshot(context=self.ctx, **kwargs) snapshot.create() self.addCleanup(snapshot.destroy) - resp = self._issue_snapshot_reset(ctx, snapshot, {'status': 'error'}) + resp = self._issue_snapshot_reset(self.ctx, snapshot, + {'status': 'error'}) self.assertEqual(202, resp.status_int) - snapshot = objects.Snapshot.get_by_id(ctx, snapshot['id']) + snapshot = objects.Snapshot.get_by_id(self.ctx, snapshot['id']) self.assertEqual('error', snapshot.status) def test_invalid_status_for_snapshot(self): - ctx = context.RequestContext('admin', 'fake', True) - volume = db.volume_create(ctx, {'status': 'available', 'host': 'test', - 'provider_location': '', 'size': 1}) - snapshot = objects.Snapshot(ctx, status='available', + volume = db.volume_create(self.ctx, + {'status': 'available', 'host': 'test', + 'provider_location': '', 'size': 1}) + snapshot = objects.Snapshot(self.ctx, status='available', volume_id=volume['id']) snapshot.create() self.addCleanup(snapshot.destroy) - resp = self._issue_snapshot_reset(ctx, snapshot, + resp = self._issue_snapshot_reset(self.ctx, snapshot, {'status': 'attaching'}) self.assertEqual(400, resp.status_int) self.assertEqual('available', snapshot.status) def test_force_delete(self): - # admin context - ctx = context.RequestContext('admin', 'fake', True) # current status is creating - volume = self._create_volume(ctx, {'size': 1, 'host': None}) + volume = self._create_volume(self.ctx, {'size': 1, 'host': None}) req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id']) req.method = 'POST' req.headers['content-type'] = 'application/json' req.body = jsonutils.dump_as_bytes({'os-force_delete': {}}) # attach admin context to request - req.environ['cinder.context'] = ctx + req.environ['cinder.context'] = self.ctx resp = req.get_response(app()) # request is accepted self.assertEqual(202, resp.status_int) # volume is deleted - self.assertRaises(exception.NotFound, objects.Volume.get_by_id, ctx, - volume.id) + self.assertRaises(exception.NotFound, objects.Volume.get_by_id, + self.ctx, volume.id) @mock.patch.object(volume_api.API, 'delete_snapshot', return_value=True) @mock.patch('cinder.objects.Snapshot.get_by_id') @@ -397,10 +394,9 @@ class AdminActionsTest(test.TestCase): @mock.patch.object(db, 'volume_get') def test_force_delete_snapshot(self, volume_get, snapshot_get, get_by_id, delete_snapshot): - ctx = context.RequestContext('admin', 'fake', True) volume = stubs.stub_volume(1) snapshot = stubs.stub_snapshot(1) - snapshot_obj = fake_snapshot.fake_snapshot_obj(ctx, **snapshot) + snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctx, **snapshot) volume_get.return_value = volume snapshot_get.return_value = snapshot get_by_id.return_value = snapshot_obj @@ -411,397 +407,28 @@ class AdminActionsTest(test.TestCase): req.headers['content-type'] = 'application/json' req.body = jsonutils.dump_as_bytes({'os-force_delete': {}}) # attach admin context to request - req.environ['cinder.context'] = ctx + req.environ['cinder.context'] = self.ctx resp = req.get_response(app()) self.assertEqual(202, resp.status_int) - def test_force_detach_instance_attached_volume(self): - # admin context - ctx = context.RequestContext('admin', 'fake', True) - # current status is available - volume = self._create_volume(ctx, {'provider_location': '', - 'size': 1}) - connector = {'initiator': 'iqn.2012-07.org.fake:01'} - # start service to handle rpc messages for attach requests - svc = self.start_service('volume', host='test') - self.addCleanup(svc.stop) - self.volume_api.reserve_volume(ctx, volume) - mountpoint = '/dev/vbd' - attachment = self.volume_api.attach(ctx, volume, stubs.FAKE_UUID, - None, mountpoint, 'rw') - # volume is attached - volume = db.volume_get(ctx, volume['id']) - self.assertEqual('in-use', volume['status']) - self.assertEqual(stubs.FAKE_UUID, attachment['instance_uuid']) - self.assertEqual(mountpoint, attachment['mountpoint']) - self.assertEqual('attached', attachment['attach_status']) - admin_metadata = volume['volume_admin_metadata'] - self.assertEqual(2, len(admin_metadata)) - self.assertEqual('readonly', admin_metadata[0]['key']) - self.assertEqual('False', admin_metadata[0]['value']) - self.assertEqual('attached_mode', admin_metadata[1]['key']) - self.assertEqual('rw', admin_metadata[1]['value']) - conn_info = self.volume_api.initialize_connection(ctx, - volume, - connector) - self.assertEqual('rw', conn_info['data']['access_mode']) - # build request to force detach - req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id']) - req.method = 'POST' - req.headers['content-type'] = 'application/json' - # request status of 'error' - body = {'os-force_detach': {'attachment_id': attachment['id'], - 'connector': connector}} - req.body = jsonutils.dump_as_bytes(body) - # attach admin context to request - req.environ['cinder.context'] = ctx - # make request - resp = req.get_response(app()) - # request is accepted - self.assertEqual(202, resp.status_int) - volume = db.volume_get(ctx, volume['id']) - self.assertRaises(exception.VolumeAttachmentNotFound, - db.volume_attachment_get, - ctx, attachment['id']) - - # status changed to 'available' - self.assertEqual('available', volume['status']) - admin_metadata = volume['volume_admin_metadata'] - self.assertEqual(1, len(admin_metadata)) - self.assertEqual('readonly', admin_metadata[0]['key'], 'readonly') - self.assertEqual('False', admin_metadata[0]['value']) - - def test_force_detach_host_attached_volume(self): - # admin context - ctx = context.RequestContext('admin', 'fake', True) - # current status is available - volume = self._create_volume(ctx, {'provider_location': '', - 'size': 1}) - connector = {'initiator': 'iqn.2012-07.org.fake:01'} - # start service to handle rpc messages for attach requests - svc = self.start_service('volume', host='test') - self.addCleanup(svc.stop) - self.volume_api.initialize_connection(ctx, volume, connector) - mountpoint = '/dev/vbd' - host_name = 'fake-host' - attachment = self.volume_api.attach(ctx, volume, None, host_name, - mountpoint, 'ro') - # volume is attached - volume = db.volume_get(ctx, volume['id']) - self.assertEqual('in-use', volume['status']) - self.assertIsNone(attachment['instance_uuid']) - self.assertEqual(host_name, attachment['attached_host']) - self.assertEqual(mountpoint, attachment['mountpoint']) - self.assertEqual('attached', attachment['attach_status']) - admin_metadata = volume['volume_admin_metadata'] - self.assertEqual(2, len(admin_metadata)) - self.assertEqual('readonly', admin_metadata[0]['key']) - self.assertEqual('False', admin_metadata[0]['value']) - self.assertEqual('attached_mode', admin_metadata[1]['key']) - self.assertEqual('ro', admin_metadata[1]['value']) - conn_info = self.volume_api.initialize_connection(ctx, - volume, connector) - self.assertEqual('ro', conn_info['data']['access_mode']) - # build request to force detach - req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id']) - req.method = 'POST' - req.headers['content-type'] = 'application/json' - # request status of 'error' - body = {'os-force_detach': {'attachment_id': attachment['id'], - 'connector': connector}} - req.body = jsonutils.dump_as_bytes(body) - # attach admin context to request - req.environ['cinder.context'] = ctx - # make request - resp = req.get_response(app()) - # request is accepted - self.assertEqual(202, resp.status_int) - volume = db.volume_get(ctx, volume['id']) - self.assertRaises(exception.VolumeAttachmentNotFound, - db.volume_attachment_get, - ctx, attachment['id']) - # status changed to 'available' - self.assertEqual('available', volume['status']) - admin_metadata = volume['volume_admin_metadata'] - self.assertEqual(1, len(admin_metadata)) - self.assertEqual('readonly', admin_metadata[0]['key']) - self.assertEqual('False', admin_metadata[0]['value']) - - def test_volume_force_detach_raises_remote_error(self): - # admin context - ctx = context.RequestContext('admin', 'fake', True) - # current status is available - volume = self._create_volume(ctx, {'provider_location': '', - 'size': 1}) - connector = {'initiator': 'iqn.2012-07.org.fake:01'} - # start service to handle rpc messages for attach requests - svc = self.start_service('volume', host='test') - self.addCleanup(svc.stop) - self.volume_api.reserve_volume(ctx, volume) - mountpoint = '/dev/vbd' - attachment = self.volume_api.attach(ctx, volume, stubs.FAKE_UUID, - None, mountpoint, 'rw') - # volume is attached - volume = db.volume_get(ctx, volume['id']) - self.assertEqual('in-use', volume['status']) - self.assertEqual(stubs.FAKE_UUID, attachment['instance_uuid']) - self.assertEqual(mountpoint, attachment['mountpoint']) - self.assertEqual('attached', attachment['attach_status']) - admin_metadata = volume['volume_admin_metadata'] - self.assertEqual(2, len(admin_metadata)) - self.assertEqual('readonly', admin_metadata[0]['key']) - self.assertEqual('False', admin_metadata[0]['value']) - self.assertEqual('attached_mode', admin_metadata[1]['key']) - self.assertEqual('rw', admin_metadata[1]['value']) - conn_info = self.volume_api.initialize_connection(ctx, - volume, - connector) - self.assertEqual('rw', conn_info['data']['access_mode']) - # build request to force detach - volume_remote_error = \ - messaging.RemoteError(exc_type='VolumeAttachmentNotFound') - with mock.patch.object(volume_api.API, 'detach', - side_effect=volume_remote_error): - req = webob.Request.blank('/v2/fake/volumes/%s/action' % - volume['id']) - req.method = 'POST' - req.headers['content-type'] = 'application/json' - body = {'os-force_detach': {'attachment_id': 'fake'}} - req.body = jsonutils.dump_as_bytes(body) - # attach admin context to request - req.environ['cinder.context'] = ctx - # make request - resp = req.get_response(app()) - self.assertEqual(400, resp.status_int) - - # test for KeyError when missing connector - volume_remote_error = ( - messaging.RemoteError(exc_type='KeyError')) - with mock.patch.object(volume_api.API, 'detach', - side_effect=volume_remote_error): - req = webob.Request.blank('/v2/fake/volumes/%s/action' % - volume['id']) - req.method = 'POST' - req.headers['content-type'] = 'application/json' - body = {'os-force_detach': {'attachment_id': 'fake'}} - req.body = jsonutils.dump_as_bytes(body) - # attach admin context to request - req.environ['cinder.context'] = ctx - # make request - self.assertRaises(messaging.RemoteError, - req.get_response, - app()) - - # test for VolumeBackendAPIException - volume_remote_error = ( - messaging.RemoteError(exc_type='VolumeBackendAPIException')) - with mock.patch.object(volume_api.API, 'detach', - side_effect=volume_remote_error): - req = webob.Request.blank('/v2/fake/volumes/%s/action' % - volume['id']) - req.method = 'POST' - req.headers['content-type'] = 'application/json' - body = {'os-force_detach': {'attachment_id': 'fake', - 'connector': connector}} - req.body = jsonutils.dump_as_bytes(body) - - # attach admin context to request - req.environ['cinder.context'] = ctx - # make request - self.assertRaises(messaging.RemoteError, - req.get_response, - app()) - - def test_volume_force_detach_raises_db_error(self): - # In case of DB error 500 error code is returned to user - # admin context - ctx = context.RequestContext('admin', 'fake', True) - # current status is available - volume = self._create_volume(ctx, {'provider_location': '', - 'size': 1}) - connector = {'initiator': 'iqn.2012-07.org.fake:01'} - # start service to handle rpc messages for attach requests - svc = self.start_service('volume', host='test') - self.addCleanup(svc.stop) - self.volume_api.reserve_volume(ctx, volume) - mountpoint = '/dev/vbd' - attachment = self.volume_api.attach(ctx, volume, stubs.FAKE_UUID, - None, mountpoint, 'rw') - # volume is attached - volume = db.volume_get(ctx, volume['id']) - self.assertEqual('in-use', volume['status']) - self.assertEqual(stubs.FAKE_UUID, attachment['instance_uuid']) - self.assertEqual(mountpoint, attachment['mountpoint']) - self.assertEqual('attached', attachment['attach_status']) - admin_metadata = volume['volume_admin_metadata'] - self.assertEqual(2, len(admin_metadata)) - self.assertEqual('readonly', admin_metadata[0]['key']) - self.assertEqual('False', admin_metadata[0]['value']) - self.assertEqual('attached_mode', admin_metadata[1]['key']) - self.assertEqual('rw', admin_metadata[1]['value']) - conn_info = self.volume_api.initialize_connection(ctx, - volume, - connector) - self.assertEqual('rw', conn_info['data']['access_mode']) - # build request to force detach - volume_remote_error = \ - messaging.RemoteError(exc_type='DBError') - with mock.patch.object(volume_api.API, 'detach', - side_effect=volume_remote_error): - req = webob.Request.blank('/v2/fake/volumes/%s/action' % - volume['id']) - req.method = 'POST' - req.headers['content-type'] = 'application/json' - body = {'os-force_detach': {'attachment_id': 'fake', - 'connector': connector}} - req.body = jsonutils.dump_as_bytes(body) - # attach admin context to request - req.environ['cinder.context'] = ctx - # make request - self.assertRaises(messaging.RemoteError, - req.get_response, - app()) - - def test_attach_in_used_volume_by_instance(self): - """Test that attaching to an in-use volume fails.""" - # admin context - ctx = context.RequestContext('admin', 'fake', True) - # current status is available - volume = self._create_volume(ctx, {'provider_location': '', - 'size': 1}) - connector = {'initiator': 'iqn.2012-07.org.fake:01'} - # start service to handle rpc messages for attach requests - svc = self.start_service('volume', host='test') - self.addCleanup(svc.stop) - self.volume_api.reserve_volume(ctx, volume) - conn_info = self.volume_api.initialize_connection(ctx, - volume, connector) - self.volume_api.attach(ctx, volume, fakes.get_fake_uuid(), None, - '/dev/vbd0', 'rw') - self.assertEqual('rw', conn_info['data']['access_mode']) - self.assertRaises(exception.InvalidVolume, - self.volume_api.attach, - ctx, - volume, - fakes.get_fake_uuid(), - None, - '/dev/vdb1', - 'ro') - - def test_attach_in_used_volume_by_host(self): - """Test that attaching to an in-use volume fails.""" - # admin context - ctx = context.RequestContext('admin', 'fake', True) - # current status is available - volume = self._create_volume(ctx, {'provider_location': '', - 'size': 1}) - connector = {'initiator': 'iqn.2012-07.org.fake:01'} - # start service to handle rpc messages for attach requests - svc = self.start_service('volume', host='test') - self.addCleanup(svc.stop) - self.volume_api.reserve_volume(ctx, volume) - self.volume_api.initialize_connection(ctx, volume, connector) - self.volume_api.attach(ctx, volume, None, 'fake_host1', - '/dev/vbd0', 'rw') - conn_info = self.volume_api.initialize_connection(ctx, - volume, connector) - conn_info['data']['access_mode'] = 'rw' - self.assertRaises(exception.InvalidVolume, - self.volume_api.attach, - ctx, - volume, - None, - 'fake_host2', - '/dev/vbd1', - 'ro') - - def test_invalid_iscsi_connector(self): - """Test connector without the initiator (required by iscsi driver).""" - # admin context - ctx = context.RequestContext('admin', 'fake', True) - # current status is available - volume = self._create_volume(ctx, {'provider_location': '', - 'size': 1}) - connector = {} - # start service to handle rpc messages for attach requests - svc = self.start_service('volume', host='test') - self.addCleanup(svc.stop) - self.assertRaises(exception.InvalidInput, - self.volume_api.initialize_connection, - ctx, volume, connector) - - def test_attach_attaching_volume_with_different_instance(self): - """Test that attaching volume reserved for another instance fails.""" - ctx = context.RequestContext('admin', 'fake', True) - # current status is available - volume = self._create_volume(ctx, {'provider_location': '', - 'size': 1}) - # start service to handle rpc messages for attach requests - svc = self.start_service('volume', host='test') - self.addCleanup(svc.stop) - self.volume_api.reserve_volume(ctx, volume) - values = {'volume_id': volume['id'], - 'attach_status': 'attaching', - 'attach_time': timeutils.utcnow(), - 'instance_uuid': 'abc123', - } - db.volume_attach(ctx, values) - db.volume_admin_metadata_update(ctx, volume['id'], - {"attached_mode": 'rw'}, False) - mountpoint = '/dev/vbd' - attachment = self.volume_api.attach(ctx, volume, - stubs.FAKE_UUID, None, - mountpoint, 'rw') - - self.assertEqual(stubs.FAKE_UUID, attachment['instance_uuid']) - self.assertEqual(volume['id'], attachment['volume_id'], volume['id']) - self.assertEqual('attached', attachment['attach_status']) - - def test_attach_attaching_volume_with_different_mode(self): - """Test that attaching volume reserved for another mode fails.""" - # admin context - ctx = context.RequestContext('admin', 'fake', True) - # current status is available - volume = self._create_volume(ctx, {'provider_location': '', - 'size': 1}) - # start service to handle rpc messages for attach requests - svc = self.start_service('volume', host='test') - self.addCleanup(svc.stop) - values = {'status': 'attaching', - 'instance_uuid': fakes.get_fake_uuid()} - db.volume_update(ctx, volume['id'], values) - db.volume_admin_metadata_update(ctx, volume['id'], - {"attached_mode": 'rw'}, False) - mountpoint = '/dev/vbd' - self.assertRaises(exception.InvalidVolume, - self.volume_api.attach, - ctx, - volume, - values['instance_uuid'], - None, - mountpoint, - 'ro') - def _migrate_volume_prep(self): - admin_ctx = context.get_admin_context() # create volume's current host and the destination host - db.service_create(admin_ctx, + db.service_create(self.ctx, {'host': 'test', 'topic': CONF.volume_topic, 'created_at': timeutils.utcnow()}) - db.service_create(admin_ctx, + db.service_create(self.ctx, {'host': 'test2', 'topic': CONF.volume_topic, 'created_at': timeutils.utcnow()}) # current status is available - volume = self._create_volume(admin_ctx) + volume = self._create_volume(self.ctx) return volume def _migrate_volume_exec(self, ctx, volume, host, expected_status, force_host_copy=False): - admin_ctx = context.get_admin_context() # build request to migrate to host - req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume.id) + req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id']) req.method = 'POST' req.headers['content-type'] = 'application/json' body = {'os-migrate_volume': {'host': host, @@ -811,27 +438,28 @@ class AdminActionsTest(test.TestCase): resp = req.get_response(app()) # verify status self.assertEqual(expected_status, resp.status_int) - volume = objects.Volume.get_by_id(admin_ctx, volume.id) + volume = db.volume_get(self.ctx, volume['id']) return volume def test_migrate_volume_success(self): expected_status = 202 host = 'test2' - ctx = context.RequestContext('admin', 'fake', True) volume = self._migrate_volume_prep() - volume = self._migrate_volume_exec(ctx, volume, host, expected_status) + volume = self._migrate_volume_exec(self.ctx, volume, host, + expected_status) self.assertEqual('starting', volume['migration_status']) def test_migrate_volume_fail_replication(self): expected_status = 400 host = 'test2' - ctx = context.RequestContext('admin', 'fake', True) volume = self._migrate_volume_prep() # current status is available - volume = self._create_volume(ctx, {'provider_location': '', - 'attach_status': '', - 'replication_status': 'active'}) - volume = self._migrate_volume_exec(ctx, volume, host, expected_status) + volume = self._create_volume(self.ctx, + {'provider_location': '', + 'attach_status': '', + 'replication_status': 'active'}) + volume = self._migrate_volume_exec(self.ctx, volume, host, + expected_status) def test_migrate_volume_as_non_admin(self): expected_status = 403 @@ -843,7 +471,6 @@ class AdminActionsTest(test.TestCase): def test_migrate_volume_without_host_parameter(self): expected_status = 400 host = 'test3' - ctx = context.RequestContext('admin', 'fake', True) volume = self._migrate_volume_prep() # build request to migrate without host req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id']) @@ -852,7 +479,7 @@ class AdminActionsTest(test.TestCase): body = {'os-migrate_volume': {'host': host, 'force_host_copy': False}} req.body = jsonutils.dump_as_bytes(body) - req.environ['cinder.context'] = ctx + req.environ['cinder.context'] = self.ctx resp = req.get_response(app()) # verify status self.assertEqual(expected_status, resp.status_int) @@ -860,42 +487,37 @@ class AdminActionsTest(test.TestCase): def test_migrate_volume_host_no_exist(self): expected_status = 400 host = 'test3' - ctx = context.RequestContext('admin', 'fake', True) volume = self._migrate_volume_prep() - self._migrate_volume_exec(ctx, volume, host, expected_status) + self._migrate_volume_exec(self.ctx, volume, host, expected_status) def test_migrate_volume_same_host(self): expected_status = 400 host = 'test' - ctx = context.RequestContext('admin', 'fake', True) volume = self._migrate_volume_prep() - self._migrate_volume_exec(ctx, volume, host, expected_status) + self._migrate_volume_exec(self.ctx, volume, host, expected_status) def test_migrate_volume_migrating(self): expected_status = 400 host = 'test2' - ctx = context.RequestContext('admin', 'fake', True) volume = self._migrate_volume_prep() model_update = {'migration_status': 'migrating'} - volume = db.volume_update(ctx, volume['id'], model_update) - self._migrate_volume_exec(ctx, volume, host, expected_status) + volume = db.volume_update(self.ctx, volume['id'], model_update) + self._migrate_volume_exec(self.ctx, volume, host, expected_status) def test_migrate_volume_with_snap(self): expected_status = 400 host = 'test2' - ctx = context.RequestContext('admin', 'fake', True) volume = self._migrate_volume_prep() - snap = objects.Snapshot(ctx, volume_id=volume['id']) + snap = objects.Snapshot(self.ctx, volume_id=volume['id']) snap.create() self.addCleanup(snap.destroy) - self._migrate_volume_exec(ctx, volume, host, expected_status) + self._migrate_volume_exec(self.ctx, volume, host, expected_status) def test_migrate_volume_bad_force_host_copy(self): expected_status = 400 host = 'test2' - ctx = context.RequestContext('admin', 'fake', True) volume = self._migrate_volume_prep() - self._migrate_volume_exec(ctx, volume, host, expected_status, + self._migrate_volume_exec(self.ctx, volume, host, expected_status, force_host_copy='foo') def _migrate_volume_comp_exec(self, ctx, volume, new_volume, error, @@ -920,9 +542,8 @@ class AdminActionsTest(test.TestCase): self.assertNotIn('save_volume_id', resp_dict) def test_migrate_volume_comp_as_non_admin(self): - admin_ctx = context.get_admin_context() - volume = db.volume_create(admin_ctx, {'id': 'fake1'}) - new_volume = db.volume_create(admin_ctx, {'id': 'fake2'}) + volume = db.volume_create(self.ctx, {'id': 'fake1'}) + new_volume = db.volume_create(self.ctx, {'id': 'fake2'}) expected_status = 403 expected_id = None ctx = context.RequestContext('fake', 'fake') @@ -930,34 +551,29 @@ class AdminActionsTest(test.TestCase): expected_status, expected_id) def test_migrate_volume_comp_no_mig_status(self): - admin_ctx = context.get_admin_context() - volume1 = self._create_volume(admin_ctx, {'migration_status': 'foo'}) - volume2 = self._create_volume(admin_ctx, {'migration_status': None}) + volume1 = self._create_volume(self.ctx, {'migration_status': 'foo'}) + volume2 = self._create_volume(self.ctx, {'migration_status': None}) expected_status = 400 expected_id = None - ctx = context.RequestContext('admin', 'fake', True) - self._migrate_volume_comp_exec(ctx, volume1, volume2, False, + self._migrate_volume_comp_exec(self.ctx, volume1, volume2, False, expected_status, expected_id) - self._migrate_volume_comp_exec(ctx, volume2, volume1, False, + self._migrate_volume_comp_exec(self.ctx, volume2, volume1, False, expected_status, expected_id) def test_migrate_volume_comp_bad_mig_status(self): - admin_ctx = context.get_admin_context() - volume1 = self._create_volume(admin_ctx, + volume1 = self._create_volume(self.ctx, {'migration_status': 'migrating'}) - volume2 = self._create_volume(admin_ctx, + volume2 = self._create_volume(self.ctx, {'migration_status': 'target:foo'}) expected_status = 400 expected_id = None - ctx = context.RequestContext('admin', 'fake', True) - self._migrate_volume_comp_exec(ctx, volume1, volume2, False, + self._migrate_volume_comp_exec(self.ctx, volume1, volume2, False, expected_status, expected_id) def test_migrate_volume_comp_no_action(self): - admin_ctx = context.get_admin_context() - volume = db.volume_create(admin_ctx, {'id': 'fake1'}) - new_volume = db.volume_create(admin_ctx, {'id': 'fake2'}) + volume = db.volume_create(self.ctx, {'id': 'fake1'}) + new_volume = db.volume_create(self.ctx, {'id': 'fake2'}) expected_status = 400 expected_id = None ctx = context.RequestContext('fake', 'fake') @@ -965,17 +581,15 @@ class AdminActionsTest(test.TestCase): expected_status, expected_id, True) def test_migrate_volume_comp_from_nova(self): - admin_ctx = context.get_admin_context() - volume = self._create_volume(admin_ctx, {'status': 'in-use', - 'migration_status': None, - 'attach_status': 'attached'}) - new_volume = self._create_volume(admin_ctx, + volume = self._create_volume(self.ctx, {'status': 'in-use', + 'migration_status': None, + 'attach_status': 'attached'}) + new_volume = self._create_volume(self.ctx, {'migration_status': None, 'attach_status': 'detached'}) expected_status = 200 expected_id = new_volume.id - ctx = context.RequestContext('admin', 'fake', True) - self._migrate_volume_comp_exec(ctx, volume, new_volume, False, + self._migrate_volume_comp_exec(self.ctx, volume, new_volume, False, expected_status, expected_id) def test_backup_reset_valid_updates(self): @@ -997,7 +611,6 @@ class AdminActionsTest(test.TestCase): {'availability_zone': "az1", 'host': 'testhost', 'disabled': 0, 'updated_at': timeutils.utcnow()}] # admin context - ctx = context.RequestContext('admin', 'fake', True) mock_check_support.return_value = True # current status is dependent on argument: test_status. id = test_backups.BackupsAPITestCase._create_backup(status=test_status) @@ -1005,14 +618,14 @@ class AdminActionsTest(test.TestCase): req.method = 'POST' req.headers['Content-Type'] = 'application/json' req.body = jsonutils.dump_as_bytes({'os-force_delete': {}}) - req.environ['cinder.context'] = ctx + req.environ['cinder.context'] = self.ctx res = req.get_response(app()) self.assertEqual(202, res.status_int) self.assertEqual('deleting', test_backups.BackupsAPITestCase. _get_backup_attrib(id, 'status')) - db.backup_destroy(context.get_admin_context(), id) + db.backup_destroy(self.ctx, id) def test_delete_backup_force_when_creating(self): self._force_delete_backup_util('creating') @@ -1036,13 +649,353 @@ class AdminActionsTest(test.TestCase): return_value=False) def test_delete_backup_force_when_not_supported(self, mock_check_support): # admin context - ctx = context.RequestContext('admin', 'fake', True) self.override_config('backup_driver', 'cinder.backup.drivers.ceph') id = test_backups.BackupsAPITestCase._create_backup() req = webob.Request.blank('/v2/fake/backups/%s/action' % id) req.method = 'POST' req.headers['Content-Type'] = 'application/json' req.body = jsonutils.dump_as_bytes({'os-force_delete': {}}) - req.environ['cinder.context'] = ctx + req.environ['cinder.context'] = self.ctx res = req.get_response(app()) self.assertEqual(405, res.status_int) + + +class AdminActionsAttachDetachTest(BaseAdminTest): + def setUp(self): + super(AdminActionsAttachDetachTest, self).setUp() + # start service to handle rpc messages for attach requests + self.svc = self.start_service('volume', host='test') + + def tearDown(self): + self.svc.stop() + super(AdminActionsAttachDetachTest, self).tearDown() + + def test_force_detach_instance_attached_volume(self): + # current status is available + volume = self._create_volume(self.ctx, {'provider_location': '', + 'size': 1}) + connector = {'initiator': 'iqn.2012-07.org.fake:01'} + + self.volume_api.reserve_volume(self.ctx, volume) + mountpoint = '/dev/vbd' + attachment = self.volume_api.attach(self.ctx, volume, stubs.FAKE_UUID, + None, mountpoint, 'rw') + # volume is attached + volume = db.volume_get(self.ctx, volume['id']) + self.assertEqual('in-use', volume['status']) + self.assertEqual(stubs.FAKE_UUID, attachment['instance_uuid']) + self.assertEqual(mountpoint, attachment['mountpoint']) + self.assertEqual('attached', attachment['attach_status']) + admin_metadata = volume['volume_admin_metadata'] + self.assertEqual(2, len(admin_metadata)) + self.assertEqual('readonly', admin_metadata[0]['key']) + self.assertEqual('False', admin_metadata[0]['value']) + self.assertEqual('attached_mode', admin_metadata[1]['key']) + self.assertEqual('rw', admin_metadata[1]['value']) + conn_info = self.volume_api.initialize_connection(self.ctx, + volume, + connector) + self.assertEqual('rw', conn_info['data']['access_mode']) + # build request to force detach + req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id']) + req.method = 'POST' + req.headers['content-type'] = 'application/json' + # request status of 'error' + body = {'os-force_detach': {'attachment_id': attachment['id'], + 'connector': connector}} + req.body = jsonutils.dump_as_bytes(body) + # attach admin context to request + req.environ['cinder.context'] = self.ctx + # make request + resp = req.get_response(app()) + # request is accepted + self.assertEqual(202, resp.status_int) + volume = db.volume_get(self.ctx, volume['id']) + self.assertRaises(exception.VolumeAttachmentNotFound, + db.volume_attachment_get, + self.ctx, attachment['id']) + + # status changed to 'available' + self.assertEqual('available', volume['status']) + admin_metadata = volume['volume_admin_metadata'] + self.assertEqual(1, len(admin_metadata)) + self.assertEqual('readonly', admin_metadata[0]['key'], 'readonly') + self.assertEqual('False', admin_metadata[0]['value']) + + def test_force_detach_host_attached_volume(self): + # current status is available + volume = self._create_volume(self.ctx, {'provider_location': '', + 'size': 1}) + connector = {'initiator': 'iqn.2012-07.org.fake:01'} + + self.volume_api.initialize_connection(self.ctx, volume, connector) + mountpoint = '/dev/vbd' + host_name = 'fake-host' + attachment = self.volume_api.attach(self.ctx, volume, None, host_name, + mountpoint, 'ro') + # volume is attached + volume = db.volume_get(self.ctx, volume['id']) + self.assertEqual('in-use', volume['status']) + self.assertIsNone(attachment['instance_uuid']) + self.assertEqual(host_name, attachment['attached_host']) + self.assertEqual(mountpoint, attachment['mountpoint']) + self.assertEqual('attached', attachment['attach_status']) + admin_metadata = volume['volume_admin_metadata'] + self.assertEqual(2, len(admin_metadata)) + self.assertEqual('readonly', admin_metadata[0]['key']) + self.assertEqual('False', admin_metadata[0]['value']) + self.assertEqual('attached_mode', admin_metadata[1]['key']) + self.assertEqual('ro', admin_metadata[1]['value']) + conn_info = self.volume_api.initialize_connection(self.ctx, + volume, connector) + self.assertEqual('ro', conn_info['data']['access_mode']) + # build request to force detach + req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id']) + req.method = 'POST' + req.headers['content-type'] = 'application/json' + # request status of 'error' + body = {'os-force_detach': {'attachment_id': attachment['id'], + 'connector': connector}} + req.body = jsonutils.dump_as_bytes(body) + # attach admin context to request + req.environ['cinder.context'] = self.ctx + # make request + resp = req.get_response(app()) + # request is accepted + self.assertEqual(202, resp.status_int) + volume = db.volume_get(self.ctx, volume['id']) + self.assertRaises(exception.VolumeAttachmentNotFound, + db.volume_attachment_get, + self.ctx, attachment['id']) + # status changed to 'available' + self.assertEqual('available', volume['status']) + admin_metadata = volume['volume_admin_metadata'] + self.assertEqual(1, len(admin_metadata)) + self.assertEqual('readonly', admin_metadata[0]['key']) + self.assertEqual('False', admin_metadata[0]['value']) + + def test_volume_force_detach_raises_remote_error(self): + # current status is available + volume = self._create_volume(self.ctx, {'provider_location': '', + 'size': 1}) + connector = {'initiator': 'iqn.2012-07.org.fake:01'} + + self.volume_api.reserve_volume(self.ctx, volume) + mountpoint = '/dev/vbd' + attachment = self.volume_api.attach(self.ctx, volume, stubs.FAKE_UUID, + None, mountpoint, 'rw') + # volume is attached + volume = db.volume_get(self.ctx, volume['id']) + self.assertEqual('in-use', volume['status']) + self.assertEqual(stubs.FAKE_UUID, attachment['instance_uuid']) + self.assertEqual(mountpoint, attachment['mountpoint']) + self.assertEqual('attached', attachment['attach_status']) + admin_metadata = volume['volume_admin_metadata'] + self.assertEqual(2, len(admin_metadata)) + self.assertEqual('readonly', admin_metadata[0]['key']) + self.assertEqual('False', admin_metadata[0]['value']) + self.assertEqual('attached_mode', admin_metadata[1]['key']) + self.assertEqual('rw', admin_metadata[1]['value']) + conn_info = self.volume_api.initialize_connection(self.ctx, + volume, + connector) + self.assertEqual('rw', conn_info['data']['access_mode']) + # build request to force detach + volume_remote_error = \ + messaging.RemoteError(exc_type='VolumeAttachmentNotFound') + with mock.patch.object(volume_api.API, 'detach', + side_effect=volume_remote_error): + req = webob.Request.blank('/v2/fake/volumes/%s/action' % + volume['id']) + req.method = 'POST' + req.headers['content-type'] = 'application/json' + body = {'os-force_detach': {'attachment_id': 'fake'}} + req.body = jsonutils.dump_as_bytes(body) + # attach admin context to request + req.environ['cinder.context'] = self.ctx + # make request + resp = req.get_response(app()) + self.assertEqual(400, resp.status_int) + + # test for KeyError when missing connector + volume_remote_error = ( + messaging.RemoteError(exc_type='KeyError')) + with mock.patch.object(volume_api.API, 'detach', + side_effect=volume_remote_error): + req = webob.Request.blank('/v2/fake/volumes/%s/action' % + volume['id']) + req.method = 'POST' + req.headers['content-type'] = 'application/json' + body = {'os-force_detach': {'attachment_id': 'fake'}} + req.body = jsonutils.dump_as_bytes(body) + # attach admin context to request + req.environ['cinder.context'] = self.ctx + # make request + self.assertRaises(messaging.RemoteError, + req.get_response, + app()) + + # test for VolumeBackendAPIException + volume_remote_error = ( + messaging.RemoteError(exc_type='VolumeBackendAPIException')) + with mock.patch.object(volume_api.API, 'detach', + side_effect=volume_remote_error): + req = webob.Request.blank('/v2/fake/volumes/%s/action' % + volume['id']) + req.method = 'POST' + req.headers['content-type'] = 'application/json' + body = {'os-force_detach': {'attachment_id': 'fake', + 'connector': connector}} + req.body = jsonutils.dump_as_bytes(body) + + # attach admin context to request + req.environ['cinder.context'] = self.ctx + # make request + self.assertRaises(messaging.RemoteError, + req.get_response, + app()) + + def test_volume_force_detach_raises_db_error(self): + # In case of DB error 500 error code is returned to user + # current status is available + volume = self._create_volume(self.ctx, {'provider_location': '', + 'size': 1}) + connector = {'initiator': 'iqn.2012-07.org.fake:01'} + + self.volume_api.reserve_volume(self.ctx, volume) + mountpoint = '/dev/vbd' + attachment = self.volume_api.attach(self.ctx, volume, stubs.FAKE_UUID, + None, mountpoint, 'rw') + # volume is attached + volume = db.volume_get(self.ctx, volume['id']) + self.assertEqual('in-use', volume['status']) + self.assertEqual(stubs.FAKE_UUID, attachment['instance_uuid']) + self.assertEqual(mountpoint, attachment['mountpoint']) + self.assertEqual('attached', attachment['attach_status']) + admin_metadata = volume['volume_admin_metadata'] + self.assertEqual(2, len(admin_metadata)) + self.assertEqual('readonly', admin_metadata[0]['key']) + self.assertEqual('False', admin_metadata[0]['value']) + self.assertEqual('attached_mode', admin_metadata[1]['key']) + self.assertEqual('rw', admin_metadata[1]['value']) + conn_info = self.volume_api.initialize_connection(self.ctx, + volume, + connector) + self.assertEqual('rw', conn_info['data']['access_mode']) + # build request to force detach + volume_remote_error = messaging.RemoteError(exc_type='DBError') + with mock.patch.object(volume_api.API, 'detach', + side_effect=volume_remote_error): + req = webob.Request.blank('/v2/fake/volumes/%s/action' % + volume['id']) + req.method = 'POST' + req.headers['content-type'] = 'application/json' + body = {'os-force_detach': {'attachment_id': 'fake', + 'connector': connector}} + req.body = jsonutils.dump_as_bytes(body) + # attach admin context to request + req.environ['cinder.context'] = self.ctx + # make request + self.assertRaises(messaging.RemoteError, + req.get_response, + app()) + + def test_attach_in_used_volume_by_instance(self): + """Test that attaching to an in-use volume fails.""" + # current status is available + volume = self._create_volume(self.ctx, {'provider_location': '', + 'size': 1}) + connector = {'initiator': 'iqn.2012-07.org.fake:01'} + self.volume_api.reserve_volume(self.ctx, volume) + conn_info = self.volume_api.initialize_connection(self.ctx, + volume, connector) + self.volume_api.attach(self.ctx, volume, fakes.get_fake_uuid(), None, + '/dev/vbd0', 'rw') + self.assertEqual('rw', conn_info['data']['access_mode']) + self.assertRaises(exception.InvalidVolume, + self.volume_api.attach, + self.ctx, + volume, + fakes.get_fake_uuid(), + None, + '/dev/vdb1', + 'ro') + + def test_attach_in_used_volume_by_host(self): + """Test that attaching to an in-use volume fails.""" + # current status is available + volume = self._create_volume(self.ctx, {'provider_location': '', + 'size': 1}) + connector = {'initiator': 'iqn.2012-07.org.fake:01'} + + self.volume_api.reserve_volume(self.ctx, volume) + self.volume_api.initialize_connection(self.ctx, volume, connector) + self.volume_api.attach(self.ctx, volume, None, 'fake_host1', + '/dev/vbd0', 'rw') + conn_info = self.volume_api.initialize_connection(self.ctx, + volume, connector) + conn_info['data']['access_mode'] = 'rw' + self.assertRaises(exception.InvalidVolume, + self.volume_api.attach, + self.ctx, + volume, + None, + 'fake_host2', + '/dev/vbd1', + 'ro') + + def test_invalid_iscsi_connector(self): + """Test connector without the initiator (required by iscsi driver).""" + # current status is available + volume = self._create_volume(self.ctx, {'provider_location': '', + 'size': 1}) + connector = {} + + self.assertRaises(exception.InvalidInput, + self.volume_api.initialize_connection, + self.ctx, volume, connector) + + def test_attach_attaching_volume_with_different_instance(self): + """Test that attaching volume reserved for another instance fails.""" + # current status is available + volume = self._create_volume(self.ctx, {'provider_location': '', + 'size': 1}) + + self.volume_api.reserve_volume(self.ctx, volume) + values = {'volume_id': volume['id'], + 'attach_status': 'attaching', + 'attach_time': timeutils.utcnow(), + 'instance_uuid': 'abc123', + } + db.volume_attach(self.ctx, values) + db.volume_admin_metadata_update(self.ctx, volume['id'], + {"attached_mode": 'rw'}, False) + mountpoint = '/dev/vbd' + attachment = self.volume_api.attach(self.ctx, volume, + stubs.FAKE_UUID, None, + mountpoint, 'rw') + + self.assertEqual(stubs.FAKE_UUID, attachment['instance_uuid']) + self.assertEqual(volume['id'], attachment['volume_id'], volume['id']) + self.assertEqual('attached', attachment['attach_status']) + + def test_attach_attaching_volume_with_different_mode(self): + """Test that attaching volume reserved for another mode fails.""" + # current status is available + volume = self._create_volume(self.ctx, {'provider_location': '', + 'size': 1}) + + values = {'status': 'attaching', + 'instance_uuid': fakes.get_fake_uuid()} + db.volume_update(self.ctx, volume['id'], values) + db.volume_admin_metadata_update(self.ctx, volume['id'], + {"attached_mode": 'rw'}, False) + mountpoint = '/dev/vbd' + self.assertRaises(exception.InvalidVolume, + self.volume_api.attach, + self.ctx, + volume, + values['instance_uuid'], + None, + mountpoint, + 'ro')