Merge "Schedule request to scheduler when manage existing snapshot"
This commit is contained in:
commit
e75d209177
@ -211,7 +211,8 @@ class SchedulerManager(manager.CleanableManager, manager.Manager):
|
||||
{'size': request_spec['volume_properties']['size']})
|
||||
except exception.NoValidBackend as ex:
|
||||
self._set_snapshot_state_and_notify('create_snapshot',
|
||||
snapshot, 'error',
|
||||
snapshot,
|
||||
fields.SnapshotStatus.ERROR,
|
||||
ctxt, ex, request_spec)
|
||||
else:
|
||||
volume_rpcapi.VolumeAPI().create_snapshot(ctxt, volume,
|
||||
@ -351,6 +352,28 @@ class SchedulerManager(manager.CleanableManager, manager.Manager):
|
||||
volume_rpcapi.VolumeAPI().manage_existing(context, volume,
|
||||
request_spec.get('ref'))
|
||||
|
||||
def manage_existing_snapshot(self, context, volume, snapshot, ref,
|
||||
request_spec, filter_properties=None):
|
||||
"""Ensure that the host exists and can accept the snapshot."""
|
||||
|
||||
self._wait_for_scheduler()
|
||||
|
||||
try:
|
||||
backend = self.driver.backend_passes_filters(
|
||||
context, volume.service_topic_queue, request_spec,
|
||||
filter_properties)
|
||||
backend.consume_from_volume({'size': volume.size})
|
||||
|
||||
except exception.NoValidBackend as ex:
|
||||
self._set_snapshot_state_and_notify('manage_existing_snapshot',
|
||||
snapshot,
|
||||
fields.SnapshotStatus.ERROR,
|
||||
context, ex, request_spec)
|
||||
else:
|
||||
volume_rpcapi.VolumeAPI().manage_existing_snapshot(
|
||||
context, snapshot, ref,
|
||||
volume.service_topic_queue)
|
||||
|
||||
def get_pools(self, context, filters=None):
|
||||
"""Get active pools from scheduler's cache.
|
||||
|
||||
|
@ -71,9 +71,10 @@ class SchedulerAPI(rpc.RPCAPI):
|
||||
3.8 - Addds ``valid_host_capacity`` method
|
||||
3.9 - Adds create_snapshot method
|
||||
3.10 - Adds backup_id to create_volume method.
|
||||
3.11 - Adds manage_existing_snapshot method.
|
||||
"""
|
||||
|
||||
RPC_API_VERSION = '3.10'
|
||||
RPC_API_VERSION = '3.11'
|
||||
RPC_DEFAULT_VERSION = '3.0'
|
||||
TOPIC = constants.SCHEDULER_TOPIC
|
||||
BINARY = 'cinder-scheduler'
|
||||
@ -162,6 +163,20 @@ class SchedulerAPI(rpc.RPCAPI):
|
||||
}
|
||||
return cctxt.cast(ctxt, 'manage_existing', **msg_args)
|
||||
|
||||
@rpc.assert_min_rpc_version('3.11')
|
||||
def manage_existing_snapshot(self, ctxt, volume, snapshot, ref,
|
||||
request_spec=None, filter_properties=None):
|
||||
cctxt = self._get_cctxt()
|
||||
request_spec_p = jsonutils.to_primitive(request_spec)
|
||||
msg_args = {
|
||||
'request_spec': request_spec_p,
|
||||
'filter_properties': filter_properties,
|
||||
'volume': volume,
|
||||
'snapshot': snapshot,
|
||||
'ref': ref,
|
||||
}
|
||||
return cctxt.cast(ctxt, 'manage_existing_snapshot', **msg_args)
|
||||
|
||||
@rpc.assert_min_rpc_version('3.2')
|
||||
def extend_volume(self, ctxt, volume, new_size, reservations,
|
||||
request_spec, filter_properties=None):
|
||||
|
@ -45,7 +45,8 @@ def volume_get(self, context, volume_id, viewable_admin_meta=False):
|
||||
if volume_id == fake.VOLUME_ID:
|
||||
return objects.Volume(context, id=fake.VOLUME_ID,
|
||||
_name_id=fake.VOLUME2_ID,
|
||||
host='fake_host', cluster_name=None)
|
||||
host='fake_host', cluster_name=None,
|
||||
size=1)
|
||||
raise exception.VolumeNotFound(volume_id=volume_id)
|
||||
|
||||
|
||||
@ -107,7 +108,8 @@ class SnapshotManageTest(test.TestCase):
|
||||
res = req.get_response(app())
|
||||
return res
|
||||
|
||||
@mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot')
|
||||
@mock.patch(
|
||||
'cinder.scheduler.rpcapi.SchedulerAPI.manage_existing_snapshot')
|
||||
@mock.patch('cinder.volume.api.API.create_snapshot_in_db')
|
||||
@mock.patch('cinder.db.sqlalchemy.api.service_get')
|
||||
def test_manage_snapshot_ok(self, mock_db,
|
||||
@ -145,9 +147,10 @@ class SnapshotManageTest(test.TestCase):
|
||||
# correct arguments.
|
||||
self.assertEqual(1, mock_rpcapi.call_count)
|
||||
args = mock_rpcapi.call_args[0]
|
||||
self.assertEqual({u'fake_key': u'fake_ref'}, args[2])
|
||||
self.assertEqual({u'fake_key': u'fake_ref'}, args[3])
|
||||
|
||||
@mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot')
|
||||
@mock.patch(
|
||||
'cinder.scheduler.rpcapi.SchedulerAPI.manage_existing_snapshot')
|
||||
@mock.patch('cinder.volume.api.API.create_snapshot_in_db')
|
||||
@mock.patch('cinder.objects.service.Service.get_by_id')
|
||||
def test_manage_snapshot_ok_with_metadata_null(
|
||||
@ -167,7 +170,8 @@ class SnapshotManageTest(test.TestCase):
|
||||
# 5th argument of args is metadata.
|
||||
self.assertIsNone(args[5])
|
||||
|
||||
@mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot')
|
||||
@mock.patch(
|
||||
'cinder.scheduler.rpcapi.SchedulerAPI.manage_existing_snapshot')
|
||||
@mock.patch('cinder.volume.api.API.create_snapshot_in_db')
|
||||
@mock.patch('cinder.db.sqlalchemy.api.service_get')
|
||||
def test_manage_snapshot_ok_ref_as_string(self, mock_db,
|
||||
@ -188,7 +192,7 @@ class SnapshotManageTest(test.TestCase):
|
||||
# correct arguments.
|
||||
self.assertEqual(1, mock_rpcapi.call_count)
|
||||
args = mock_rpcapi.call_args[0]
|
||||
self.assertEqual(body['snapshot']['ref'], args[2])
|
||||
self.assertEqual(body['snapshot']['ref'], args[3])
|
||||
|
||||
@mock.patch('cinder.objects.service.Service.is_up',
|
||||
return_value=True,
|
||||
@ -366,7 +370,8 @@ class SnapshotManageTest(test.TestCase):
|
||||
res.json['badRequest']['message'])
|
||||
self.assertTrue(mock_is_up.called)
|
||||
|
||||
@mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot')
|
||||
@mock.patch(
|
||||
'cinder.scheduler.rpcapi.SchedulerAPI.manage_existing_snapshot')
|
||||
@mock.patch('cinder.volume.api.API.create_snapshot_in_db')
|
||||
@mock.patch('cinder.objects.service.Service.get_by_id')
|
||||
def test_manage_snapshot_with_null_validate(
|
||||
|
@ -65,7 +65,8 @@ class SnapshotManageTest(test.TestCase):
|
||||
res = req.get_response(app())
|
||||
return res
|
||||
|
||||
@mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot')
|
||||
@mock.patch(
|
||||
'cinder.scheduler.rpcapi.SchedulerAPI.manage_existing_snapshot')
|
||||
@mock.patch('cinder.volume.api.API.create_snapshot_in_db')
|
||||
@mock.patch('cinder.objects.service.Service.get_by_id')
|
||||
def test_manage_snapshot_route(self, mock_service_get,
|
||||
|
@ -103,6 +103,32 @@ class SchedulerRPCAPITestCase(test.RPCAPITestCase):
|
||||
request_spec=self.fake_rs_obj,
|
||||
version='3.5')
|
||||
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version',
|
||||
return_value=True)
|
||||
def test_manage_existing_snapshot(self, can_send_version_mock):
|
||||
self._test_rpc_api('manage_existing_snapshot',
|
||||
rpc_method='cast',
|
||||
volume='fake_volume',
|
||||
snapshot='fake_snapshot',
|
||||
ref='fake_ref',
|
||||
request_spec={'snapshot_id': self.fake_snapshot.id},
|
||||
filter_properties=None)
|
||||
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version',
|
||||
return_value=False)
|
||||
def test_manage_existing_snapshot_capped(self, can_send_version_mock):
|
||||
self.assertRaises(exception.ServiceTooOld,
|
||||
self._test_rpc_api,
|
||||
'manage_existing_snapshot',
|
||||
rpc_method='cast',
|
||||
volume=self.fake_volume,
|
||||
snapshot=self.fake_snapshot,
|
||||
ref='fake_ref',
|
||||
request_spec={'snapshot_id': self.fake_snapshot.id,
|
||||
'ref': 'fake_ref'},
|
||||
filter_properties=None,
|
||||
version='3.10')
|
||||
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
|
||||
def test_notify_service_capabilities_backend(self, can_send_version_mock):
|
||||
"""Test sending new backend by RPC instead of old host parameter."""
|
||||
|
@ -74,6 +74,25 @@ class SchedulerManagerTestCase(test.TestCase):
|
||||
sleep_mock.assert_called_once_with(CONF.periodic_interval)
|
||||
self.assertFalse(self.manager._startup_delay)
|
||||
|
||||
@mock.patch('cinder.scheduler.driver.Scheduler.backend_passes_filters')
|
||||
@mock.patch(
|
||||
'cinder.scheduler.host_manager.BackendState.consume_from_volume')
|
||||
@mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot')
|
||||
def test_manage_existing_snapshot(self, mock_manage_existing_snapshot,
|
||||
mock_consume, mock_backend_passes):
|
||||
volume = fake_volume.fake_volume_obj(self.context, **{'size': 1})
|
||||
fake_backend = fake_scheduler.FakeBackendState('host1', {})
|
||||
mock_backend_passes.return_value = fake_backend
|
||||
|
||||
self.manager.manage_existing_snapshot(self.context, volume,
|
||||
'fake_snapshot', 'fake_ref',
|
||||
None)
|
||||
|
||||
mock_consume.assert_called_once_with({'size': 1})
|
||||
mock_manage_existing_snapshot.assert_called_once_with(
|
||||
self.context, 'fake_snapshot', 'fake_ref',
|
||||
volume.service_topic_queue)
|
||||
|
||||
@mock.patch('cinder.objects.service.Service.get_minimum_rpc_version')
|
||||
@mock.patch('cinder.objects.service.Service.get_minimum_obj_version')
|
||||
@mock.patch('cinder.rpc.LAST_RPC_VERSIONS', {'cinder-volume': '1.3'})
|
||||
|
@ -1839,7 +1839,8 @@ class API(base.Base):
|
||||
def manage_existing_snapshot(self, context, ref, volume,
|
||||
name=None, description=None,
|
||||
metadata=None):
|
||||
service = self._get_service_by_host_cluster(context, volume.host,
|
||||
# Ensure the service is up and not disabled.
|
||||
self._get_service_by_host_cluster(context, volume.host,
|
||||
volume.cluster_name,
|
||||
'snapshot')
|
||||
|
||||
@ -1847,8 +1848,12 @@ class API(base.Base):
|
||||
description, True,
|
||||
metadata, None,
|
||||
commit_quota=True)
|
||||
self.volume_rpcapi.manage_existing_snapshot(
|
||||
context, snapshot_object, ref, service.service_topic_queue)
|
||||
kwargs = {'snapshot_id': snapshot_object.id,
|
||||
'volume_properties':
|
||||
objects.VolumeProperties(size=volume.size)}
|
||||
self.scheduler_rpcapi.manage_existing_snapshot(
|
||||
context, volume, snapshot_object, ref,
|
||||
request_spec=objects.RequestSpec(**kwargs))
|
||||
return snapshot_object
|
||||
|
||||
def get_manageable_snapshots(self, context, host, cluster_name,
|
||||
|
Loading…
Reference in New Issue
Block a user