Remove support for 2.x scheduler RPC API

This commit gets rid of our Mitaka compatibility code in the scheduler RPC API.

Change-Id: I270d6db4c15a0bcf7b26af3c68749646f09e7959
This commit is contained in:
Michał Dulko 2016-09-16 11:04:49 +02:00
parent ecd2567374
commit d056718962
11 changed files with 65 additions and 299 deletions

View File

@@ -25,7 +25,6 @@ from oslo_log import log as logging
from oslo_utils import excutils from oslo_utils import excutils
from oslo_utils import timeutils from oslo_utils import timeutils
from cinder.common import constants
from cinder import db from cinder import db
from cinder.db import base from cinder.db import base
from cinder import exception from cinder import exception
@@ -396,7 +395,6 @@ class API(base.Base):
# to select the target host for this group. # to select the target host for this group.
self.scheduler_rpcapi.create_consistencygroup( self.scheduler_rpcapi.create_consistencygroup(
context, context,
constants.VOLUME_TOPIC,
group, group,
request_spec_list=request_spec_list, request_spec_list=request_spec_list,
filter_properties_list=filter_properties_list) filter_properties_list=filter_properties_list)

View File

@@ -26,7 +26,6 @@ from oslo_utils import excutils
from oslo_utils import timeutils from oslo_utils import timeutils
from oslo_utils import uuidutils from oslo_utils import uuidutils
from cinder.common import constants
from cinder import db from cinder import db
from cinder.db import base from cinder.db import base
from cinder import exception from cinder import exception
@@ -456,7 +455,6 @@ class API(base.Base):
# to select the target host for this group. # to select the target host for this group.
self.scheduler_rpcapi.create_group( self.scheduler_rpcapi.create_group(
context, context,
constants.VOLUME_TOPIC,
group, group,
group_spec=group_spec, group_spec=group_spec,
request_spec_list=request_spec_list, request_spec_list=request_spec_list,

View File

@@ -33,7 +33,6 @@ from cinder import exception
from cinder import flow_utils from cinder import flow_utils
from cinder.i18n import _, _LE from cinder.i18n import _, _LE
from cinder import manager from cinder import manager
from cinder import objects
from cinder import quota from cinder import quota
from cinder import rpc from cinder import rpc
from cinder.scheduler.flows import create_volume from cinder.scheduler.flows import create_volume
@@ -59,7 +58,7 @@ class SchedulerManager(manager.Manager):
RPC_API_VERSION = scheduler_rpcapi.SchedulerAPI.RPC_API_VERSION RPC_API_VERSION = scheduler_rpcapi.SchedulerAPI.RPC_API_VERSION
target = messaging.Target(version='2.3') target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, scheduler_driver=None, service_name=None, def __init__(self, scheduler_driver=None, service_name=None,
*args, **kwargs): *args, **kwargs):
@@ -67,7 +66,6 @@ class SchedulerManager(manager.Manager):
scheduler_driver = CONF.scheduler_driver scheduler_driver = CONF.scheduler_driver
self.driver = importutils.import_object(scheduler_driver) self.driver = importutils.import_object(scheduler_driver)
super(SchedulerManager, self).__init__(*args, **kwargs) super(SchedulerManager, self).__init__(*args, **kwargs)
self.additional_endpoints.append(_SchedulerV3Proxy(self))
self._startup_delay = True self._startup_delay = True
def init_host_with_rpc(self): def init_host_with_rpc(self):
@@ -96,11 +94,8 @@ class SchedulerManager(manager.Manager):
while self._startup_delay and not self.driver.is_ready(): while self._startup_delay and not self.driver.is_ready():
eventlet.sleep(1) eventlet.sleep(1)
def create_consistencygroup(self, context, topic, def create_consistencygroup(self, context, group, request_spec_list=None,
group,
request_spec_list=None,
filter_properties_list=None): filter_properties_list=None):
self._wait_for_scheduler() self._wait_for_scheduler()
try: try:
self.driver.schedule_create_consistencygroup( self.driver.schedule_create_consistencygroup(
@@ -121,13 +116,9 @@ class SchedulerManager(manager.Manager):
group.status = 'error' group.status = 'error'
group.save() group.save()
def create_group(self, context, topic, def create_group(self, context, group, group_spec=None,
group, group_filter_properties=None, request_spec_list=None,
group_spec=None,
group_filter_properties=None,
request_spec_list=None,
filter_properties_list=None): filter_properties_list=None):
self._wait_for_scheduler() self._wait_for_scheduler()
try: try:
self.driver.schedule_create_group( self.driver.schedule_create_group(
@@ -150,23 +141,11 @@ class SchedulerManager(manager.Manager):
group.status = 'error' group.status = 'error'
group.save() group.save()
def create_volume(self, context, topic, volume_id, snapshot_id=None, def create_volume(self, context, volume, snapshot_id=None, image_id=None,
image_id=None, request_spec=None, request_spec=None, filter_properties=None):
filter_properties=None, volume=None):
self._wait_for_scheduler() self._wait_for_scheduler()
# FIXME(dulek): Remove this in v3.0 of RPC API.
if volume is None:
# For older clients, mimic the old behavior and look up the
# volume by its volume_id.
volume = objects.Volume.get_by_id(context, volume_id)
# FIXME(dulek): Remove this in v3.0 of RPC API.
if isinstance(request_spec, dict):
# We may receive request_spec as dict from older clients.
request_spec = objects.RequestSpec.from_primitives(request_spec)
try: try:
flow_engine = create_volume.get_flow(context, flow_engine = create_volume.get_flow(context,
db, self.driver, db, self.driver,
@@ -186,19 +165,12 @@ class SchedulerManager(manager.Manager):
def request_service_capabilities(self, context): def request_service_capabilities(self, context):
volume_rpcapi.VolumeAPI().publish_service_capabilities(context) volume_rpcapi.VolumeAPI().publish_service_capabilities(context)
def migrate_volume_to_host(self, context, topic, volume_id, host, def migrate_volume_to_host(self, context, volume, host, force_host_copy,
force_host_copy, request_spec, request_spec, filter_properties=None):
filter_properties=None, volume=None):
"""Ensure that the host exists and can accept the volume.""" """Ensure that the host exists and can accept the volume."""
self._wait_for_scheduler() self._wait_for_scheduler()
# FIXME(dulek): Remove this in v3.0 of RPC API.
if volume is None:
# For older clients, mimic the old behavior and look up the
# volume by its volume_id.
volume = objects.Volume.get_by_id(context, volume_id)
def _migrate_volume_set_error(self, context, ex, request_spec): def _migrate_volume_set_error(self, context, ex, request_spec):
if volume.status == 'maintenance': if volume.status == 'maintenance':
previous_status = ( previous_status = (
@@ -225,26 +197,17 @@ class SchedulerManager(manager.Manager):
tgt_host, tgt_host,
force_host_copy) force_host_copy)
def retype(self, context, topic, volume_id, def retype(self, context, volume, request_spec, filter_properties=None):
request_spec, filter_properties=None, volume=None):
"""Schedule the modification of a volume's type. """Schedule the modification of a volume's type.
:param context: the request context :param context: the request context
:param topic: the topic listened on :param volume: the volume object to retype
:param volume_id: the ID of the volume to retype
:param request_spec: parameters for this retype request :param request_spec: parameters for this retype request
:param filter_properties: parameters to filter by :param filter_properties: parameters to filter by
:param volume: the volume object to retype
""" """
self._wait_for_scheduler() self._wait_for_scheduler()
# FIXME(dulek): Remove this in v3.0 of RPC API.
if volume is None:
# For older clients, mimic the old behavior and look up the
# volume by its volume_id.
volume = objects.Volume.get_by_id(context, volume_id)
def _retype_volume_set_error(self, context, ex, request_spec, def _retype_volume_set_error(self, context, ex, request_spec,
volume_ref, reservations, msg=None): volume_ref, reservations, msg=None):
if reservations: if reservations:
@@ -287,18 +250,12 @@ class SchedulerManager(manager.Manager):
reservations, reservations,
old_reservations) old_reservations)
def manage_existing(self, context, topic, volume_id, def manage_existing(self, context, volume, request_spec,
request_spec, filter_properties=None, volume=None): filter_properties=None):
"""Ensure that the host exists and can accept the volume.""" """Ensure that the host exists and can accept the volume."""
self._wait_for_scheduler() self._wait_for_scheduler()
# FIXME(mdulko): Remove this in v3.0 of RPC API.
if volume is None:
# For older clients, mimic the old behavior and look up the
# volume by its volume_id.
volume = objects.Volume.get_by_id(context, volume_id)
def _manage_existing_set_error(self, context, ex, request_spec): def _manage_existing_set_error(self, context, ex, request_spec):
volume_state = {'volume_state': {'status': 'error_managing'}} volume_state = {'volume_state': {'status': 'error_managing'}}
self._set_volume_state_and_notify('manage_existing', volume_state, self._set_volume_state_and_notify('manage_existing', volume_state,
@@ -356,84 +313,3 @@ class SchedulerManager(manager.Manager):
rpc.get_notifier("scheduler").error(context, rpc.get_notifier("scheduler").error(context,
'scheduler.' + method, 'scheduler.' + method,
payload) payload)
# TODO(dulek): This goes away immediately in Ocata and is just present in
# Newton so that we can receive v2.x and v3.0 messages.
class _SchedulerV3Proxy(object):
target = messaging.Target(version='3.0')
def __init__(self, manager):
self.manager = manager
def update_service_capabilities(self, context, service_name=None,
host=None, capabilities=None, **kwargs):
return self.manager.update_service_capabilities(
context, service_name=service_name, host=host,
capabilities=capabilities, **kwargs)
def create_consistencygroup(self, context, group, request_spec_list=None,
filter_properties_list=None):
# NOTE(dulek): Second argument here is `topic` which is unused. We're
# getting rid of it in 3.0, hence it's missing from method signature.
return self.manager.create_consistencygroup(
context, None, group, request_spec_list=request_spec_list,
filter_properties_list=filter_properties_list)
def create_group(self, context, group, group_spec=None,
group_filter_properties=None, request_spec_list=None,
filter_properties_list=None):
# NOTE(dulek): Second argument here is `topic` which is unused. We're
# getting rid of it in 3.0, hence it's missing from method signature.
return self.manager.create_group(
context, None, group, group_spec=group_spec,
group_filter_properties=group_filter_properties,
request_spec_list=request_spec_list,
filter_properties_list=filter_properties_list)
def create_volume(self, context, volume, snapshot_id=None, image_id=None,
request_spec=None, filter_properties=None):
# NOTE(dulek): Second argument here is `topic`, which is unused. We're
# getting rid of it in 3.0, hence it's missing from method signature.
# We're also replacing volume_id with volume object (switched from
# optional keyword argument to positional argument).
return self.manager.create_volume(
context, None, volume.id, snapshot_id=snapshot_id,
image_id=image_id, request_spec=request_spec,
filter_properties=filter_properties, volume=volume)
def request_service_capabilities(self, context):
return self.manager.request_service_capabilities(context)
def migrate_volume_to_host(self, context, volume, host,
force_host_copy, request_spec,
filter_properties=None):
# NOTE(dulek): Second argument here is `topic` which is unused. We're
# getting rid of it in 3.0, hence it's missing from method signature.
# We're also replacing volume_id with volume object (switched from
# optional keyword argument to positional argument).
return self.manager.migrate_volume_to_host(
context, None, volume.id, host, force_host_copy, request_spec,
filter_properties=filter_properties, volume=volume)
def retype(self, context, volume, request_spec, filter_properties=None):
# NOTE(dulek): Second argument here is `topic` which is unused. We're
# getting rid of it in 3.0, hence it's missing from method signature.
# We're also replacing volume_id with volume object (switched from
# optional keyword argument to positional argument).
return self.manager.retype(
context, None, volume.id, request_spec,
filter_properties=filter_properties, volume=volume)
def manage_existing(self, context, volume, request_spec,
filter_properties=None):
# NOTE(dulek): Second argument here is `topic` which is unused. We're
# getting rid of it in 3.0, hence it's missing from method signature.
# We're also replacing volume_id with volume object (switched from
# optional keyword argument to positional argument).
return self.manager.manage_existing(
context, None, volume.id, request_spec,
filter_properties=filter_properties, volume=volume)
def get_pools(self, context, filters=None):
return self.manager.get_pools(context, filters=filters)

View File

@@ -23,7 +23,7 @@ from cinder import rpc
class SchedulerAPI(rpc.RPCAPI): class SchedulerAPI(rpc.RPCAPI):
"""Client side of the scheduler rpc API. """Client side of the scheduler RPC API.
API version history: API version history:
@@ -64,10 +64,9 @@ class SchedulerAPI(rpc.RPCAPI):
TOPIC = constants.SCHEDULER_TOPIC TOPIC = constants.SCHEDULER_TOPIC
BINARY = 'cinder-scheduler' BINARY = 'cinder-scheduler'
def create_consistencygroup(self, ctxt, topic, group, def create_consistencygroup(self, ctxt, group, request_spec_list=None,
request_spec_list=None,
filter_properties_list=None): filter_properties_list=None):
version = self._compat_ver('3.0', '2.0') version = '3.0'
cctxt = self.client.prepare(version=version) cctxt = self.client.prepare(version=version)
request_spec_p_list = [] request_spec_p_list = []
for request_spec in request_spec_list: for request_spec in request_spec_list:
@@ -79,17 +78,12 @@ class SchedulerAPI(rpc.RPCAPI):
'filter_properties_list': filter_properties_list, 'filter_properties_list': filter_properties_list,
} }
if version == '2.0':
msg_args['topic'] = topic
return cctxt.cast(ctxt, 'create_consistencygroup', **msg_args) return cctxt.cast(ctxt, 'create_consistencygroup', **msg_args)
def create_group(self, ctxt, topic, group, def create_group(self, ctxt, group, group_spec=None,
group_spec=None, request_spec_list=None, group_filter_properties=None,
request_spec_list=None,
group_filter_properties=None,
filter_properties_list=None): filter_properties_list=None):
version = self._compat_ver('3.0', '2.3') version = '3.0'
cctxt = self.client.prepare(version=version) cctxt = self.client.prepare(version=version)
request_spec_p_list = [] request_spec_p_list = []
for request_spec in request_spec_list: for request_spec in request_spec_list:
@@ -104,90 +98,55 @@ class SchedulerAPI(rpc.RPCAPI):
'filter_properties_list': filter_properties_list, 'filter_properties_list': filter_properties_list,
} }
if version == '2.3':
msg_args['topic'] = topic
return cctxt.cast(ctxt, 'create_group', **msg_args) return cctxt.cast(ctxt, 'create_group', **msg_args)
def create_volume(self, ctxt, topic, volume_id, snapshot_id=None, def create_volume(self, ctxt, volume, snapshot_id=None, image_id=None,
image_id=None, request_spec=None, request_spec=None, filter_properties=None):
filter_properties=None, volume=None):
msg_args = {'snapshot_id': snapshot_id, 'image_id': image_id, msg_args = {'snapshot_id': snapshot_id, 'image_id': image_id,
'request_spec': request_spec, 'request_spec': request_spec,
'filter_properties': filter_properties, 'volume': volume} 'filter_properties': filter_properties, 'volume': volume}
version = self._compat_ver('3.0', '2.2', '2.0') version = '3.0'
if version in ('2.2', '2.0'):
msg_args['volume_id'] = volume.id
msg_args['topic'] = topic
if version == '2.0':
# Send request_spec as dict
msg_args['request_spec'] = jsonutils.to_primitive(request_spec)
# NOTE(dulek): This is to keep supporting Mitaka's scheduler which
# expects a dictionary when creating a typeless volume.
if msg_args['request_spec'].get('volume_type') is None:
msg_args['request_spec']['volume_type'] = {}
cctxt = self.client.prepare(version=version) cctxt = self.client.prepare(version=version)
return cctxt.cast(ctxt, 'create_volume', **msg_args) return cctxt.cast(ctxt, 'create_volume', **msg_args)
def migrate_volume_to_host(self, ctxt, topic, volume_id, host, def migrate_volume_to_host(self, ctxt, volume, host, force_host_copy=False,
force_host_copy=False, request_spec=None, request_spec=None, filter_properties=None):
filter_properties=None, volume=None):
request_spec_p = jsonutils.to_primitive(request_spec) request_spec_p = jsonutils.to_primitive(request_spec)
msg_args = {'host': host, 'force_host_copy': force_host_copy, msg_args = {'host': host, 'force_host_copy': force_host_copy,
'request_spec': request_spec_p, 'request_spec': request_spec_p,
'filter_properties': filter_properties, 'volume': volume} 'filter_properties': filter_properties, 'volume': volume}
version = self._compat_ver('3.0', '2.0') version = '3.0'
if version == '2.0':
msg_args['volume_id'] = volume.id
msg_args['topic'] = topic
cctxt = self.client.prepare(version=version) cctxt = self.client.prepare(version=version)
return cctxt.cast(ctxt, 'migrate_volume_to_host', **msg_args) return cctxt.cast(ctxt, 'migrate_volume_to_host', **msg_args)
def retype(self, ctxt, topic, volume_id, request_spec=None, def retype(self, ctxt, volume, request_spec=None, filter_properties=None):
filter_properties=None, volume=None):
request_spec_p = jsonutils.to_primitive(request_spec) request_spec_p = jsonutils.to_primitive(request_spec)
msg_args = {'request_spec': request_spec_p, msg_args = {'request_spec': request_spec_p,
'filter_properties': filter_properties, 'volume': volume} 'filter_properties': filter_properties, 'volume': volume}
version = self._compat_ver('3.0', '2.0') version = '3.0'
if version == '2.0':
msg_args['volume_id'] = volume.id
msg_args['topic'] = topic
cctxt = self.client.prepare(version=version) cctxt = self.client.prepare(version=version)
return cctxt.cast(ctxt, 'retype', **msg_args) return cctxt.cast(ctxt, 'retype', **msg_args)
def manage_existing(self, ctxt, topic, volume_id, def manage_existing(self, ctxt, volume, request_spec=None,
request_spec=None, filter_properties=None, filter_properties=None):
volume=None):
request_spec_p = jsonutils.to_primitive(request_spec) request_spec_p = jsonutils.to_primitive(request_spec)
msg_args = { msg_args = {
'request_spec': request_spec_p, 'request_spec': request_spec_p,
'filter_properties': filter_properties, 'volume': volume, 'filter_properties': filter_properties, 'volume': volume,
} }
version = self._compat_ver('3.0', '2.1', '2.0') version = '3.0'
if version in ('2.1', '2.0'):
msg_args['volume_id'] = volume.id
msg_args['topic'] = topic
if version == '2.0':
msg_args.pop('volume')
cctxt = self.client.prepare(version=version) cctxt = self.client.prepare(version=version)
return cctxt.cast(ctxt, 'manage_existing', **msg_args) return cctxt.cast(ctxt, 'manage_existing', **msg_args)
def get_pools(self, ctxt, filters=None): def get_pools(self, ctxt, filters=None):
version = self._compat_ver('3.0', '2.0') version = '3.0'
cctxt = self.client.prepare(version=version) cctxt = self.client.prepare(version=version)
return cctxt.call(ctxt, 'get_pools', return cctxt.call(ctxt, 'get_pools',
filters=filters) filters=filters)
def update_service_capabilities(self, ctxt, def update_service_capabilities(self, ctxt, service_name, host,
service_name, host,
capabilities): capabilities):
version = self._compat_ver('3.0', '2.0') version = '3.0'
cctxt = self.client.prepare(fanout=True, version=version) cctxt = self.client.prepare(fanout=True, version=version)
cctxt.cast(ctxt, 'update_service_capabilities', cctxt.cast(ctxt, 'update_service_capabilities',
service_name=service_name, host=host, service_name=service_name, host=host,

View File

@@ -19,7 +19,6 @@ Unit Tests for cinder.scheduler.rpcapi
import copy import copy
import ddt
import mock import mock
from cinder import context from cinder import context
@@ -29,13 +28,10 @@ from cinder.tests.unit import fake_constants
from cinder.tests.unit import fake_volume from cinder.tests.unit import fake_volume
@ddt.ddt
class SchedulerRpcAPITestCase(test.TestCase): class SchedulerRpcAPITestCase(test.TestCase):
def setUp(self): def setUp(self):
super(SchedulerRpcAPITestCase, self).setUp() super(SchedulerRpcAPITestCase, self).setUp()
self.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
self.context = context.RequestContext('fake_user', 'fake_project') self.context = context.RequestContext('fake_user', 'fake_project')
self.volume_id = fake_constants.VOLUME_ID self.volume_id = fake_constants.VOLUME_ID
@@ -89,12 +85,9 @@ class SchedulerRpcAPITestCase(test.TestCase):
fanout=True, fanout=True,
version='3.0') version='3.0')
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True) def test_create_volume(self):
def test_create_volume(self, can_send_version):
self._test_scheduler_api('create_volume', self._test_scheduler_api('create_volume',
rpc_method='cast', rpc_method='cast',
topic='topic',
volume_id=self.volume_id,
snapshot_id='snapshot_id', snapshot_id='snapshot_id',
image_id='image_id', image_id='image_id',
request_spec='fake_request_spec', request_spec='fake_request_spec',
@@ -102,29 +95,10 @@ class SchedulerRpcAPITestCase(test.TestCase):
volume=fake_volume.fake_volume_obj( volume=fake_volume.fake_volume_obj(
self.context), self.context),
version='3.0') version='3.0')
can_send_version.assert_has_calls([mock.call('3.0')])
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
def test_create_volume_serialization(self, can_send_version):
self._test_scheduler_api('create_volume',
rpc_method='cast',
topic='topic',
volume_id=self.volume_id,
snapshot_id='snapshot_id',
image_id='image_id',
request_spec={'volume_type': {}},
filter_properties='filter_properties',
volume=fake_volume.fake_volume_obj(
self.context),
version='2.0')
can_send_version.assert_has_calls([mock.call('3.0'), mock.call('2.2')])
def test_migrate_volume_to_host(self): def test_migrate_volume_to_host(self):
self._test_scheduler_api('migrate_volume_to_host', self._test_scheduler_api('migrate_volume_to_host',
rpc_method='cast', rpc_method='cast',
topic='topic',
volume_id=self.volume_id,
host='host', host='host',
force_host_copy=True, force_host_copy=True,
request_spec='fake_request_spec', request_spec='fake_request_spec',
@@ -136,28 +110,20 @@ class SchedulerRpcAPITestCase(test.TestCase):
def test_retype(self): def test_retype(self):
self._test_scheduler_api('retype', self._test_scheduler_api('retype',
rpc_method='cast', rpc_method='cast',
topic='topic',
volume_id=self.volume_id,
request_spec='fake_request_spec', request_spec='fake_request_spec',
filter_properties='filter_properties', filter_properties='filter_properties',
volume=fake_volume.fake_volume_obj( volume=fake_volume.fake_volume_obj(
self.context), self.context),
version='3.0') version='3.0')
@ddt.data('2.0', '2.1') def test_manage_existing(self):
@mock.patch('oslo_messaging.RPCClient.can_send_version')
def test_manage_existing(self, version, can_send_version):
can_send_version.side_effect = lambda x: x == version
self._test_scheduler_api('manage_existing', self._test_scheduler_api('manage_existing',
rpc_method='cast', rpc_method='cast',
topic='topic',
volume_id=self.volume_id,
request_spec='fake_request_spec', request_spec='fake_request_spec',
filter_properties='filter_properties', filter_properties='filter_properties',
volume=fake_volume.fake_volume_obj( volume=fake_volume.fake_volume_obj(
self.context), self.context),
version=version) version='3.0')
can_send_version.assert_has_calls([mock.call('3.0'), mock.call('2.1')])
def test_get_pools(self): def test_get_pools(self):
self._test_scheduler_api('get_pools', self._test_scheduler_api('get_pools',
@@ -168,7 +134,6 @@ class SchedulerRpcAPITestCase(test.TestCase):
def test_create_group(self): def test_create_group(self):
self._test_scheduler_api('create_group', self._test_scheduler_api('create_group',
rpc_method='cast', rpc_method='cast',
topic='topic',
group='group', group='group',
group_spec='group_spec_p', group_spec='group_spec_p',
request_spec_list=['fake_request_spec_list'], request_spec_list=['fake_request_spec_list'],

View File

@@ -129,17 +129,15 @@ class SchedulerManagerTestCase(test.TestCase):
# Puts the volume in 'error' state and eats the exception. # Puts the volume in 'error' state and eats the exception.
_mock_sched_create.side_effect = exception.NoValidHost(reason="") _mock_sched_create.side_effect = exception.NoValidHost(reason="")
volume = fake_volume.fake_volume_obj(self.context) volume = fake_volume.fake_volume_obj(self.context)
topic = 'fake_topic'
request_spec = {'volume_id': volume.id, request_spec = {'volume_id': volume.id,
'volume': {'id': volume.id, '_name_id': None, 'volume': {'id': volume.id, '_name_id': None,
'metadata': {}, 'admin_metadata': {}, 'metadata': {}, 'admin_metadata': {},
'glance_metadata': {}}} 'glance_metadata': {}}}
request_spec_obj = objects.RequestSpec.from_primitives(request_spec) request_spec_obj = objects.RequestSpec.from_primitives(request_spec)
self.manager.create_volume(self.context, topic, volume.id, self.manager.create_volume(self.context, volume,
request_spec=request_spec, request_spec=request_spec_obj,
filter_properties={}, filter_properties={})
volume=volume)
_mock_volume_update.assert_called_once_with(self.context, _mock_volume_update.assert_called_once_with(self.context,
volume.id, volume.id,
{'status': 'error'}) {'status': 'error'})
@@ -155,15 +153,13 @@ class SchedulerManagerTestCase(test.TestCase):
@mock.patch('eventlet.sleep') @mock.patch('eventlet.sleep')
def test_create_volume_no_delay(self, _mock_sleep, _mock_sched_create): def test_create_volume_no_delay(self, _mock_sleep, _mock_sched_create):
volume = fake_volume.fake_volume_obj(self.context) volume = fake_volume.fake_volume_obj(self.context)
topic = 'fake_topic'
request_spec = {'volume_id': volume.id} request_spec = {'volume_id': volume.id}
request_spec_obj = objects.RequestSpec.from_primitives(request_spec) request_spec_obj = objects.RequestSpec.from_primitives(request_spec)
self.manager.create_volume(self.context, topic, volume.id, self.manager.create_volume(self.context, volume,
request_spec=request_spec, request_spec=request_spec_obj,
filter_properties={}, filter_properties={})
volume=volume)
_mock_sched_create.assert_called_once_with(self.context, _mock_sched_create.assert_called_once_with(self.context,
request_spec_obj, {}) request_spec_obj, {})
self.assertFalse(_mock_sleep.called) self.assertFalse(_mock_sleep.called)
@@ -176,17 +172,15 @@ class SchedulerManagerTestCase(test.TestCase):
_mock_sched_create): _mock_sched_create):
self.manager._startup_delay = True self.manager._startup_delay = True
volume = fake_volume.fake_volume_obj(self.context) volume = fake_volume.fake_volume_obj(self.context)
topic = 'fake_topic'
request_spec = {'volume_id': volume.id} request_spec = {'volume_id': volume.id}
request_spec_obj = objects.RequestSpec.from_primitives(request_spec) request_spec_obj = objects.RequestSpec.from_primitives(request_spec)
_mock_is_ready.side_effect = [False, False, True] _mock_is_ready.side_effect = [False, False, True]
self.manager.create_volume(self.context, topic, volume.id, self.manager.create_volume(self.context, volume,
request_spec=request_spec, request_spec=request_spec_obj,
filter_properties={}, filter_properties={})
volume=volume)
_mock_sched_create.assert_called_once_with(self.context, _mock_sched_create.assert_called_once_with(self.context,
request_spec_obj, {}) request_spec_obj, {})
calls = [mock.call(1)] * 2 calls = [mock.call(1)] * 2
@@ -201,17 +195,15 @@ class SchedulerManagerTestCase(test.TestCase):
_mock_sched_create): _mock_sched_create):
self.manager._startup_delay = True self.manager._startup_delay = True
volume = fake_volume.fake_volume_obj(self.context) volume = fake_volume.fake_volume_obj(self.context)
topic = 'fake_topic'
request_spec = {'volume_id': volume.id} request_spec = {'volume_id': volume.id}
request_spec_obj = objects.RequestSpec.from_primitives(request_spec) request_spec_obj = objects.RequestSpec.from_primitives(request_spec)
_mock_is_ready.return_value = True _mock_is_ready.return_value = True
self.manager.create_volume(self.context, topic, volume.id, self.manager.create_volume(self.context, volume,
request_spec=request_spec, request_spec=request_spec_obj,
filter_properties={}, filter_properties={})
volume=volume)
_mock_sched_create.assert_called_once_with(self.context, _mock_sched_create.assert_called_once_with(self.context,
request_spec_obj, {}) request_spec_obj, {})
self.assertFalse(_mock_sleep.called) self.assertFalse(_mock_sleep.called)
@@ -248,16 +240,13 @@ class SchedulerManagerTestCase(test.TestCase):
status=status, status=status,
previous_status='available') previous_status='available')
fake_volume_id = volume.id fake_volume_id = volume.id
topic = 'fake_topic'
request_spec = {'volume_id': fake_volume_id} request_spec = {'volume_id': fake_volume_id}
_mock_host_passes.side_effect = exception.NoValidHost(reason="") _mock_host_passes.side_effect = exception.NoValidHost(reason="")
_mock_volume_get.return_value = volume _mock_volume_get.return_value = volume
self.manager.migrate_volume_to_host(self.context, topic, self.manager.migrate_volume_to_host(self.context, volume, 'host', True,
fake_volume_id, 'host', True,
request_spec=request_spec, request_spec=request_spec,
filter_properties={}, filter_properties={})
volume=volume)
_mock_volume_update.assert_called_once_with(self.context, _mock_volume_update.assert_called_once_with(self.context,
fake_volume_id, fake_volume_id,
fake_updates) fake_updates)
@@ -279,7 +268,6 @@ class SchedulerManagerTestCase(test.TestCase):
instance_uuid, None, instance_uuid, None,
'/dev/fake') '/dev/fake')
_mock_vol_attachment_get.return_value = [volume_attach] _mock_vol_attachment_get.return_value = [volume_attach]
topic = 'fake_topic'
reservations = mock.sentinel.reservations reservations = mock.sentinel.reservations
request_spec = {'volume_id': volume.id, 'volume_type': {'id': 3}, request_spec = {'volume_id': volume.id, 'volume_type': {'id': 3},
'migration_policy': 'on-demand', 'migration_policy': 'on-demand',
@@ -290,10 +278,8 @@ class SchedulerManagerTestCase(test.TestCase):
orig_retype = self.manager.driver.find_retype_host orig_retype = self.manager.driver.find_retype_host
self.manager.driver.find_retype_host = _mock_find_retype_host self.manager.driver.find_retype_host = _mock_find_retype_host
self.manager.retype(self.context, topic, volume.id, self.manager.retype(self.context, volume, request_spec=request_spec,
request_spec=request_spec, filter_properties={})
filter_properties={},
volume=volume)
_mock_find_retype_host.assert_called_once_with(self.context, _mock_find_retype_host.assert_called_once_with(self.context,
request_spec, {}, request_spec, {},
@@ -319,7 +305,6 @@ class SchedulerManagerTestCase(test.TestCase):
self.assertRaises(exception.CinderException, self.assertRaises(exception.CinderException,
self.manager.create_consistencygroup, self.manager.create_consistencygroup,
self.context, self.context,
'volume',
consistencygroup_obj) consistencygroup_obj)
self.assertGreater(LOG.exception.call_count, 0) self.assertGreater(LOG.exception.call_count, 0)
db.consistencygroup_update.assert_called_once_with( db.consistencygroup_update.assert_called_once_with(
@@ -333,7 +318,7 @@ class SchedulerManagerTestCase(test.TestCase):
mock_cg.side_effect = exception.NoValidHost( mock_cg.side_effect = exception.NoValidHost(
reason="No weighed hosts available") reason="No weighed hosts available")
self.manager.create_consistencygroup( self.manager.create_consistencygroup(
self.context, 'volume', consistencygroup_obj) self.context, consistencygroup_obj)
self.assertGreater(LOG.error.call_count, 0) self.assertGreater(LOG.error.call_count, 0)
db.consistencygroup_update.assert_called_once_with( db.consistencygroup_update.assert_called_once_with(
self.context, group_id, {'status': ( self.context, group_id, {'status': (

View File

@ -36,14 +36,12 @@ class FakeSchedulerRpcAPI(object):
self.expected_spec = expected_spec self.expected_spec = expected_spec
self.test_inst = test_inst self.test_inst = test_inst
def create_volume(self, ctxt, topic, volume_id, snapshot_id=None, def create_volume(self, ctxt, volume, snapshot_id=None, image_id=None,
image_id=None, request_spec=None, request_spec=None, filter_properties=None):
filter_properties=None, volume=None):
self.test_inst.assertEqual(self.expected_spec, request_spec) self.test_inst.assertEqual(self.expected_spec, request_spec)
def manage_existing(self, context, volume_topic, volume_id, def manage_existing(self, context, volume, request_spec=None):
request_spec=None, volume=None):
self.test_inst.assertEqual(self.expected_spec, request_spec) self.test_inst.assertEqual(self.expected_spec, request_spec)

View File

@ -1385,12 +1385,10 @@ class API(base.Base):
'volume_type': volume_type, 'volume_type': volume_type,
'volume_id': volume.id} 'volume_id': volume.id}
self.scheduler_rpcapi.migrate_volume_to_host(context, self.scheduler_rpcapi.migrate_volume_to_host(context,
constants.VOLUME_TOPIC, volume,
volume.id,
host, host,
force_host_copy, force_host_copy,
request_spec, request_spec)
volume=volume)
LOG.info(_LI("Migrate volume request issued successfully."), LOG.info(_LI("Migrate volume request issued successfully."),
resource=volume) resource=volume)
@ -1530,10 +1528,9 @@ class API(base.Base):
'quota_reservations': reservations, 'quota_reservations': reservations,
'old_reservations': old_reservations} 'old_reservations': old_reservations}
self.scheduler_rpcapi.retype(context, constants.VOLUME_TOPIC, self.scheduler_rpcapi.retype(context, volume,
volume.id,
request_spec=request_spec, request_spec=request_spec,
filter_properties={}, volume=volume) filter_properties={})
LOG.info(_LI("Retype volume request issued successfully."), LOG.info(_LI("Retype volume request issued successfully."),
resource=volume) resource=volume)

View File

@ -19,7 +19,6 @@ import taskflow.engines
from taskflow.patterns import linear_flow from taskflow.patterns import linear_flow
from taskflow.types import failure as ft from taskflow.types import failure as ft
from cinder.common import constants
from cinder import exception from cinder import exception
from cinder import flow_utils from cinder import flow_utils
from cinder.i18n import _, _LE, _LW from cinder.i18n import _, _LE, _LW
@ -707,7 +706,6 @@ class VolumeCastTask(flow_utils.CinderTask):
def _cast_create_volume(self, context, request_spec, filter_properties): def _cast_create_volume(self, context, request_spec, filter_properties):
source_volid = request_spec['source_volid'] source_volid = request_spec['source_volid']
source_replicaid = request_spec['source_replicaid'] source_replicaid = request_spec['source_replicaid']
volume_id = request_spec['volume_id']
volume = request_spec['volume'] volume = request_spec['volume']
snapshot_id = request_spec['snapshot_id'] snapshot_id = request_spec['snapshot_id']
image_id = request_spec['image_id'] image_id = request_spec['image_id']
@ -754,13 +752,11 @@ class VolumeCastTask(flow_utils.CinderTask):
# to select the target host for this volume. # to select the target host for this volume.
self.scheduler_rpcapi.create_volume( self.scheduler_rpcapi.create_volume(
context, context,
constants.VOLUME_TOPIC, volume,
volume_id,
snapshot_id=snapshot_id, snapshot_id=snapshot_id,
image_id=image_id, image_id=image_id,
request_spec=request_spec, request_spec=request_spec,
filter_properties=filter_properties, filter_properties=filter_properties)
volume=volume)
else: else:
# Bypass the scheduler and send the request directly to the volume # Bypass the scheduler and send the request directly to the volume
# manager. # manager.

View File

@ -16,7 +16,6 @@ import taskflow.engines
from taskflow.patterns import linear_flow from taskflow.patterns import linear_flow
from taskflow.types import failure as ft from taskflow.types import failure as ft
from cinder.common import constants
from cinder import exception from cinder import exception
from cinder import flow_utils from cinder import flow_utils
from cinder.i18n import _LE from cinder.i18n import _LE
@ -108,10 +107,8 @@ class ManageCastTask(flow_utils.CinderTask):
# Call the scheduler to ensure that the host exists and that it can # Call the scheduler to ensure that the host exists and that it can
# accept the volume # accept the volume
self.scheduler_rpcapi.manage_existing(context, constants.VOLUME_TOPIC, self.scheduler_rpcapi.manage_existing(context, volume,
volume.id, request_spec=request_spec)
request_spec=request_spec,
volume=volume)
def revert(self, context, result, flow_failures, volume, **kwargs): def revert(self, context, result, flow_failures, volume, **kwargs):
# Restore the source volume status and set the volume to error status. # Restore the source volume status and set the volume to error status.

View File

@ -20,7 +20,6 @@ import taskflow.engines
from taskflow.patterns import linear_flow from taskflow.patterns import linear_flow
from taskflow.types import failure as ft from taskflow.types import failure as ft
from cinder.common import constants
from cinder import context as cinder_context from cinder import context as cinder_context
from cinder import exception from cinder import exception
from cinder import flow_utils from cinder import flow_utils
@ -144,10 +143,8 @@ class OnFailureRescheduleTask(flow_utils.CinderTask):
# Stringify to avoid circular ref problem in json serialization # Stringify to avoid circular ref problem in json serialization
retry_info['exc'] = traceback.format_exception(*cause.exc_info) retry_info['exc'] = traceback.format_exception(*cause.exc_info)
return create_volume(context, constants.VOLUME_TOPIC, volume.id, return create_volume(context, volume, request_spec=request_spec,
request_spec=request_spec, filter_properties=filter_properties)
filter_properties=filter_properties,
volume=volume)
def _post_reschedule(self, volume): def _post_reschedule(self, volume):
"""Actions that happen after the rescheduling attempt occur here.""" """Actions that happen after the rescheduling attempt occur here."""