Allow triggering cleanup from API

Now that we support having multiple c-vol services using the same
storage backend under one cluster, a restarting service no longer cleans
up all the resources in the backend that have an ongoing status in the
DB, only those from its own host, because those are the failed
operations that were left "in the air" when that service was stopped.
So we need a way to trigger the cleanup of resources that were being
processed by another c-vol service in the same cluster when it failed.

This patch adds a new API endpoint (/workers/cleanup), as microversion
3.24, that will trigger cleanup for c-vol services.

The cleanup will be performed by other services that share the same
cluster, so at least one of them must be up to be able to do the
cleanup.

Cleanup cannot be triggered during a cloud upgrade, but a restarted
service will still clean up its own resources during an upgrade.

If no arguments are provided, cleanup will try to issue a clean message
for all nodes that are down, but we can restrict which nodes we want to
be cleaned using the parameters `service_id`, `cluster_name`, `host`,
`binary`, and `disabled`.

Cleaning specific resources is also possible using `resource_type` and
`resource_id` parameters.

We can even force cleanup on nodes that are up with `is_up`, but that's
not recommended and should only be used if you know what you are doing,
for example if you know a specific cinder-volume is down even though it
is not yet being reported as down when listing the services, and you
know the cluster has at least one other service to do the cleanup.
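
For illustration only (the volume UUID below is a made-up placeholder
and the cluster name matches the one used in this patch's unit tests), a
request body that restricts the cleanup to the volume services of one
cluster and to a single resource could look like:

    {
        "cluster_name": "mycluster",
        "binary": "cinder-volume",
        "resource_type": "Volume",
        "resource_id": "84d34a51-dfbd-4aab-8a1e-59a8ac79d445"
    }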

The API will return a dictionary with two lists: one with the services
that have been issued a cleanup request (`cleaning` key) and another
with the services that cannot be cleaned right now because there is no
alternative service in that cluster to do the cleanup (`unavailable`
key).

The data returned for each service element in these two lists consists
of the `id`, `host`, `binary`, and `cluster_name`.  These are not the
services that will be performing the cleanup, but the services that will
be cleaned up or couldn't be cleaned up.
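
As a rough usage sketch of the REST flow (the endpoint URL, project ID
and token are placeholders; assume an admin token has already been
obtained from Keystone), triggering a cleanup of all down nodes and
reading the result could look like:

    import requests

    url = 'http://controller:8776/v3/<project_id>/workers/cleanup'
    headers = {'X-Auth-Token': '<admin-token>',
               # Request the microversion that introduces the endpoint.
               'OpenStack-API-Version': 'volume 3.24',
               'Content-Type': 'application/json'}

    # An empty body asks for cleanup of every service that is down.
    resp = requests.post(url, json={}, headers=headers)
    assert resp.status_code == 202

    result = resp.json()
    for svc in result['cleaning']:
        print('cleanup requested for', svc['binary'], svc['host'])
    for svc in result['unavailable']:
        print('no alternative service available to clean', svc['host'])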

Specs: https://specs.openstack.org/openstack/cinder-specs/specs/newton/ha-aa-cleanup.html

APIImpact: New /workers/cleanup entry
Implements: blueprint cinder-volume-active-active-support
Change-Id: If336b6569b171846954ed6eb73f5a4314c6c7e2e
Author: Gorka Eguileor  2016-08-24 17:02:46 +02:00
Parent: 179e35c31a
Commit: efa8e210c9
17 changed files with 685 additions and 8 deletions


@@ -73,6 +73,7 @@ REST_API_VERSION_HISTORY = """
* 3.21 - Show provider_id in detailed view of a volume for admin.
* 3.22 - Add filtering based on metadata for snapshot listing.
* 3.23 - Allow passing force parameter to volume delete.
* 3.24 - Add workers/cleanup endpoint.
"""
# The minimum and maximum versions of the API supported
@@ -80,7 +81,7 @@ REST_API_VERSION_HISTORY = """
# minimum version of the API supported.
# Explicitly using /v1 or /v2 enpoints will still work
_MIN_API_VERSION = "3.0"
_MAX_API_VERSION = "3.23"
_MAX_API_VERSION = "3.24"
_LEGACY_API_VERSION1 = "1.0"
_LEGACY_API_VERSION2 = "2.0"


@@ -227,3 +227,36 @@ user documentation.
----
Added support to filter snapshot list based on metadata of snapshot.
3.24
----
New API endpoint /workers/cleanup allows triggering cleanup for cinder-volume
services. Meant for cleaning ongoing operations from failed nodes.
The cleanup will be performed by other services belonging to the same
cluster, so at least one of them must be up to be able to do the cleanup.
Cleanup cannot be triggered during a cloud upgrade.
If no arguments are provided, cleanup will try to issue a clean message for
all nodes that are down, but we can restrict which nodes we want to be
cleaned using the parameters ``service_id``, ``cluster_name``, ``host``,
``binary``, and ``disabled``.
Cleaning specific resources is also possible using the ``resource_type`` and
``resource_id`` parameters.
We can even force cleanup on nodes that are up with ``is_up``, but that's
not recommended and should only be used if you know what you are doing, for
example if you know a specific cinder-volume is down even though it is not
yet being reported as down when listing the services, and you know the
cluster has at least one other service to do the cleanup.
The API will return a dictionary with two lists: one with the services that
have been issued a cleanup request (``cleaning`` key) and another with the
services that cannot be cleaned right now because there is no alternative
service in that cluster to do the cleanup (``unavailable`` key).
The data returned for each service element in these two lists consists of
the ``id``, ``host``, ``binary``, and ``cluster_name``. These are not the
services that will be performing the cleanup, but the services that will be
cleaned up or couldn't be cleaned up.
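
For example, with two clustered volume services able to receive the request
and one that could not (all values are illustrative), the response could
look like::

    {"cleaning": [{"id": 1, "host": "host1", "binary": "cinder-volume",
                   "cluster_name": "mycluster"},
                  {"id": 2, "host": "host2", "binary": "cinder-volume",
                   "cluster_name": "mycluster"}],
     "unavailable": [{"id": 3, "host": "host3", "binary": "cinder-volume",
                      "cluster_name": "mycluster"}]}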


@@ -37,6 +37,7 @@ from cinder.api.v3 import snapshots
from cinder.api.v3 import volume_manage
from cinder.api.v3 import volume_metadata
from cinder.api.v3 import volumes
from cinder.api.v3 import workers
from cinder.api import versions
@@ -173,3 +174,8 @@ class APIRouter(cinder.api.openstack.APIRouter):
mapper.resource("backup", "backups",
controller=self.resources['backups'],
collection={'detail': 'GET'})
self.resources['workers'] = workers.create_resource()
mapper.resource('worker', 'workers',
controller=self.resources['workers'],
collection={'cleanup': 'POST'})


@@ -0,0 +1,25 @@
# Copyright (c) 2016 Red Hat Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class ViewBuilder(object):
    """Map worker services into dicts for API responses."""

    _collection_name = 'workers'

    @classmethod
    def service_list(cls, services):
        return [{'id': s.id, 'host': s.host, 'binary': s.binary,
                 'cluster_name': s.cluster_name} for s in services]

cinder/api/v3/workers.py (new file, 124 lines)

@@ -0,0 +1,124 @@
# Copyright (c) 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import timeutils
from oslo_utils import uuidutils
from cinder.api.openstack import wsgi
from cinder.api.v3.views import workers as workers_view
from cinder import db
from cinder import exception
from cinder.i18n import _
from cinder import objects
from cinder.objects import cleanable
from cinder.scheduler import rpcapi as sch_rpc
from cinder import utils
class WorkerController(wsgi.Controller):
    allowed_clean_keys = {'service_id', 'cluster_name', 'host', 'binary',
                          'is_up', 'disabled', 'resource_id', 'resource_type',
                          'until'}

    policy_checker = wsgi.Controller.get_policy_checker('workers')

    def __init__(self, *args, **kwargs):
        self.sch_api = sch_rpc.SchedulerAPI()
        super(WorkerController, self).__init__(*args, **kwargs)

    def _prepare_params(self, ctxt, params, allowed):
        if not allowed.issuperset(params):
            invalid_keys = set(params).difference(allowed)
            msg = _('Invalid filter keys: %s') % ', '.join(invalid_keys)
            raise exception.InvalidInput(reason=msg)

        if params.get('binary') not in (None, 'cinder-volume',
                                        'cinder-scheduler'):
            msg = _('binary must be empty or set to cinder-volume or '
                    'cinder-scheduler')
            raise exception.InvalidInput(reason=msg)

        for boolean in ('disabled', 'is_up'):
            if params.get(boolean) is not None:
                params[boolean] = utils.get_bool_param(boolean, params)

        resource_type = params.get('resource_type')
        if resource_type:
            resource_type = resource_type.title()
            types = cleanable.CinderCleanableObject.cleanable_resource_types
            if resource_type not in types:
                msg = (_('Resource type %s not valid, must be ') %
                       resource_type)
                msg = utils.build_or_str(types, msg + '%s.')
                raise exception.InvalidInput(reason=msg)
            params['resource_type'] = resource_type

        resource_id = params.get('resource_id')
        if resource_id:
            if not uuidutils.is_uuid_like(resource_id):
                msg = (_('Resource ID must be a UUID, and %s is not.') %
                       resource_id)
                raise exception.InvalidInput(reason=msg)

            # If we have the resource type but we don't have where it is
            # located, we get it from the DB to limit the distribution of the
            # request by the scheduler, otherwise it will be distributed to
            # all the services.
            location_keys = {'service_id', 'cluster_name', 'host'}
            if not location_keys.intersection(params):
                workers = db.worker_get_all(ctxt, resource_id=resource_id,
                                            binary=params.get('binary'),
                                            resource_type=resource_type)
                if len(workers) == 0:
                    msg = (_('There is no resource with UUID %s pending '
                             'cleanup.') % resource_id)
                    raise exception.InvalidInput(reason=msg)
                if len(workers) > 1:
                    msg = (_('There are multiple resources with UUID %s '
                             'pending cleanup. Please be more specific.') %
                           resource_id)
                    raise exception.InvalidInput(reason=msg)

                worker = workers[0]
                params.update(service_id=worker.service_id,
                              resource_type=worker.resource_type)
        return params

    @wsgi.Controller.api_version('3.24')
    @wsgi.response(202)
    def cleanup(self, req, body=None):
        """Do the cleanup on resources from a specific service/host/node."""
        # Let the wsgi middleware convert NotAuthorized exceptions
        ctxt = self.policy_checker(req, 'cleanup')
        body = body or {}

        params = self._prepare_params(ctxt, body, self.allowed_clean_keys)
        params['until'] = timeutils.utcnow()

        # NOTE(geguileo): If is_up is not specified in the request
        # CleanupRequest's default will be used (False)
        cleanup_request = objects.CleanupRequest(**params)
        cleaning, unavailable = self.sch_api.work_cleanup(ctxt,
                                                          cleanup_request)
        return {
            'cleaning': workers_view.ViewBuilder.service_list(cleaning),
            'unavailable': workers_view.ViewBuilder.service_list(unavailable),
        }


def create_resource():
    return wsgi.Resource(WorkerController())


@@ -239,6 +239,10 @@ class ServiceUnavailable(Invalid):
message = _("Service is unavailable at this time.")
class UnavailableDuringUpgrade(Invalid):
message = _('Cannot perform %(action)s during system upgrade.')
class ImageUnacceptable(Invalid):
message = _("Image %(image_id)s is unacceptable: %(reason)s")


@@ -26,14 +26,28 @@ from cinder.volume import rpcapi as vol_rpcapi
class CinderCleanableObject(base.CinderPersistentObject):
"""Base class for cleanable OVO resources."""
"""Base class for cleanable OVO resources.
All cleanable objects must have a host property/attribute.
"""
worker = None
cleanable_resource_types = set()
@classmethod
def get_rpc_api(cls):
# By default assume all resources are handled by c-vol services
return vol_rpcapi.VolumeAPI
@classmethod
def cinder_ovo_cls_init(cls):
"""Called on OVO registration, sets set of cleanable resources."""
# First call persistent object method to store the DB model
super(CinderCleanableObject, cls).cinder_ovo_cls_init()
# Add this class to the set of resources
cls.cleanable_resource_types.add(cls.obj_name())
@classmethod
def get_pinned_version(cls):
# We pin the version by the last service that gets updated, which is


@@ -263,6 +263,11 @@ class Snapshot(cleanable.CinderCleanableObject, base.CinderObject,
return False
return status == 'creating'
@property
def host(self):
"""All cleanable VO must have a host property/attribute."""
return self.volume.host
@base.CinderObjectRegistry.register
class SnapshotList(base.ObjectListBase, base.CinderObject):


@@ -19,6 +19,7 @@
Scheduler Service
"""
import collections
from datetime import datetime
import eventlet
@@ -28,13 +29,14 @@ import oslo_messaging as messaging
from oslo_utils import excutils
from oslo_utils import importutils
from oslo_utils import timeutils
from oslo_utils import versionutils
import six
from cinder import context
from cinder import db
from cinder import exception
from cinder import flow_utils
from cinder.i18n import _, _LE
from cinder.i18n import _, _LE, _LI
from cinder import manager
from cinder import objects
from cinder import quota
@@ -71,6 +73,10 @@ class SchedulerManager(manager.CleanableManager, manager.Manager):
self.driver = importutils.import_object(scheduler_driver)
super(SchedulerManager, self).__init__(*args, **kwargs)
self._startup_delay = True
self.volume_api = volume_rpcapi.VolumeAPI()
self.sch_api = scheduler_rpcapi.SchedulerAPI()
self.rpc_api_version = versionutils.convert_version_to_int(
self.RPC_API_VERSION)
def init_host_with_rpc(self):
ctxt = context.get_admin_context()
@@ -81,6 +87,8 @@ class SchedulerManager(manager.CleanableManager, manager.Manager):
def reset(self):
super(SchedulerManager, self).reset()
self.volume_api = volume_rpcapi.VolumeAPI()
self.sch_api = scheduler_rpcapi.SchedulerAPI()
self.driver.reset()
def update_service_capabilities(self, context, service_name=None,
@@ -373,3 +381,99 @@ class SchedulerManager(manager.CleanableManager, manager.Manager):
rpc.get_notifier("scheduler").error(context,
'scheduler.' + method,
payload)
@property
def upgrading_cloud(self):
min_version_str = self.sch_api.determine_rpc_version_cap()
min_version = versionutils.convert_version_to_int(min_version_str)
return min_version < self.rpc_api_version
def _cleanup_destination(self, clusters, service):
"""Determines the RPC method, destination service and name.
The name is only used for logging, and it is the topic queue.
"""
# For the scheduler we don't have a specific destination, as any
# scheduler will do and we know we are up, since we are running this
# code.
if service.binary == 'cinder-scheduler':
cleanup_rpc = self.sch_api.do_cleanup
dest = None
dest_name = service.host
else:
cleanup_rpc = self.volume_api.do_cleanup
# For clustered volume services we try to get info from the cache.
if service.is_clustered:
# Get cluster info from cache
dest = clusters[service.binary].get(service.cluster_name)
# Cache miss forces us to get the cluster from the DB via OVO
if not dest:
dest = service.cluster
clusters[service.binary][service.cluster_name] = dest
dest_name = dest.name
# Non clustered volume services
else:
dest = service
dest_name = service.host
return cleanup_rpc, dest, dest_name
def work_cleanup(self, context, cleanup_request):
"""Process request from API to do cleanup on services.
Here we retrieve from the DB which services we want to clean up based
on the request from the user.
Then we send individual cleanup requests to each of the services that
are up, and finally return a tuple with the services that we have sent a
cleanup request to and those that were not up, so we couldn't send it.
"""
if self.upgrading_cloud:
raise exception.UnavailableDuringUpgrade(action='workers cleanup')
LOG.info(_LI('Workers cleanup request started.'))
filters = dict(service_id=cleanup_request.service_id,
cluster_name=cleanup_request.cluster_name,
host=cleanup_request.host,
binary=cleanup_request.binary,
is_up=cleanup_request.is_up,
disabled=cleanup_request.disabled)
# Get the list of all the services that match the request
services = objects.ServiceList.get_all(context, filters)
until = cleanup_request.until or timeutils.utcnow()
requested = []
not_requested = []
# To reduce DB queries we'll cache the clusters data
clusters = collections.defaultdict(dict)
for service in services:
cleanup_request.cluster_name = service.cluster_name
cleanup_request.service_id = service.id
cleanup_request.host = service.host
cleanup_request.binary = service.binary
cleanup_request.until = until
cleanup_rpc, dest, dest_name = self._cleanup_destination(clusters,
service)
# If it's a scheduler or the service is up, send the request.
if not dest or dest.is_up:
LOG.info(_LI('Sending cleanup for %(binary)s %(dest_name)s.'),
{'binary': service.binary,
'dest_name': dest_name})
cleanup_rpc(context, cleanup_request)
requested.append(service)
# We don't send cleanup requests when there are no services alive
# to do the cleanup.
else:
LOG.info(_LI('No service available to cleanup %(binary)s '
'%(dest_name)s.'),
{'binary': service.binary,
'dest_name': dest_name})
not_requested.append(service)
LOG.info(_LI('Cleanup requests completed.'))
return requested, not_requested


@@ -66,9 +66,10 @@ class SchedulerAPI(rpc.RPCAPI):
3.3 - Add cluster support to migrate_volume, and to
update_service_capabilities and send the timestamp from the
capabilities.
3.4 - Adds work_cleanup and do_cleanup methods.
"""
RPC_API_VERSION = '3.3'
RPC_API_VERSION = '3.4'
RPC_DEFAULT_VERSION = '3.0'
TOPIC = constants.SCHEDULER_TOPIC
BINARY = 'cinder-scheduler'
@@ -199,3 +200,28 @@
cctxt.cast(ctxt, 'notify_service_capabilities',
service_name=service_name, host=host,
capabilities=capabilities)
def work_cleanup(self, ctxt, cleanup_request):
"""Generate individual service cleanup requests from user request."""
if not self.client.can_send_version('3.4'):
msg = _('One of the cinder-scheduler services is too old to accept '
'such a request. Are you running mixed Newton-Ocata '
'cinder-schedulers?')
raise exception.ServiceTooOld(msg)
cctxt = self.client.prepare(version='3.4')
# Response will have services that are receiving the cleanup request
# and services that couldn't receive it since they are down.
return cctxt.call(ctxt, 'work_cleanup',
cleanup_request=cleanup_request)
def do_cleanup(self, ctxt, cleanup_request):
"""Perform this scheduler's resource cleanup as per cleanup_request."""
if not self.client.can_send_version('3.4'):
msg = _('One of the cinder-scheduler services is too old to accept '
'such a request. Are you running mixed Newton-Ocata '
'cinder-schedulers?')
raise exception.ServiceTooOld(msg)
cctxt = self.client.prepare(version='3.4')
cctxt.cast(ctxt, 'do_cleanup', cleanup_request=cleanup_request)


@@ -0,0 +1,158 @@
# Copyright (c) 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
from oslo_serialization import jsonutils
import webob
from cinder.api.v3 import router as router_v3
from cinder.api.v3 import workers
from cinder import context
from cinder import objects
from cinder import test
from cinder.tests.unit.api import fakes
from cinder.tests.unit import fake_constants as fake
SERVICES = (
[objects.Service(id=1, host='host1', binary='cinder-volume',
cluster_name='mycluster'),
objects.Service(id=2, host='host2', binary='cinder-volume',
cluster_name='mycluster')],
[objects.Service(id=3, host='host3', binary='cinder-volume',
cluster_name='mycluster'),
objects.Service(id=4, host='host4', binary='cinder-volume',
cluster_name='mycluster')],
)
def app():
# no auth, just let environ['cinder.context'] pass through
api = router_v3.APIRouter()
mapper = fakes.urlmap.URLMap()
mapper['/v3'] = api
return mapper
@ddt.ddt
class WorkersTestCase(test.TestCase):
"""Tes Case for the cleanup of Workers entries."""
def setUp(self):
super(WorkersTestCase, self).setUp()
self.context = context.RequestContext(user_id=None,
project_id=fake.PROJECT_ID,
is_admin=True,
read_deleted='no',
overwrite=False)
self.controller = workers.create_resource()
def _get_resp_post(self, body, version='3.24', ctxt=None):
"""Helper to execute a POST workers API call."""
req = webob.Request.blank('/v3/%s/workers/cleanup' % fake.PROJECT_ID)
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.headers['OpenStack-API-Version'] = 'volume ' + version
req.environ['cinder.context'] = ctxt or self.context
req.body = jsonutils.dump_as_bytes(body)
res = req.get_response(app())
return res
@mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.work_cleanup')
def test_cleanup_old_api_version(self, rpc_mock):
res = self._get_resp_post({}, '3.19')
self.assertEqual(404, res.status_code)
rpc_mock.assert_not_called()
@mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.work_cleanup')
def test_cleanup_not_authorized(self, rpc_mock):
ctxt = context.RequestContext(user_id=None,
project_id=fake.PROJECT_ID,
is_admin=False,
read_deleted='no',
overwrite=False)
res = self._get_resp_post({}, ctxt=ctxt)
self.assertEqual(403, res.status_code)
rpc_mock.assert_not_called()
@ddt.data({'fake_key': 'value'}, {'binary': 'nova-scheduler'},
{'disabled': 'sure'}, {'is_up': 'nop'},
{'resource_type': 'service'}, {'resource_id': 'non UUID'})
@mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.work_cleanup')
def test_cleanup_wrong_param(self, body, rpc_mock):
res = self._get_resp_post(body)
self.assertEqual(400, res.status_code)
if 'disabled' in body or 'is_up' in body:
expected = 'is not a boolean'
else:
expected = 'Invalid input'
self.assertIn(expected, res.json['badRequest']['message'])
rpc_mock.assert_not_called()
def _expected_services(self, cleaning, unavailable):
def service_view(service):
return {'id': service.id, 'host': service.host,
'binary': service.binary,
'cluster_name': service.cluster_name}
return {'cleaning': [service_view(s) for s in cleaning],
'unavailable': [service_view(s) for s in unavailable]}
@ddt.data({'service_id': 10}, {'cluster_name': 'cluster_name'},
{'host': 'hostname'}, {'binary': 'cinder-volume'},
{'binary': 'cinder-scheduler'}, {'disabled': 'true'},
{'is_up': 'no'}, {'resource_type': 'Volume'},
{'resource_id': fake.VOLUME_ID, 'host': 'hostname'})
@mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.work_cleanup',
return_value=SERVICES)
def test_cleanup_params(self, body, rpc_mock):
res = self._get_resp_post(body)
self.assertEqual(202, res.status_code)
rpc_mock.assert_called_once_with(self.context, mock.ANY)
cleanup_request = rpc_mock.call_args[0][1]
for key, value in body.items():
if key in ('disabled', 'is_up'):
value = value == 'true'
self.assertEqual(value, getattr(cleanup_request, key))
self.assertEqual(self._expected_services(*SERVICES), res.json)
@mock.patch('cinder.db.worker_get_all',
return_value=[mock.Mock(service_id=1, resource_type='Volume')])
@mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.work_cleanup',
return_value=SERVICES)
def test_cleanup_missing_location_ok(self, rpc_mock, worker_mock):
res = self._get_resp_post({'resource_id': fake.VOLUME_ID})
self.assertEqual(202, res.status_code)
rpc_mock.assert_called_once_with(self.context, mock.ANY)
cleanup_request = rpc_mock.call_args[0][1]
self.assertEqual(fake.VOLUME_ID, cleanup_request.resource_id)
self.assertEqual(1, cleanup_request.service_id)
self.assertEqual('Volume', cleanup_request.resource_type)
self.assertEqual(self._expected_services(*SERVICES), res.json)
@mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.work_cleanup')
def test_cleanup_missing_location_fail_none(self, rpc_mock):
res = self._get_resp_post({'resource_id': fake.VOLUME_ID})
self.assertEqual(400, res.status_code)
self.assertIn('Invalid input', res.json['badRequest']['message'])
rpc_mock.assert_not_called()
@mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.work_cleanup',
return_value=[1, 2])
def test_cleanup_missing_location_fail_multiple(self, rpc_mock):
res = self._get_resp_post({'resource_id': fake.VOLUME_ID})
self.assertEqual(400, res.status_code)
self.assertIn('Invalid input', res.json['badRequest']['message'])
rpc_mock.assert_not_called()


@@ -143,5 +143,7 @@
"clusters:get": "rule:admin_api",
"clusters:get_all": "rule:admin_api",
"clusters:update": "rule:admin_api"
"clusters:update": "rule:admin_api",
"workers:cleanup": "rule:admin_api"
}


@@ -22,6 +22,7 @@ import mock
from cinder import context
from cinder import exception
from cinder import objects
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import test
from cinder.tests.unit import fake_constants
@@ -227,3 +228,36 @@ class SchedulerRpcAPITestCase(test.TestCase):
filter_properties_list=
['fake_filter_properties_list'],
version='3.0')
@ddt.data(('work_cleanup', 'myhost', None),
('work_cleanup', 'myhost', 'mycluster'),
('do_cleanup', 'myhost', None),
('do_cleanup', 'myhost', 'mycluster'))
@ddt.unpack
@mock.patch('cinder.rpc.get_client')
def test_cleanup(self, method, host, cluster, get_client):
cleanup_request = objects.CleanupRequest(self.context,
host=host,
cluster_name=cluster)
rpcapi = scheduler_rpcapi.SchedulerAPI()
getattr(rpcapi, method)(self.context, cleanup_request)
prepare = get_client.return_value.prepare
prepare.assert_called_once_with(
version='3.4')
rpc_call = 'cast' if method == 'do_cleanup' else 'call'
getattr(prepare.return_value, rpc_call).assert_called_once_with(
self.context, method, cleanup_request=cleanup_request)
@ddt.data('do_cleanup', 'work_cleanup')
def test_cleanup_too_old(self, method):
cleanup_request = objects.CleanupRequest(self.context)
rpcapi = scheduler_rpcapi.SchedulerAPI()
with mock.patch.object(rpcapi.client, 'can_send_version',
return_value=False) as can_send_mock:
self.assertRaises(exception.ServiceTooOld,
getattr(rpcapi, method),
self.context,
cleanup_request)
can_send_mock.assert_called_once_with('3.4')


@@ -17,6 +17,8 @@
Tests For Scheduler
"""
import collections
import mock
from oslo_config import cfg
@@ -75,7 +77,8 @@ class SchedulerManagerTestCase(test.TestCase):
@mock.patch('cinder.objects.service.Service.get_minimum_rpc_version')
@mock.patch('cinder.objects.service.Service.get_minimum_obj_version')
@mock.patch('cinder.rpc.LAST_RPC_VERSIONS', {'cinder-volume': '1.3'})
@mock.patch('cinder.rpc.LAST_OBJ_VERSIONS', {'cinder-volume': '1.4'})
@mock.patch('cinder.rpc.LAST_OBJ_VERSIONS', {'cinder-volume': '1.4',
'cinder-scheduler': '1.4'})
def test_reset(self, get_min_obj, get_min_rpc):
mgr = self.manager_cls()
@@ -347,6 +350,98 @@
vol.refresh()
self.assertEqual('error', vol.status)
@mock.patch('cinder.scheduler.rpcapi.SchedulerAPI'
'.determine_rpc_version_cap', mock.Mock(return_value='2.0'))
def test_upgrading_cloud(self):
self.assertTrue(self.manager.upgrading_cloud)
@mock.patch('cinder.scheduler.rpcapi.SchedulerAPI'
'.determine_rpc_version_cap')
def test_upgrading_cloud_not(self, cap_mock):
cap_mock.return_value = self.manager.RPC_API_VERSION
self.assertFalse(self.manager.upgrading_cloud)
def test_cleanup_destination_scheduler(self):
service = objects.Service(id=1, host='hostname',
binary='cinder-scheduler')
result = self.manager._cleanup_destination(None, service)
expected = self.manager.sch_api.do_cleanup, None, service.host
self.assertEqual(expected, result)
def test_cleanup_destination_volume(self):
service = objects.Service(id=1, host='hostname', cluster_name=None,
binary='cinder-volume')
result = self.manager._cleanup_destination(None, service)
expected = self.manager.volume_api.do_cleanup, service, service.host
self.assertEqual(expected, result)
def test_cleanup_destination_volume_cluster_cache_hit(self):
cluster = objects.Cluster(id=1, name='mycluster',
binary='cinder-volume')
service = objects.Service(id=2, host='hostname',
cluster_name=cluster.name,
binary='cinder-volume')
cluster_cache = {'cinder-volume': {'mycluster': cluster}}
result = self.manager._cleanup_destination(cluster_cache, service)
expected = self.manager.volume_api.do_cleanup, cluster, cluster.name
self.assertEqual(expected, result)
@mock.patch('cinder.objects.Cluster.get_by_id')
def test_cleanup_destination_volume_cluster_cache_miss(self, get_mock):
cluster = objects.Cluster(id=1, name='mycluster',
binary='cinder-volume')
service = objects.Service(self.context,
id=2, host='hostname',
cluster_name=cluster.name,
binary='cinder-volume')
get_mock.return_value = cluster
cluster_cache = collections.defaultdict(dict)
result = self.manager._cleanup_destination(cluster_cache, service)
expected = self.manager.volume_api.do_cleanup, cluster, cluster.name
self.assertEqual(expected, result)
@mock.patch('cinder.scheduler.manager.SchedulerManager.upgrading_cloud')
def test_work_cleanup_upgrading(self, upgrading_mock):
cleanup_request = objects.CleanupRequest(host='myhost')
upgrading_mock.return_value = True
self.assertRaises(exception.UnavailableDuringUpgrade,
self.manager.work_cleanup,
self.context,
cleanup_request)
@mock.patch('cinder.objects.Cluster.is_up', True)
@mock.patch('cinder.objects.Service.is_up', False)
@mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.do_cleanup')
@mock.patch('cinder.volume.rpcapi.VolumeAPI.do_cleanup')
@mock.patch('cinder.objects.ServiceList.get_all')
def test_work_cleanup(self, get_mock, vol_clean_mock, sch_clean_mock):
args = dict(service_id=1, cluster_name='cluster_name', host='host',
binary='cinder-volume', is_up=False, disabled=True,
resource_id=fake.VOLUME_ID, resource_type='Volume')
cluster = objects.Cluster(id=1, name=args['cluster_name'],
binary='cinder-volume')
services = [objects.Service(self.context,
id=2, host='hostname',
cluster_name=cluster.name,
binary='cinder-volume',
cluster=cluster),
objects.Service(self.context,
id=3, host='hostname',
cluster_name=None,
binary='cinder-scheduler'),
objects.Service(self.context,
id=4, host='hostname',
cluster_name=None,
binary='cinder-volume')]
get_mock.return_value = services
cleanup_request = objects.CleanupRequest(self.context, **args)
res = self.manager.work_cleanup(self.context, cleanup_request)
self.assertEqual((services[:2], services[2:]), res)
self.assertEqual(1, vol_clean_mock.call_count)
self.assertEqual(1, sch_clean_mock.call_count)
class SchedulerTestCase(test.TestCase):
"""Test case for base scheduler driver class."""


@@ -25,6 +25,7 @@ from oslo_serialization import jsonutils
from cinder.common import constants
from cinder import context
from cinder import db
from cinder import exception
from cinder import objects
from cinder.objects import base as ovo_base
from cinder.objects import fields
@@ -731,3 +732,28 @@ class VolumeRpcAPITestCase(test.TestCase):
self._test_group_api('delete_group_snapshot', rpc_method='cast',
group_snapshot=self.fake_group_snapshot,
version='3.0')
@ddt.data(('myhost', None), ('myhost', 'mycluster'))
@ddt.unpack
@mock.patch('cinder.volume.rpcapi.VolumeAPI._get_cctxt')
def test_do_cleanup(self, host, cluster, get_cctxt_mock):
cleanup_request = objects.CleanupRequest(self.context,
host=host,
cluster_name=cluster)
rpcapi = volume_rpcapi.VolumeAPI()
rpcapi.do_cleanup(self.context, cleanup_request)
get_cctxt_mock.assert_called_once_with(
cleanup_request.service_topic_queue, '3.7')
get_cctxt_mock.return_value.cast.assert_called_once_with(
self.context, 'do_cleanup', cleanup_request=cleanup_request)
def test_do_cleanup_too_old(self):
cleanup_request = objects.CleanupRequest(self.context)
rpcapi = volume_rpcapi.VolumeAPI()
with mock.patch.object(rpcapi.client, 'can_send_version',
return_value=False) as can_send_mock:
self.assertRaises(exception.ServiceTooOld,
rpcapi.do_cleanup,
self.context,
cleanup_request)
can_send_mock.assert_called_once_with('3.7')


@@ -14,6 +14,8 @@
from cinder.common import constants
from cinder import exception
from cinder.i18n import _
from cinder import objects
from cinder import quota
from cinder import rpc
@@ -118,9 +120,11 @@ class VolumeAPI(rpc.RPCAPI):
3.5 - Adds support for cluster in retype and migrate_volume
3.6 - Switch to use oslo.messaging topics to indicate backends instead
of @backend suffixes in server names.
3.7 - Adds do_cleanup method to do volume cleanups from other nodes
that we were doing in init_host.
"""
RPC_API_VERSION = '3.6'
RPC_API_VERSION = '3.7'
RPC_DEFAULT_VERSION = '3.0'
TOPIC = constants.VOLUME_TOPIC
BINARY = 'cinder-volume'
@@ -390,3 +394,17 @@
cctxt = self._get_cctxt(group_snapshot.service_topic_queue)
cctxt.cast(ctxt, 'delete_group_snapshot',
group_snapshot=group_snapshot)
def do_cleanup(self, ctxt, cleanup_request):
"""Perform this service/cluster resource cleanup as requested."""
if not self.client.can_send_version('3.7'):
msg = _('One of the cinder-volume services is too old to accept such '
'a request. Are you running mixed Newton-Ocata services?')
raise exception.ServiceTooOld(msg)
destination = cleanup_request.service_topic_queue
cctxt = self._get_cctxt(destination, '3.7')
# NOTE(geguileo): This call goes to do_cleanup code in
# cinder.manager.CleanableManager unless in the future we overwrite it
# in cinder.volume.manager
cctxt.cast(ctxt, 'do_cleanup', cleanup_request=cleanup_request)


@@ -137,5 +137,7 @@
"clusters:get": "rule:admin_api",
"clusters:get_all": "rule:admin_api",
"clusters:update": "rule:admin_api"
"clusters:update": "rule:admin_api",
"workers:cleanup": "rule:admin_api"
}