diff --git a/api-ref/source/v3/parameters.yaml b/api-ref/source/v3/parameters.yaml index a15041a719c..74e1ac41f66 100644 --- a/api-ref/source/v3/parameters.yaml +++ b/api-ref/source/v3/parameters.yaml @@ -349,6 +349,13 @@ cgsnapshot_id: in: body required: false type: string +cluster_mutex: + description: | + The OpenStack Block Storage cluster where the resource resides. Optional + only if host field is provided. + in: body + required: false + type: string connector: description: | The ``connector`` object. @@ -638,6 +645,13 @@ host: in: body required: true type: string +host_mutex: + description: | + The OpenStack Block Storage host where the existing resource resides. + Optional only if cluster field is provided. + in: body + required: false + type: string host_name: description: | The name of the attaching host. diff --git a/api-ref/source/v3/samples/volume-manage-request-cluster.json b/api-ref/source/v3/samples/volume-manage-request-cluster.json new file mode 100644 index 00000000000..5467c98d7bd --- /dev/null +++ b/api-ref/source/v3/samples/volume-manage-request-cluster.json @@ -0,0 +1,19 @@ +{ + "volume": { + "host": null, + "cluster": "cluster@backend", + "ref": { + "source-name": "existingLV", + "source-id": "1234" + }, + "name": "New Volume", + "availability_zone": "az2", + "description": "Volume imported from existingLV", + "volume_type": null, + "bootable": true, + "metadata": { + "key1": "value1", + "key2": "value2" + } + } +} diff --git a/api-ref/source/v3/volume-manage.inc b/api-ref/source/v3/volume-manage.inc index 9bd5cb7c622..d9d33b2bb86 100644 --- a/api-ref/source/v3/volume-manage.inc +++ b/api-ref/source/v3/volume-manage.inc @@ -24,6 +24,10 @@ or source-name element, if possible. The API chooses the size of the volume by rounding up the size of the existing storage volume to the next gibibyte (GiB). +Prior to microversion 3.16 the host field was required. Now that it is +possible to specify a cluster instead, host is no longer required, but exactly +one of the host and cluster fields must be provided; they cannot both be set. + Error response codes:202, @@ -38,7 +42,8 @@ Request - volume_type: volume_type - name: name - volume: volume - - host: host + - host: host_mutex + - cluster: cluster_mutex - ref: ref - metadata: metadata - tenant_id: tenant_id @@ -48,3 +53,6 @@ Request Example .. literalinclude:: ./samples/volume-manage-request.json :language: javascript + +.. 
literalinclude:: ./samples/volume-manage-request-cluster.json + :language: javascript diff --git a/cinder/api/common.py b/cinder/api/common.py index c39770cf996..09e2e8ed2f3 100644 --- a/cinder/api/common.py +++ b/cinder/api/common.py @@ -370,3 +370,17 @@ class ViewBuilder(object): url_parts[2] = prefix_parts[2] + url_parts[2] return urllib.parse.urlunsplit(url_parts).rstrip('/') + + +def get_cluster_host(req, params, cluster_version): + if req.api_version_request.matches(cluster_version): + cluster_name = params.get('cluster') + msg = _('One and only one of cluster and host must be set.') + else: + cluster_name = None + msg = _('Host field is missing.') + + host = params.get('host') + if bool(cluster_name) == bool(host): + raise exception.InvalidInput(reason=msg) + return cluster_name, host diff --git a/cinder/api/contrib/admin_actions.py b/cinder/api/contrib/admin_actions.py index b673e4881e8..4893e0bd7c5 100644 --- a/cinder/api/contrib/admin_actions.py +++ b/cinder/api/contrib/admin_actions.py @@ -17,6 +17,7 @@ import oslo_messaging as messaging import webob from webob import exc +from cinder.api import common from cinder.api import extensions from cinder.api.openstack import wsgi from cinder import backup @@ -241,14 +242,12 @@ class VolumeAdminController(AdminController): # Not found exception will be handled at the wsgi level volume = self._get(context, id) params = body['os-migrate_volume'] - try: - host = params['host'] - except KeyError: - raise exc.HTTPBadRequest(explanation=_("Must specify 'host'.")) + + cluster_name, host = common.get_cluster_host(req, params, '3.16') force_host_copy = utils.get_bool_param('force_host_copy', params) lock_volume = utils.get_bool_param('lock_volume', params) - self.volume_api.migrate_volume(context, volume, host, force_host_copy, - lock_volume) + self.volume_api.migrate_volume(context, volume, host, cluster_name, + force_host_copy, lock_volume) return webob.Response(status_int=202) @wsgi.action('os-migrate_volume_completion') diff --git a/cinder/api/contrib/volume_manage.py b/cinder/api/contrib/volume_manage.py index ba7cc1cdf31..a6a735b9ec0 100644 --- a/cinder/api/contrib/volume_manage.py +++ b/cinder/api/contrib/volume_manage.py @@ -13,8 +13,8 @@ # under the License. from oslo_log import log as logging -from webob import exc +from cinder.api import common from cinder.api.contrib import resource_common_manage from cinder.api import extensions from cinder.api.openstack import wsgi @@ -64,6 +64,7 @@ class VolumeManageController(wsgi.Controller): 'volume': { 'host': , + 'cluster': , 'ref': , } } @@ -106,13 +107,10 @@ class VolumeManageController(wsgi.Controller): # Check that the required keys are present, return an error if they # are not. 
- required_keys = set(['ref', 'host']) - missing_keys = list(required_keys - set(volume.keys())) + if 'ref' not in volume: + raise exception.MissingRequired(element='ref') - if missing_keys: - msg = _("The following elements are required: %s") % \ - ', '.join(missing_keys) - raise exc.HTTPBadRequest(explanation=msg) + cluster_name, host = common.get_cluster_host(req, volume, '3.16') LOG.debug('Manage volume request body: %s', body) @@ -139,7 +137,8 @@ class VolumeManageController(wsgi.Controller): try: new_volume = self.volume_api.manage_existing(context, - volume['host'], + host, + cluster_name, volume['ref'], **kwargs) except exception.ServiceNotFound: diff --git a/cinder/api/openstack/api_version_request.py b/cinder/api/openstack/api_version_request.py index 70ceb4ba10f..6f118231882 100644 --- a/cinder/api/openstack/api_version_request.py +++ b/cinder/api/openstack/api_version_request.py @@ -64,6 +64,7 @@ REST_API_VERSION_HISTORY = """ * 3.14 - Add group snapshot and create group from src APIs. * 3.15 - Inject the response's `Etag` header to avoid the lost update problem with volume metadata. + * 3.16 - Migrate volume now supports cluster """ # The minimum and maximum versions of the API supported @@ -71,7 +72,7 @@ REST_API_VERSION_HISTORY = """ # minimum version of the API supported. # Explicitly using /v1 or /v2 enpoints will still work _MIN_API_VERSION = "3.0" -_MAX_API_VERSION = "3.15" +_MAX_API_VERSION = "3.16" _LEGACY_API_VERSION1 = "1.0" _LEGACY_API_VERSION2 = "2.0" diff --git a/cinder/api/openstack/rest_api_version_history.rst b/cinder/api/openstack/rest_api_version_history.rst index 1f5062885a9..ab0309901f4 100644 --- a/cinder/api/openstack/rest_api_version_history.rst +++ b/cinder/api/openstack/rest_api_version_history.rst @@ -191,3 +191,12 @@ user documentation. ------------------------ Added injecting the response's `Etag` header to avoid the lost update problem with volume metadata. + +3.16 +---- + os-migrate_volume now accepts ``cluster`` parameter when we want to migrate a + volume to a cluster. If we pass the ``host`` parameter for a volume that is + in a cluster, the request will be sent to the cluster as if we had requested + that specific cluster. Only ``host`` or ``cluster`` can be provided. + + Creating a managed volume also supports the cluster parameter. diff --git a/cinder/db/sqlalchemy/api.py b/cinder/db/sqlalchemy/api.py index 60a1f778d97..0abaa25c080 100644 --- a/cinder/db/sqlalchemy/api.py +++ b/cinder/db/sqlalchemy/api.py @@ -424,7 +424,7 @@ def _filter_host(field, value, match_level=None): def _service_query(context, session=None, read_deleted='no', host=None, cluster_name=None, is_up=None, backend_match_level=None, - **filters): + disabled=None, **filters): filters = _clean_filters(filters) if filters and not is_valid_model_filters(models.Service, filters): return None @@ -442,6 +442,22 @@ def _service_query(context, session=None, read_deleted='no', host=None, query = query.filter(_filter_host(models.Service.cluster_name, cluster_name, backend_match_level)) + # Now that we have clusters, a service is disabled if the service doesn't + # belong to a cluster or if it belongs to a cluster and the cluster itself + # is disabled. 
+ if disabled is not None: + disabled_filter = or_( + and_(models.Service.cluster_name.is_(None), + models.Service.disabled), + and_(models.Service.cluster_name.isnot(None), + sql.exists().where(and_( + models.Cluster.name == models.Service.cluster_name, + models.Cluster.binary == models.Service.binary, + ~models.Cluster.deleted, + models.Cluster.disabled)))) + if not disabled: + disabled_filter = ~disabled_filter + query = query.filter(disabled_filter) if filters: query = query.filter_by(**filters) @@ -5074,16 +5090,14 @@ def consistencygroup_create(context, values, cg_snap_id=None, cg_id=None): if conditions: # We don't want duplicated field values - values.pop('volume_type_id', None) - values.pop('availability_zone', None) - values.pop('host', None) + names = ['volume_type_id', 'availability_zone', 'host', + 'cluster_name'] + for name in names: + values.pop(name, None) - sel = session.query(cg_model.volume_type_id, - cg_model.availability_zone, - cg_model.host, - *(bindparam(k, v) for k, v in values.items()) - ).filter(*conditions) - names = ['volume_type_id', 'availability_zone', 'host'] + fields = [getattr(cg_model, name) for name in names] + fields.extend(bindparam(k, v) for k, v in values.items()) + sel = session.query(*fields).filter(*conditions) names.extend(values.keys()) insert_stmt = cg_model.__table__.insert().from_select(names, sel) result = session.execute(insert_stmt) diff --git a/cinder/manager.py b/cinder/manager.py index 0cd1e2a701b..7beb9f873eb 100644 --- a/cinder/manager.py +++ b/cinder/manager.py @@ -177,7 +177,8 @@ class SchedulerDependentManager(Manager): context, self.service_name, self.host, - self.last_capabilities) + self.last_capabilities, + self.cluster) try: self.scheduler_rpcapi.notify_service_capabilities( context, diff --git a/cinder/objects/base.py b/cinder/objects/base.py index e8784a7bc1a..60e223c9d4b 100644 --- a/cinder/objects/base.py +++ b/cinder/objects/base.py @@ -457,6 +457,10 @@ class ClusteredObject(object): def service_topic_queue(self): return self.cluster_name or self.host + @property + def is_clustered(self): + return bool(self.cluster_name) + class CinderObjectSerializer(base.VersionedObjectSerializer): OBJ_BASE_CLASS = CinderObject diff --git a/cinder/scheduler/driver.py b/cinder/scheduler/driver.py index 2fc3a8da587..57d759e7185 100644 --- a/cinder/scheduler/driver.py +++ b/cinder/scheduler/driver.py @@ -41,13 +41,14 @@ CONF = cfg.CONF CONF.register_opts(scheduler_driver_opts) -def volume_update_db(context, volume_id, host): - """Set the host and set the scheduled_at field of a volume. +def volume_update_db(context, volume_id, host, cluster_name): + """Set the host, cluster_name, and set the scheduled_at field of a volume. :returns: A Volume with the updated fields set properly. """ volume = objects.Volume.get_by_id(context, volume_id) volume.host = host + volume.cluster_name = cluster_name volume.scheduled_at = timeutils.utcnow() volume.save() @@ -56,22 +57,24 @@ def volume_update_db(context, volume_id, host): return volume -def group_update_db(context, group, host): +def group_update_db(context, group, host, cluster_name): """Set the host and the scheduled_at field of a consistencygroup. :returns: A Consistencygroup with the updated fields set properly. 
""" - group.update({'host': host, 'updated_at': timeutils.utcnow()}) + group.update({'host': host, 'updated_at': timeutils.utcnow(), + 'cluster_name': cluster_name}) group.save() return group -def generic_group_update_db(context, group, host): +def generic_group_update_db(context, group, host, cluster_name): """Set the host and the scheduled_at field of a group. :returns: A Group with the updated fields set properly. """ - group.update({'host': host, 'updated_at': timeutils.utcnow()}) + group.update({'host': host, 'updated_at': timeutils.utcnow(), + 'cluster_name': cluster_name}) group.save() return group @@ -97,11 +100,14 @@ class Scheduler(object): return self.host_manager.has_all_capabilities() - def update_service_capabilities(self, service_name, host, capabilities): + def update_service_capabilities(self, service_name, host, capabilities, + cluster_name, timestamp): """Process a capability update from a service node.""" self.host_manager.update_service_capabilities(service_name, host, - capabilities) + capabilities, + cluster_name, + timestamp) def notify_service_capabilities(self, service_name, host, capabilities): """Notify capability update from a service node.""" diff --git a/cinder/scheduler/filter_scheduler.py b/cinder/scheduler/filter_scheduler.py index 7c7e8af1830..5ead5919a8f 100644 --- a/cinder/scheduler/filter_scheduler.py +++ b/cinder/scheduler/filter_scheduler.py @@ -74,12 +74,11 @@ class FilterScheduler(driver.Scheduler): if not weighed_host: raise exception.NoValidHost(reason=_("No weighed hosts available")) - host = weighed_host.obj.host + backend = weighed_host.obj + updated_group = driver.group_update_db(context, group, backend.host, + backend.cluster_name) - updated_group = driver.group_update_db(context, group, host) - - self.volume_rpcapi.create_consistencygroup(context, - updated_group, host) + self.volume_rpcapi.create_consistencygroup(context, updated_group) def schedule_create_group(self, context, group, group_spec, @@ -96,12 +95,13 @@ class FilterScheduler(driver.Scheduler): if not weighed_host: raise exception.NoValidHost(reason=_("No weighed hosts available")) - host = weighed_host.obj.host + backend = weighed_host.obj - updated_group = driver.generic_group_update_db(context, group, host) + updated_group = driver.generic_group_update_db(context, group, + backend.host, + backend.cluster_name) - self.volume_rpcapi.create_group(context, - updated_group, host) + self.volume_rpcapi.create_group(context, updated_group) def schedule_create_volume(self, context, request_spec, filter_properties): weighed_host = self._schedule(context, request_spec, @@ -110,18 +110,20 @@ class FilterScheduler(driver.Scheduler): if not weighed_host: raise exception.NoValidHost(reason=_("No weighed hosts available")) - host = weighed_host.obj.host + backend = weighed_host.obj volume_id = request_spec['volume_id'] - updated_volume = driver.volume_update_db(context, volume_id, host) + updated_volume = driver.volume_update_db(context, volume_id, + backend.host, + backend.cluster_name) self._post_select_populate_filter_properties(filter_properties, - weighed_host.obj) + backend) # context is not serializable filter_properties.pop('context', None) - self.volume_rpcapi.create_volume(context, updated_volume, host, - request_spec, filter_properties, + self.volume_rpcapi.create_volume(context, updated_volume, request_spec, + filter_properties, allow_reschedule=True) def host_passes_filters(self, context, host, request_spec, @@ -131,7 +133,7 @@ class FilterScheduler(driver.Scheduler): 
filter_properties) for weighed_host in weighed_hosts: host_state = weighed_host.obj - if host_state.host == host: + if host_state.backend_id == host: return host_state volume_id = request_spec.get('volume_id', '??volume_id missing??') @@ -144,26 +146,27 @@ class FilterScheduler(driver.Scheduler): migration_policy='never'): """Find a host that can accept the volume with its new type.""" filter_properties = filter_properties or {} - current_host = request_spec['volume_properties']['host'] + backend = (request_spec['volume_properties'].get('cluster_name') + or request_spec['volume_properties']['host']) - # The volume already exists on this host, and so we shouldn't check if - # it can accept the volume again in the CapacityFilter. - filter_properties['vol_exists_on'] = current_host + # The volume already exists on this backend, and so we shouldn't check + # if it can accept the volume again in the CapacityFilter. + filter_properties['vol_exists_on'] = backend - weighed_hosts = self._get_weighted_candidates(context, request_spec, - filter_properties) - if not weighed_hosts: + weighed_backends = self._get_weighted_candidates(context, request_spec, + filter_properties) + if not weighed_backends: raise exception.NoValidHost(reason=_('No valid hosts for volume ' '%(id)s with type %(type)s') % {'id': request_spec['volume_id'], 'type': request_spec['volume_type']}) - for weighed_host in weighed_hosts: - host_state = weighed_host.obj - if host_state.host == current_host: - return host_state + for weighed_backend in weighed_backends: + backend_state = weighed_backend.obj + if backend_state.backend_id == backend: + return backend_state - if utils.extract_host(current_host, 'pool') is None: + if utils.extract_host(backend, 'pool') is None: # legacy volumes created before pool is introduced has no pool # info in host. But host_state.host always include pool level # info. In this case if above exact match didn't work out, we @@ -172,11 +175,12 @@ class FilterScheduler(driver.Scheduler): # cause migration between pools on same host, which we consider # it is different from migration between hosts thus allow that # to happen even migration policy is 'never'. - for weighed_host in weighed_hosts: - host_state = weighed_host.obj - backend = utils.extract_host(host_state.host, 'backend') - if backend == current_host: - return host_state + for weighed_backend in weighed_backends: + backend_state = weighed_backend.obj + new_backend = utils.extract_host(backend_state.backend_id, + 'backend') + if new_backend == backend: + return backend_state if migration_policy == 'never': raise exception.NoValidHost(reason=_('Current host not valid for ' @@ -186,7 +190,7 @@ class FilterScheduler(driver.Scheduler): {'id': request_spec['volume_id'], 'type': request_spec['volume_type']}) - top_host = self._choose_top_host(weighed_hosts, request_spec) + top_host = self._choose_top_host(weighed_backends, request_spec) return top_host.obj def get_pools(self, context, filters): @@ -201,7 +205,7 @@ class FilterScheduler(driver.Scheduler): been selected by the scheduling process. """ # Add a retry entry for the selected volume backend: - self._add_retry_host(filter_properties, host_state.host) + self._add_retry_host(filter_properties, host_state.backend_id) def _add_retry_host(self, filter_properties, host): """Add a retry entry for the selected volume backend. @@ -418,8 +422,8 @@ class FilterScheduler(driver.Scheduler): for host2 in temp_weighed_hosts: # Should schedule creation of CG on backend level, # not pool level. 
- if (utils.extract_host(host1.obj.host) == - utils.extract_host(host2.obj.host)): + if (utils.extract_host(host1.obj.backend_id) == + utils.extract_host(host2.obj.backend_id)): new_weighed_hosts.append(host1) weighed_hosts = new_weighed_hosts if not weighed_hosts: @@ -530,8 +534,8 @@ class FilterScheduler(driver.Scheduler): for host2 in host_list2: # Should schedule creation of group on backend level, # not pool level. - if (utils.extract_host(host1.obj.host) == - utils.extract_host(host2.obj.host)): + if (utils.extract_host(host1.obj.backend_id) == + utils.extract_host(host2.obj.backend_id)): new_hosts.append(host1) if not new_hosts: return [] @@ -615,7 +619,7 @@ class FilterScheduler(driver.Scheduler): # Get host name including host@backend#pool info from # weighed_hosts. for host in weighed_hosts[::-1]: - backend = utils.extract_host(host.obj.host) + backend = utils.extract_host(host.obj.backend_id) if backend != group_backend: weighed_hosts.remove(host) if not weighed_hosts: @@ -651,7 +655,7 @@ class FilterScheduler(driver.Scheduler): def _choose_top_host(self, weighed_hosts, request_spec): top_host = weighed_hosts[0] host_state = top_host.obj - LOG.debug("Choosing %s", host_state.host) + LOG.debug("Choosing %s", host_state.backend_id) volume_properties = request_spec['volume_properties'] host_state.consume_from_volume(volume_properties) return top_host @@ -659,11 +663,11 @@ class FilterScheduler(driver.Scheduler): def _choose_top_host_group(self, weighed_hosts, request_spec_list): top_host = weighed_hosts[0] host_state = top_host.obj - LOG.debug("Choosing %s", host_state.host) + LOG.debug("Choosing %s", host_state.backend_id) return top_host def _choose_top_host_generic_group(self, weighed_hosts): top_host = weighed_hosts[0] host_state = top_host.obj - LOG.debug("Choosing %s", host_state.host) + LOG.debug("Choosing %s", host_state.backend_id) return top_host diff --git a/cinder/scheduler/filters/affinity_filter.py b/cinder/scheduler/filters/affinity_filter.py index 7d2910a2df7..560167b88fd 100644 --- a/cinder/scheduler/filters/affinity_filter.py +++ b/cinder/scheduler/filters/affinity_filter.py @@ -24,6 +24,14 @@ class AffinityFilter(filters.BaseHostFilter): def __init__(self): self.volume_api = volume.API() + def _get_volumes(self, context, affinity_uuids, backend_state): + filters = {'id': affinity_uuids, 'deleted': False} + if backend_state.cluster_name: + filters['cluster_name'] = backend_state.cluster_name + else: + filters['host'] = backend_state.host + return self.volume_api.get_all(context, filters=filters) + class DifferentBackendFilter(AffinityFilter): """Schedule volume on a different back-end from a set of volumes.""" @@ -53,11 +61,8 @@ class DifferentBackendFilter(AffinityFilter): return False if affinity_uuids: - return not self.volume_api.get_all( - context, filters={'host': host_state.host, - 'id': affinity_uuids, - 'deleted': False}) - + return not self._get_volumes(context, affinity_uuids, + host_state) # With no different_host key return True @@ -90,10 +95,7 @@ class SameBackendFilter(AffinityFilter): return False if affinity_uuids: - return self.volume_api.get_all( - context, filters={'host': host_state.host, - 'id': affinity_uuids, - 'deleted': False}) + return self._get_volumes(context, affinity_uuids, host_state) # With no same_host key return True diff --git a/cinder/scheduler/filters/capacity_filter.py b/cinder/scheduler/filters/capacity_filter.py index 52e0211ef49..1959d36fbb5 100644 --- a/cinder/scheduler/filters/capacity_filter.py +++ 
b/cinder/scheduler/filters/capacity_filter.py @@ -36,25 +36,29 @@ class CapacityFilter(filters.BaseHostFilter): # If the volume already exists on this host, don't fail it for # insufficient capacity (e.g., if we are retyping) - if host_state.host == filter_properties.get('vol_exists_on'): + if host_state.backend_id == filter_properties.get('vol_exists_on'): return True spec = filter_properties.get('request_spec') if spec: volid = spec.get('volume_id') + grouping = 'cluster' if host_state.cluster_name else 'host' if filter_properties.get('new_size'): # If new_size is passed, we are allocating space to extend a volume requested_size = (int(filter_properties.get('new_size')) - int(filter_properties.get('size'))) - LOG.debug('Checking if host %(host)s can extend the volume %(id)s' - 'in %(size)s GB', {'host': host_state.host, 'id': volid, - 'size': requested_size}) + LOG.debug('Checking if %(grouping)s %(grouping_name)s can extend ' + 'the volume %(id)s in %(size)s GB', + {'grouping': grouping, + 'grouping_name': host_state.backend_id, 'id': volid, + 'size': requested_size}) else: requested_size = filter_properties.get('size') - LOG.debug('Checking if host %(host)s can create a %(size)s GB ' - 'volume (%(id)s)', - {'host': host_state.host, 'id': volid, + LOG.debug('Checking if %(grouping)s %(grouping_name)s can create ' + 'a %(size)s GB volume (%(id)s)', + {'grouping': grouping, + 'grouping_name': host_state.backend_id, 'id': volid, 'size': requested_size}) if host_state.free_capacity_gb is None: @@ -85,18 +89,16 @@ class CapacityFilter(filters.BaseHostFilter): total = float(total_space) if total <= 0: LOG.warning(_LW("Insufficient free space for volume creation. " - "Total capacity is %(total).2f on host %(host)s."), + "Total capacity is %(total).2f on %(grouping)s " + "%(grouping_name)s."), {"total": total, - "host": host_state.host}) + "grouping": grouping, + "grouping_name": host_state.backend_id}) return False # Calculate how much free space is left after taking into account # the reserved space. free = free_space - math.floor(total * reserved) - msg_args = {"host": host_state.host, - "requested": requested_size, - "available": free} - # NOTE(xyang): If 'provisioning:type' is 'thick' in extra_specs, # we will not use max_over_subscription_ratio and # provisioned_capacity_gb to determine whether a volume can be @@ -117,15 +119,18 @@ class CapacityFilter(filters.BaseHostFilter): provisioned_ratio = ((host_state.provisioned_capacity_gb + requested_size) / total) if provisioned_ratio > host_state.max_over_subscription_ratio: + msg_args = { + "provisioned_ratio": provisioned_ratio, + "oversub_ratio": host_state.max_over_subscription_ratio, + "grouping": grouping, + "grouping_name": host_state.backend_id, + } LOG.warning(_LW( "Insufficient free space for thin provisioning. 
" "The ratio of provisioned capacity over total capacity " "%(provisioned_ratio).2f has exceeded the maximum over " - "subscription ratio %(oversub_ratio).2f on host " - "%(host)s."), - {"provisioned_ratio": provisioned_ratio, - "oversub_ratio": host_state.max_over_subscription_ratio, - "host": host_state.host}) + "subscription ratio %(oversub_ratio).2f on %(grouping)s " + "%(grouping_name)s."), msg_args) return False else: # Thin provisioning is enabled and projected over-subscription @@ -138,23 +143,30 @@ class CapacityFilter(filters.BaseHostFilter): free * host_state.max_over_subscription_ratio) return adjusted_free_virtual >= requested_size elif thin and host_state.thin_provisioning_support: - LOG.warning(_LW("Filtering out host %(host)s with an invalid " - "maximum over subscription ratio of " - "%(oversub_ratio).2f. The ratio should be a " + LOG.warning(_LW("Filtering out %(grouping)s %(grouping_name)s " + "with an invalid maximum over subscription ratio " + "of %(oversub_ratio).2f. The ratio should be a " "minimum of 1.0."), {"oversub_ratio": host_state.max_over_subscription_ratio, - "host": host_state.host}) + "grouping": grouping, + "grouping_name": host_state.backend_id}) return False + msg_args = {"grouping_name": host_state.backend_id, + "grouping": grouping, + "requested": requested_size, + "available": free} + if free < requested_size: LOG.warning(_LW("Insufficient free space for volume creation " - "on host %(host)s (requested / avail): " - "%(requested)s/%(available)s"), msg_args) + "on %(grouping)s %(grouping_name)s (requested / " + "avail): %(requested)s/%(available)s"), + msg_args) return False LOG.debug("Space information for volume creation " - "on host %(host)s (requested / avail): " + "on %(grouping)s %(grouping_name)s (requested / avail): " "%(requested)s/%(available)s", msg_args) return True diff --git a/cinder/scheduler/filters/driver_filter.py b/cinder/scheduler/filters/driver_filter.py index b57532413e5..329b3c7ed75 100644 --- a/cinder/scheduler/filters/driver_filter.py +++ b/cinder/scheduler/filters/driver_filter.py @@ -35,10 +35,11 @@ class DriverFilter(filters.BaseHostFilter): """Determines whether a host has a passing filter_function or not.""" stats = self._generate_stats(host_state, filter_properties) - LOG.debug("Checking host '%s'", stats['host_stats']['host']) + LOG.debug("Checking backend '%s'", stats['host_stats']['backend_id']) result = self._check_filter_function(stats) LOG.debug("Result: %s", result) - LOG.debug("Done checking host '%s'", stats['host_stats']['host']) + LOG.debug("Done checking backend '%s'", + stats['host_stats']['backend_id']) return result @@ -89,6 +90,8 @@ class DriverFilter(filters.BaseHostFilter): host_stats = { 'host': host_state.host, + 'cluster_name': host_state.cluster_name, + 'backend_id': host_state.backend_id, 'volume_backend_name': host_state.volume_backend_name, 'vendor_name': host_state.vendor_name, 'driver_version': host_state.driver_version, diff --git a/cinder/scheduler/filters/ignore_attempted_hosts_filter.py b/cinder/scheduler/filters/ignore_attempted_hosts_filter.py index beffe16a4f8..f37241652f7 100644 --- a/cinder/scheduler/filters/ignore_attempted_hosts_filter.py +++ b/cinder/scheduler/filters/ignore_attempted_hosts_filter.py @@ -45,7 +45,7 @@ class IgnoreAttemptedHostsFilter(filters.BaseHostFilter): return True hosts = attempted.get('hosts', []) - host = host_state.host + host = host_state.backend_id passes = host not in hosts pass_msg = "passes" if passes else "fails" diff --git 
a/cinder/scheduler/filters/instance_locality_filter.py b/cinder/scheduler/filters/instance_locality_filter.py index 7ab7c513365..e2d562f34e9 100644 --- a/cinder/scheduler/filters/instance_locality_filter.py +++ b/cinder/scheduler/filters/instance_locality_filter.py @@ -69,9 +69,9 @@ class InstanceLocalityFilter(filters.BaseHostFilter): return self._nova_ext_srv_attr - def host_passes(self, host_state, filter_properties): + def host_passes(self, backend_state, filter_properties): context = filter_properties['context'] - host = volume_utils.extract_host(host_state.host, 'host') + host = volume_utils.extract_host(backend_state.backend_id, 'host') scheduler_hints = filter_properties.get('scheduler_hints') or {} instance_uuid = scheduler_hints.get(HINT_KEYWORD, None) diff --git a/cinder/scheduler/host_manager.py b/cinder/scheduler/host_manager.py index 6c3203af97d..345c8c836f1 100644 --- a/cinder/scheduler/host_manager.py +++ b/cinder/scheduler/host_manager.py @@ -86,10 +86,11 @@ class ReadOnlyDict(collections.Mapping): class HostState(object): """Mutable and immutable information tracked for a volume backend.""" - def __init__(self, host, capabilities=None, service=None): + def __init__(self, host, cluster_name, capabilities=None, service=None): self.capabilities = None self.service = None self.host = host + self.cluster_name = cluster_name self.update_capabilities(capabilities, service) self.volume_backend_name = None @@ -122,6 +123,10 @@ class HostState(object): self.updated = None + @property + def backend_id(self): + return self.cluster_name or self.host + def update_capabilities(self, capabilities=None, service=None): # Read-only capability dicts @@ -210,7 +215,8 @@ class HostState(object): cur_pool = self.pools.get(pool_name, None) if not cur_pool: # Add new pool - cur_pool = PoolState(self.host, pool_cap, pool_name) + cur_pool = PoolState(self.host, self.cluster_name, + pool_cap, pool_name) self.pools[pool_name] = cur_pool cur_pool.update_from_volume_capability(pool_cap, service) @@ -227,7 +233,8 @@ class HostState(object): if len(self.pools) == 0: # No pool was there - single_pool = PoolState(self.host, capability, pool_name) + single_pool = PoolState(self.host, self.cluster_name, + capability, pool_name) self._append_backend_info(capability) self.pools[pool_name] = single_pool else: @@ -235,7 +242,8 @@ class HostState(object): try: single_pool = self.pools[pool_name] except KeyError: - single_pool = PoolState(self.host, capability, pool_name) + single_pool = PoolState(self.host, self.cluster_name, + capability, pool_name) self._append_backend_info(capability) self.pools[pool_name] = single_pool @@ -293,14 +301,18 @@ class HostState(object): # FIXME(zhiteng) backend level free_capacity_gb isn't as # meaningful as it used to be before pool is introduced, we'd # come up with better representation of HostState. 
- return ("host '%s': free_capacity_gb: %s, pools: %s" % - (self.host, self.free_capacity_gb, self.pools)) + grouping = 'cluster' if self.cluster_name else 'host' + grouping_name = self.backend_id + + return ("%s '%s': free_capacity_gb: %s, pools: %s" % + (grouping, grouping_name, self.free_capacity_gb, self.pools)) class PoolState(HostState): - def __init__(self, host, capabilities, pool_name): + def __init__(self, host, cluster_name, capabilities, pool_name): new_host = vol_utils.append_host(host, pool_name) - super(PoolState, self).__init__(new_host, capabilities) + new_cluster = vol_utils.append_host(cluster_name, pool_name) + super(PoolState, self).__init__(new_host, new_cluster, capabilities) self.pool_name = pool_name # No pools in pool self.pools = None @@ -443,7 +455,8 @@ class HostManager(object): hosts, weight_properties) - def update_service_capabilities(self, service_name, host, capabilities): + def update_service_capabilities(self, service_name, host, capabilities, + cluster_name, timestamp): """Update the per-service capabilities based on this notification.""" if service_name != 'volume': LOG.debug('Ignoring %(service_name)s service update ' @@ -451,9 +464,12 @@ class HostManager(object): {'service_name': service_name, 'host': host}) return + # TODO(geguileo): In P - Remove the next line since we receive the + # timestamp + timestamp = timestamp or timeutils.utcnow() # Copy the capabilities, so we don't modify the original dict capab_copy = dict(capabilities) - capab_copy["timestamp"] = timeutils.utcnow() # Reported time + capab_copy["timestamp"] = timestamp # Set the default capabilities in case None is set. capab_old = self.service_states.get(host, {"timestamp": 0}) @@ -474,15 +490,19 @@ class HostManager(object): self.service_states[host] = capab_copy - LOG.debug("Received %(service_name)s service update from " - "%(host)s: %(cap)s", + cluster_msg = (('Cluster: %s - Host: ' % cluster_name) if cluster_name + else '') + LOG.debug("Received %(service_name)s service update from %(cluster)s" + "%(host)s: %(cap)s%(cluster)s", {'service_name': service_name, 'host': host, - 'cap': capabilities}) + 'cap': capabilities, + 'cluster': cluster_msg}) self._no_capabilities_hosts.discard(host) def notify_service_capabilities(self, service_name, host, capabilities): """Notify the ceilometer with updated volume stats""" + # TODO(geguileo): Make this work with Active/Active if service_name != 'volume': return @@ -519,6 +539,7 @@ class HostManager(object): volume_services = objects.ServiceList.get_all_by_topic(context, topic, disabled=False) + active_backends = set() active_hosts = set() no_capabilities_hosts = set() for service in volume_services.objects: @@ -526,32 +547,46 @@ class HostManager(object): if not service.is_up: LOG.warning(_LW("volume service is down. 
(host: %s)"), host) continue + capabilities = self.service_states.get(host, None) if capabilities is None: no_capabilities_hosts.add(host) continue - host_state = self.host_state_map.get(host) - if not host_state: - host_state = self.host_state_cls(host, - capabilities=capabilities, - service= - dict(service)) - self.host_state_map[host] = host_state - # update capabilities and attributes in host_state - host_state.update_from_volume_capability(capabilities, - service= - dict(service)) + # Since the service could have been added or remove from a cluster + backend_key = service.service_topic_queue + backend_state = self.host_state_map.get(backend_key, None) + if not backend_state: + backend_state = self.host_state_cls( + host, + service.cluster_name, + capabilities=capabilities, + service=dict(service)) + self.host_state_map[backend_key] = backend_state + + # We may be receiving capability reports out of order from + # different volume services in a cluster, so we drop older updates + # and only update for newer capability reports. + if (backend_state.capabilities['timestamp'] <= + capabilities['timestamp']): + # update capabilities and attributes in backend_state + backend_state.update_from_volume_capability( + capabilities, service=dict(service)) + active_backends.add(backend_key) active_hosts.add(host) self._no_capabilities_hosts = no_capabilities_hosts - # remove non-active hosts from host_state_map - nonactive_hosts = set(self.host_state_map.keys()) - active_hosts - for host in nonactive_hosts: - LOG.info(_LI("Removing non-active host: %(host)s from " - "scheduler cache."), {'host': host}) - del self.host_state_map[host] + # remove non-active keys from host_state_map + inactive_backend_keys = set(self.host_state_map) - active_backends + for backend_key in inactive_backend_keys: + # NOTE(geguileo): We don't want to log the removal of a host from + # the map when we are removing it because it has been added to a + # cluster. + if backend_key not in active_hosts: + LOG.info(_LI("Removing non-active backend: %(backend)s from " + "scheduler cache."), {'backend': backend_key}) + del self.host_state_map[backend_key] def get_all_host_states(self, context): """Returns a dict of all the hosts the HostManager knows about. 
diff --git a/cinder/scheduler/manager.py b/cinder/scheduler/manager.py index c2ab5dc1008..9b69582d098 100644 --- a/cinder/scheduler/manager.py +++ b/cinder/scheduler/manager.py @@ -19,12 +19,15 @@ Scheduler Service """ +from datetime import datetime + import eventlet from oslo_config import cfg from oslo_log import log as logging import oslo_messaging as messaging from oslo_utils import excutils from oslo_utils import importutils +from oslo_utils import timeutils import six from cinder import context @@ -33,6 +36,7 @@ from cinder import exception from cinder import flow_utils from cinder.i18n import _, _LE from cinder import manager +from cinder import objects from cinder import quota from cinder import rpc from cinder.scheduler.flows import create_volume @@ -53,7 +57,7 @@ QUOTAS = quota.QUOTAS LOG = logging.getLogger(__name__) -class SchedulerManager(manager.Manager): +class SchedulerManager(manager.CleanableManager, manager.Manager): """Chooses a host to create volumes.""" RPC_API_VERSION = scheduler_rpcapi.SchedulerAPI.RPC_API_VERSION @@ -80,13 +84,22 @@ class SchedulerManager(manager.Manager): self.driver.reset() def update_service_capabilities(self, context, service_name=None, - host=None, capabilities=None, **kwargs): + host=None, capabilities=None, + cluster_name=None, timestamp=None, + **kwargs): """Process a capability update from a service node.""" if capabilities is None: capabilities = {} + # If we received the timestamp we have to deserialize it + elif timestamp: + timestamp = datetime.strptime(timestamp, + timeutils.PERFECT_TIME_FORMAT) + self.driver.update_service_capabilities(service_name, host, - capabilities) + capabilities, + cluster_name, + timestamp) def notify_service_capabilities(self, context, service_name, host, capabilities): @@ -150,9 +163,9 @@ class SchedulerManager(manager.Manager): group.status = 'error' group.save() + @objects.Volume.set_workers def create_volume(self, context, volume, snapshot_id=None, image_id=None, request_spec=None, filter_properties=None): - self._wait_for_scheduler() try: @@ -171,13 +184,21 @@ class SchedulerManager(manager.Manager): with flow_utils.DynamicLogListener(flow_engine, logger=LOG): flow_engine.run() + def _do_cleanup(self, ctxt, vo_resource): + # We can only receive cleanup requests for volumes, but we check anyway + # We need to cleanup the volume status for cases where the scheduler + # died while scheduling the volume creation. 
+ if (isinstance(vo_resource, objects.Volume) and + vo_resource.status == 'creating'): + vo_resource.status = 'error' + vo_resource.save() + def request_service_capabilities(self, context): volume_rpcapi.VolumeAPI().publish_service_capabilities(context) - def migrate_volume_to_host(self, context, volume, host, force_host_copy, - request_spec, filter_properties=None): + def migrate_volume(self, context, volume, backend, force_copy, + request_spec, filter_properties): """Ensure that the host exists and can accept the volume.""" - self._wait_for_scheduler() def _migrate_volume_set_error(self, context, ex, request_spec): @@ -193,9 +214,9 @@ class SchedulerManager(manager.Manager): context, ex, request_spec) try: - tgt_host = self.driver.host_passes_filters(context, host, - request_spec, - filter_properties) + tgt_backend = self.driver.host_passes_filters(context, backend, + request_spec, + filter_properties) except exception.NoValidHost as ex: _migrate_volume_set_error(self, context, ex, request_spec) except Exception as ex: @@ -203,8 +224,14 @@ class SchedulerManager(manager.Manager): _migrate_volume_set_error(self, context, ex, request_spec) else: volume_rpcapi.VolumeAPI().migrate_volume(context, volume, - tgt_host, - force_host_copy) + tgt_backend, + force_copy) + + # FIXME(geguileo): Remove this in v4.0 of RPC API. + def migrate_volume_to_host(self, context, volume, host, force_host_copy, + request_spec, filter_properties=None): + return self.migrate_volume(context, volume, host, force_host_copy, + request_spec, filter_properties) def retype(self, context, volume, request_spec, filter_properties=None): """Schedule the modification of a volume's type. @@ -272,7 +299,7 @@ class SchedulerManager(manager.Manager): try: self.driver.host_passes_filters(context, - volume.host, + volume.service_topic_queue, request_spec, filter_properties) except exception.NoValidHost as ex: @@ -306,7 +333,8 @@ class SchedulerManager(manager.Manager): filter_properties['new_size'] = new_size try: - self.driver.host_passes_filters(context, volume.host, + self.driver.host_passes_filters(context, + volume.service_topic_queue, request_spec, filter_properties) volume_rpcapi.VolumeAPI().extend_volume(context, volume, new_size, reservations) diff --git a/cinder/scheduler/rpcapi.py b/cinder/scheduler/rpcapi.py index 40b8ee0fa1e..a5124a80b86 100644 --- a/cinder/scheduler/rpcapi.py +++ b/cinder/scheduler/rpcapi.py @@ -17,6 +17,7 @@ Client side of the scheduler manager RPC API. """ from oslo_serialization import jsonutils +from oslo_utils import timeutils from cinder.common import constants from cinder import exception @@ -62,9 +63,12 @@ class SchedulerAPI(rpc.RPCAPI): 3.0 - Remove 2.x compatibility 3.1 - Adds notify_service_capabilities() 3.2 - Adds extend_volume() + 3.3 - Add cluster support to migrate_volume, and to + update_service_capabilities and send the timestamp from the + capabilities. 
""" - RPC_API_VERSION = '3.2' + RPC_API_VERSION = '3.3' RPC_DEFAULT_VERSION = '3.0' TOPIC = constants.SCHEDULER_TOPIC BINARY = 'cinder-scheduler' @@ -106,15 +110,24 @@ class SchedulerAPI(rpc.RPCAPI): 'filter_properties': filter_properties, 'volume': volume} return cctxt.cast(ctxt, 'create_volume', **msg_args) - def migrate_volume_to_host(self, ctxt, volume, host, force_host_copy=False, - request_spec=None, filter_properties=None): - cctxt = self._get_cctxt() + def migrate_volume(self, ctxt, volume, backend, force_copy=False, + request_spec=None, filter_properties=None): request_spec_p = jsonutils.to_primitive(request_spec) - msg_args = {'host': host, 'force_host_copy': force_host_copy, - 'request_spec': request_spec_p, + msg_args = {'request_spec': request_spec_p, 'filter_properties': filter_properties, 'volume': volume} + version = '3.3' + if self.client.can_send_version(version): + msg_args['backend'] = backend + msg_args['force_copy'] = force_copy + method = 'migrate_volume' + else: + version = '3.0' + msg_args['host'] = backend + msg_args['force_host_copy'] = force_copy + method = 'migrate_volume_to_host' - return cctxt.cast(ctxt, 'migrate_volume_to_host', **msg_args) + cctxt = self._get_cctxt(version=version) + return cctxt.cast(ctxt, method, **msg_args) def retype(self, ctxt, volume, request_spec=None, filter_properties=None): cctxt = self._get_cctxt() @@ -157,14 +170,27 @@ class SchedulerAPI(rpc.RPCAPI): return cctxt.call(ctxt, 'get_pools', filters=filters) def update_service_capabilities(self, ctxt, service_name, host, - capabilities): - cctxt = self._get_cctxt(fanout=True) - cctxt.cast(ctxt, 'update_service_capabilities', - service_name=service_name, host=host, - capabilities=capabilities) + capabilities, cluster_name, + timestamp=None): + msg_args = dict(service_name=service_name, host=host, + capabilities=capabilities) + + version = '3.3' + # If server accepts timestamping the capabilities and the cluster name + if self.client.can_send_version(version): + # Serialize the timestamp + timestamp = timestamp or timeutils.utcnow() + msg_args.update(cluster_name=cluster_name, + timestamp=jsonutils.to_primitive(timestamp)) + else: + version = '3.0' + + cctxt = self._get_cctxt(fanout=True, version=version) + cctxt.cast(ctxt, 'update_service_capabilities', **msg_args) def notify_service_capabilities(self, ctxt, service_name, host, capabilities): + # TODO(geguileo): Make this work with Active/Active cctxt = self._get_cctxt(version='3.1') if not cctxt.can_send_version('3.1'): msg = _('notify_service_capabilities requires cinder-scheduler ' diff --git a/cinder/tests/unit/api/contrib/test_admin_actions.py b/cinder/tests/unit/api/contrib/test_admin_actions.py index 311fb09d1d0..47e678806da 100644 --- a/cinder/tests/unit/api/contrib/test_admin_actions.py +++ b/cinder/tests/unit/api/contrib/test_admin_actions.py @@ -10,6 +10,7 @@ # License for the specific language governing permissions and limitations # under the License. 
+import ddt import fixtures import mock from oslo_concurrency import lockutils @@ -21,6 +22,7 @@ import webob from webob import exc from cinder.api.contrib import admin_actions +from cinder.api.openstack import api_version_request as api_version from cinder.backup import api as backup_api from cinder.backup import rpcapi as backup_rpcapi from cinder.common import constants @@ -70,6 +72,7 @@ class BaseAdminTest(test.TestCase): return volume +@ddt.ddt class AdminActionsTest(BaseAdminTest): def setUp(self): super(AdminActionsTest, self).setUp() @@ -101,6 +104,7 @@ class AdminActionsTest(BaseAdminTest): self.patch('cinder.objects.Service.get_minimum_rpc_version', side_effect=_get_minimum_rpc_version_mock) + self.controller = admin_actions.VolumeAdminController() def tearDown(self): self.svc.stop() @@ -138,7 +142,7 @@ class AdminActionsTest(BaseAdminTest): updated_status) def test_valid_updates(self): - vac = admin_actions.VolumeAdminController() + vac = self.controller vac.validate_update({'status': 'creating'}) vac.validate_update({'status': 'available'}) @@ -503,10 +507,74 @@ class AdminActionsTest(BaseAdminTest): {'host': 'test2', 'topic': constants.VOLUME_TOPIC, 'created_at': timeutils.utcnow()}) + db.service_create(self.ctx, + {'host': 'clustered_host', + 'topic': constants.VOLUME_TOPIC, + 'binary': constants.VOLUME_BINARY, + 'cluster_name': 'cluster', + 'created_at': timeutils.utcnow()}) + db.cluster_create(self.ctx, + {'name': 'cluster', + 'binary': constants.VOLUME_BINARY}) # current status is available volume = self._create_volume(self.ctx) return volume + def _migrate_volume_3_exec(self, ctx, volume, host, expected_status, + force_host_copy=False, version=None, + cluster=None): + # build request to migrate to host + # req = fakes.HTTPRequest.blank('/v3/%s/volumes/%s/action' % ( + # fake.PROJECT_ID, volume['id'])) + req = webob.Request.blank('/v3/%s/volumes/%s/action' % ( + fake.PROJECT_ID, volume['id'])) + req.method = 'POST' + req.headers['content-type'] = 'application/json' + body = {'os-migrate_volume': {'host': host, + 'force_host_copy': force_host_copy}} + version = version or '3.0' + req.headers = {'OpenStack-API-Version': 'volume %s' % version} + req.api_version_request = api_version.APIVersionRequest(version) + if version == '3.16': + body['os-migrate_volume']['cluster'] = cluster + req.body = jsonutils.dump_as_bytes(body) + req.environ['cinder.context'] = ctx + resp = self.controller._migrate_volume(req, volume.id, body) + + # verify status + self.assertEqual(expected_status, resp.status_int) + volume = db.volume_get(self.ctx, volume['id']) + return volume + + @ddt.data('3.0', '3.15', '3.16') + def test_migrate_volume_success_3(self, version): + expected_status = 202 + host = 'test2' + volume = self._migrate_volume_prep() + volume = self._migrate_volume_3_exec(self.ctx, volume, host, + expected_status, version=version) + self.assertEqual('starting', volume['migration_status']) + + def test_migrate_volume_success_cluster(self): + expected_status = 202 + # We cannot provide host and cluster, so send host to None + host = None + cluster = 'cluster' + volume = self._migrate_volume_prep() + volume = self._migrate_volume_3_exec(self.ctx, volume, host, + expected_status, version='3.16', + cluster=cluster) + self.assertEqual('starting', volume['migration_status']) + + def test_migrate_volume_fail_host_and_cluster(self): + # We cannot send host and cluster in the request + host = 'test2' + cluster = 'cluster' + volume = self._migrate_volume_prep() + 
self.assertRaises(exception.InvalidInput, + self._migrate_volume_3_exec, self.ctx, volume, host, + None, version='3.16', cluster=cluster) + def _migrate_volume_exec(self, ctx, volume, host, expected_status, force_host_copy=False): # build request to migrate to host diff --git a/cinder/tests/unit/api/contrib/test_snapshot_manage.py b/cinder/tests/unit/api/contrib/test_snapshot_manage.py index fb6980977fd..65daed472a8 100644 --- a/cinder/tests/unit/api/contrib/test_snapshot_manage.py +++ b/cinder/tests/unit/api/contrib/test_snapshot_manage.py @@ -45,7 +45,7 @@ def volume_get(self, context, volume_id, viewable_admin_meta=False): if volume_id == fake.VOLUME_ID: return objects.Volume(context, id=fake.VOLUME_ID, _name_id=fake.VOLUME2_ID, - host='fake_host') + host='fake_host', cluster_name=None) raise exception.VolumeNotFound(volume_id=volume_id) @@ -109,7 +109,7 @@ class SnapshotManageTest(test.TestCase): @mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot') @mock.patch('cinder.volume.api.API.create_snapshot_in_db') - @mock.patch('cinder.db.service_get') + @mock.patch('cinder.db.sqlalchemy.api.service_get') def test_manage_snapshot_ok(self, mock_db, mock_create_snapshot, mock_rpcapi): """Test successful manage snapshot execution. @@ -128,7 +128,8 @@ class SnapshotManageTest(test.TestCase): # Check the db.service_get was called with correct arguments. mock_db.assert_called_once_with( - mock.ANY, host='fake_host', binary='cinder-volume') + mock.ANY, None, host='fake_host', binary='cinder-volume', + cluster_name=None) # Check the create_snapshot_in_db was called with correct arguments. self.assertEqual(1, mock_create_snapshot.call_count) @@ -149,7 +150,7 @@ class SnapshotManageTest(test.TestCase): new_callable=mock.PropertyMock) @mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot') @mock.patch('cinder.volume.api.API.create_snapshot_in_db') - @mock.patch('cinder.db.service_get') + @mock.patch('cinder.db.sqlalchemy.api.service_get') def test_manage_snapshot_disabled(self, mock_db, mock_create_snapshot, mock_rpcapi, mock_is_up): """Test manage snapshot failure due to disabled service.""" @@ -168,7 +169,7 @@ class SnapshotManageTest(test.TestCase): new_callable=mock.PropertyMock) @mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot') @mock.patch('cinder.volume.api.API.create_snapshot_in_db') - @mock.patch('cinder.db.service_get') + @mock.patch('cinder.db.sqlalchemy.api.service_get') def test_manage_snapshot_is_down(self, mock_db, mock_create_snapshot, mock_rpcapi, mock_is_up): """Test manage snapshot failure due to down service.""" @@ -280,7 +281,7 @@ class SnapshotManageTest(test.TestCase): sort_dirs=['asc'], sort_keys=['reference']) @mock.patch('cinder.objects.service.Service.is_up', return_value=True) - @mock.patch('cinder.db.service_get') + @mock.patch('cinder.db.sqlalchemy.api.service_get') def test_get_manageable_snapshots_disabled(self, mock_db, mock_is_up): mock_db.return_value = fake_service.fake_service_obj(self._admin_ctxt, disabled=True) @@ -292,7 +293,7 @@ class SnapshotManageTest(test.TestCase): @mock.patch('cinder.objects.service.Service.is_up', return_value=False, new_callable=mock.PropertyMock) - @mock.patch('cinder.db.service_get') + @mock.patch('cinder.db.sqlalchemy.api.service_get') def test_get_manageable_snapshots_is_down(self, mock_db, mock_is_up): mock_db.return_value = fake_service.fake_service_obj(self._admin_ctxt) res = self._get_resp_get('host_ok', False, True) diff --git a/cinder/tests/unit/api/contrib/test_volume_manage.py 
b/cinder/tests/unit/api/contrib/test_volume_manage.py index f7d96dfabc8..a7862ba6ca1 100644 --- a/cinder/tests/unit/api/contrib/test_volume_manage.py +++ b/cinder/tests/unit/api/contrib/test_volume_manage.py @@ -23,6 +23,7 @@ except ImportError: from urllib.parse import urlencode import webob +from cinder.api.contrib import volume_manage from cinder.api.openstack import api_version_request as api_version from cinder import context from cinder import exception @@ -51,7 +52,7 @@ def app_v3(): return mapper -def service_get(context, host, binary): +def service_get(context, id, host=None, binary=None, *args, **kwargs): """Replacement for Service.service_get_by_host_and_topic. We mock the Service.service_get_by_host_and_topic method to return @@ -146,7 +147,7 @@ def api_get_manageable_volumes(*args, **kwargs): @ddt.ddt -@mock.patch('cinder.db.service_get', service_get) +@mock.patch('cinder.db.sqlalchemy.api.service_get', service_get) @mock.patch('cinder.volume.volume_types.get_volume_type_by_name', vt_get_volume_type_by_name) @mock.patch('cinder.volume.volume_types.get_volume_type', @@ -173,6 +174,7 @@ class VolumeManageTest(test.TestCase): self._non_admin_ctxt = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, is_admin=False) + self.controller = volume_manage.VolumeManageController() def _get_resp_post(self, body): """Helper to execute a POST os-volume-manage API call.""" @@ -196,10 +198,11 @@ class VolumeManageTest(test.TestCase): res = req.get_response(app_v3()) return res + @ddt.data(False, True) @mock.patch('cinder.volume.api.API.manage_existing', wraps=api_manage) @mock.patch( 'cinder.api.openstack.wsgi.Controller.validate_name_and_description') - def test_manage_volume_ok(self, mock_validate, mock_api_manage): + def test_manage_volume_ok(self, cluster, mock_validate, mock_api_manage): """Test successful manage volume execution. 
Tests for correct operation when valid arguments are passed in the @@ -209,6 +212,9 @@ class VolumeManageTest(test.TestCase): """ body = {'volume': {'host': 'host_ok', 'ref': 'fake_ref'}} + # This will be ignored + if cluster: + body['volume']['cluster'] = 'cluster' res = self._get_resp_post(body) self.assertEqual(202, res.status_int) @@ -216,9 +222,48 @@ class VolumeManageTest(test.TestCase): self.assertEqual(1, mock_api_manage.call_count) args = mock_api_manage.call_args[0] self.assertEqual(body['volume']['host'], args[1]) - self.assertEqual(body['volume']['ref'], args[2]) + self.assertIsNone(args[2]) # Cluster argument + self.assertEqual(body['volume']['ref'], args[3]) self.assertTrue(mock_validate.called) + def _get_resp_create(self, body, version='3.0'): + url = '/v3/%s/os-volume-manage' % fake.PROJECT_ID + req = webob.Request.blank(url, base_url='http://localhost.com' + url) + req.method = 'POST' + req.headers['Content-Type'] = 'application/json' + req.environ['cinder.context'] = self._admin_ctxt + req.body = jsonutils.dump_as_bytes(body) + req.headers = {'OpenStack-API-Version': 'volume %s' % version} + req.api_version_request = api_version.APIVersionRequest(version) + res = self.controller.create(req, body) + return res + + @mock.patch('cinder.volume.api.API.manage_existing', wraps=api_manage) + @mock.patch( + 'cinder.api.openstack.wsgi.Controller.validate_name_and_description') + def test_manage_volume_ok_cluster(self, mock_validate, mock_api_manage): + body = {'volume': {'cluster': 'cluster', + 'ref': 'fake_ref'}} + res = self._get_resp_create(body, '3.16') + self.assertEqual(['volume'], list(res.keys())) + + # Check that the manage API was called with the correct arguments. + self.assertEqual(1, mock_api_manage.call_count) + args = mock_api_manage.call_args[0] + self.assertIsNone(args[1]) + self.assertEqual(body['volume']['cluster'], args[2]) + self.assertEqual(body['volume']['ref'], args[3]) + self.assertTrue(mock_validate.called) + + @mock.patch( + 'cinder.api.openstack.wsgi.Controller.validate_name_and_description') + def test_manage_volume_fail_host_cluster(self, mock_validate): + body = {'volume': {'host': 'host_ok', + 'cluster': 'cluster', + 'ref': 'fake_ref'}} + self.assertRaises(exception.InvalidInput, + self._get_resp_create, body, '3.16') + def test_manage_volume_missing_host(self): """Test correct failure when host is not specified.""" body = {'volume': {'ref': 'fake_ref'}} diff --git a/cinder/tests/unit/api/v3/test_snapshot_manage.py b/cinder/tests/unit/api/v3/test_snapshot_manage.py index 58d2ee0af91..56ce469a60c 100644 --- a/cinder/tests/unit/api/v3/test_snapshot_manage.py +++ b/cinder/tests/unit/api/v3/test_snapshot_manage.py @@ -60,7 +60,7 @@ class SnapshotManageTest(test.TestCase): @mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot') @mock.patch('cinder.volume.api.API.create_snapshot_in_db') - @mock.patch('cinder.objects.service.Service.get_by_args') + @mock.patch('cinder.objects.service.Service.get_by_id') def test_manage_snapshot_route(self, mock_service_get, mock_create_snapshot, mock_rpcapi): """Test call to manage snapshot. 
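The new API tests above (for os-migrate_volume and os-volume-manage) all exercise the same validation that common.get_cluster_host() introduces: starting with microversion 3.16 a request may specify either host or cluster, but never both and never neither, while older microversions ignore cluster and still require host. Below is a small self-contained sketch of that rule, using a hypothetical helper name and made-up values rather than the real controller code:

    # Standalone illustration of the "one and only one of cluster and host"
    # rule that the tests above verify.
    def pick_destination(params, supports_cluster):
        cluster_name = params.get('cluster') if supports_cluster else None
        host = params.get('host')
        # bool(None) == bool('') == False, so a single comparison rejects both
        # "neither set" and "both set".
        if bool(cluster_name) == bool(host):
            raise ValueError('One and only one of cluster and host must be set.')
        return cluster_name, host

    print(pick_destination({'host': 'node1@lvm'}, supports_cluster=True))
    print(pick_destination({'cluster': 'cluster1@lvm'}, supports_cluster=True))
    # Before 3.16 the cluster value is ignored, so host is still required.
    print(pick_destination({'host': 'node1@lvm', 'cluster': 'ignored'},
                           supports_cluster=False))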
diff --git a/cinder/tests/unit/scheduler/fakes.py b/cinder/tests/unit/scheduler/fakes.py index 071eb64e679..6e5539c9e43 100644 --- a/cinder/tests/unit/scheduler/fakes.py +++ b/cinder/tests/unit/scheduler/fakes.py @@ -23,6 +23,9 @@ from cinder.scheduler import filter_scheduler from cinder.scheduler import host_manager +UTC_NOW = timeutils.utcnow() + + class FakeFilterScheduler(filter_scheduler.FilterScheduler): def __init__(self, *args, **kwargs): super(FakeFilterScheduler, self).__init__(*args, **kwargs) @@ -43,7 +46,7 @@ class FakeHostManager(host_manager.HostManager): 'thick_provisioning_support': True, 'reserved_percentage': 10, 'volume_backend_name': 'lvm1', - 'timestamp': None}, + 'timestamp': UTC_NOW}, 'host2': {'total_capacity_gb': 2048, 'free_capacity_gb': 300, 'allocated_capacity_gb': 1748, @@ -53,7 +56,7 @@ class FakeHostManager(host_manager.HostManager): 'thick_provisioning_support': False, 'reserved_percentage': 10, 'volume_backend_name': 'lvm2', - 'timestamp': None}, + 'timestamp': UTC_NOW}, 'host3': {'total_capacity_gb': 512, 'free_capacity_gb': 256, 'allocated_capacity_gb': 256, @@ -63,7 +66,7 @@ class FakeHostManager(host_manager.HostManager): 'thick_provisioning_support': True, 'reserved_percentage': 0, 'volume_backend_name': 'lvm3', - 'timestamp': None}, + 'timestamp': UTC_NOW}, 'host4': {'total_capacity_gb': 2048, 'free_capacity_gb': 200, 'allocated_capacity_gb': 1848, @@ -73,7 +76,7 @@ class FakeHostManager(host_manager.HostManager): 'thick_provisioning_support': False, 'reserved_percentage': 5, 'volume_backend_name': 'lvm4', - 'timestamp': None, + 'timestamp': UTC_NOW, 'consistencygroup_support': True}, 'host5': {'total_capacity_gb': 'infinite', 'free_capacity_gb': 'unknown', @@ -83,13 +86,13 @@ class FakeHostManager(host_manager.HostManager): 'thin_provisioning_support': True, 'thick_provisioning_support': False, 'reserved_percentage': 5, - 'timestamp': None}, + 'timestamp': UTC_NOW}, } class FakeHostState(host_manager.HostState): def __init__(self, host, attribute_dict): - super(FakeHostState, self).__init__(host) + super(FakeHostState, self).__init__(host, None) for (key, val) in attribute_dict.items(): setattr(self, key, val) diff --git a/cinder/tests/unit/scheduler/test_base_filter.py b/cinder/tests/unit/scheduler/test_base_filter.py index 05377c3194d..a0ecdfaf83e 100644 --- a/cinder/tests/unit/scheduler/test_base_filter.py +++ b/cinder/tests/unit/scheduler/test_base_filter.py @@ -178,7 +178,7 @@ class TestBaseFilterHandler(test.TestCase): def test_get_filtered_objects_info_and_debug_log_none_returned(self): all_filters = [FilterA, FilterA, FilterB] - fake_hosts = [host_manager.HostState('fake_host%s' % x) + fake_hosts = [host_manager.HostState('fake_host%s' % x, None) for x in range(1, 4)] filt_props = {"request_spec": {'volume_id': fake.VOLUME_ID, diff --git a/cinder/tests/unit/scheduler/test_capacity_weigher.py b/cinder/tests/unit/scheduler/test_capacity_weigher.py index b15823d4661..77fb2ba2ea7 100644 --- a/cinder/tests/unit/scheduler/test_capacity_weigher.py +++ b/cinder/tests/unit/scheduler/test_capacity_weigher.py @@ -15,6 +15,7 @@ """ Tests For Capacity Weigher. 
""" +from datetime import datetime import ddt import mock @@ -248,7 +249,7 @@ class CapacityWeigherTestCase(test.TestCase): 'thin_provisioning_support': True, 'thick_provisioning_support': False, 'reserved_percentage': 5, - 'timestamp': None} + 'timestamp': datetime.utcnow()} hostinfo_list = self._get_all_hosts() # host1: thin_provisioning_support = False @@ -290,7 +291,7 @@ class CapacityWeigherTestCase(test.TestCase): 'thin_provisioning_support': True, 'thick_provisioning_support': False, 'reserved_percentage': 5, - 'timestamp': None} + 'timestamp': datetime.utcnow()} hostinfo_list = self._get_all_hosts() # host1: thin_provisioning_support = False @@ -332,7 +333,7 @@ class CapacityWeigherTestCase(test.TestCase): 'thin_provisioning_support': True, 'thick_provisioning_support': False, 'reserved_percentage': 5, - 'timestamp': None} + 'timestamp': datetime.utcnow()} hostinfo_list = self._get_all_hosts() # host1: thin_provisioning_support = False @@ -374,7 +375,7 @@ class CapacityWeigherTestCase(test.TestCase): 'thin_provisioning_support': True, 'thick_provisioning_support': False, 'reserved_percentage': 5, - 'timestamp': None} + 'timestamp': datetime.utcnow()} hostinfo_list = self._get_all_hosts() # host1: thin_provisioning_support = False diff --git a/cinder/tests/unit/scheduler/test_chance_weigher.py b/cinder/tests/unit/scheduler/test_chance_weigher.py index 5732aed7a42..651707af17f 100644 --- a/cinder/tests/unit/scheduler/test_chance_weigher.py +++ b/cinder/tests/unit/scheduler/test_chance_weigher.py @@ -58,7 +58,7 @@ class ChanceWeigherTestCase(test.TestCase): # ensure we don't lose any hosts when weighing with # the ChanceWeigher hm = host_manager.HostManager() - fake_hosts = [host_manager.HostState('fake_host%s' % x) + fake_hosts = [host_manager.HostState('fake_host%s' % x, None) for x in range(1, 5)] weighed_hosts = hm.get_weighed_hosts(fake_hosts, {}, 'ChanceWeigher') self.assertEqual(4, len(weighed_hosts)) diff --git a/cinder/tests/unit/scheduler/test_filter_scheduler.py b/cinder/tests/unit/scheduler/test_filter_scheduler.py index c9ed2f1ce27..f45dad7c7c7 100644 --- a/cinder/tests/unit/scheduler/test_filter_scheduler.py +++ b/cinder/tests/unit/scheduler/test_filter_scheduler.py @@ -414,7 +414,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase): filter_properties = {'retry': retry} sched = fakes.FakeFilterScheduler() - host_state = host_manager.HostState('host') + host_state = host_manager.HostState('host', None) host_state.total_capacity_gb = 1024 sched._post_select_populate_filter_properties(filter_properties, host_state) diff --git a/cinder/tests/unit/scheduler/test_host_manager.py b/cinder/tests/unit/scheduler/test_host_manager.py index ad5490bbc69..1dc509a29a8 100644 --- a/cinder/tests/unit/scheduler/test_host_manager.py +++ b/cinder/tests/unit/scheduler/test_host_manager.py @@ -19,14 +19,18 @@ Tests For HostManager from datetime import datetime import mock +from oslo_serialization import jsonutils from oslo_utils import timeutils from cinder.common import constants +from cinder import context +from cinder import db from cinder import exception from cinder import objects from cinder.scheduler import filters from cinder.scheduler import host_manager from cinder import test +from cinder.tests.unit import fake_constants as fake from cinder.tests.unit.objects import test_service @@ -46,7 +50,7 @@ class HostManagerTestCase(test.TestCase): def setUp(self): super(HostManagerTestCase, self).setUp() self.host_manager = host_manager.HostManager() - self.fake_hosts = 
[host_manager.HostState('fake_host%s' % x) + self.fake_hosts = [host_manager.HostState('fake_host%s' % x, None) for x in range(1, 5)] # For a second scheduler service. self.host_manager_1 = host_manager.HostManager() @@ -93,27 +97,29 @@ class HostManagerTestCase(test.TestCase): _mock_get_updated_pools): service_states = self.host_manager.service_states self.assertDictMatch({}, service_states) - _mock_utcnow.side_effect = [31337, 31338, 31339] + _mock_utcnow.side_effect = [31338, 31339] _mock_get_updated_pools.return_value = [] - host1_volume_capabs = dict(free_capacity_gb=4321, timestamp=1) - host2_volume_capabs = dict(free_capacity_gb=5432, timestamp=1) - host3_volume_capabs = dict(free_capacity_gb=6543, timestamp=1) + timestamp = jsonutils.to_primitive(datetime.utcnow()) + host1_volume_capabs = dict(free_capacity_gb=4321, timestamp=timestamp) + host2_volume_capabs = dict(free_capacity_gb=5432) + host3_volume_capabs = dict(free_capacity_gb=6543) service_name = 'volume' self.host_manager.update_service_capabilities(service_name, 'host1', - host1_volume_capabs) + host1_volume_capabs, + None, timestamp) self.host_manager.update_service_capabilities(service_name, 'host2', - host2_volume_capabs) + host2_volume_capabs, + None, None) self.host_manager.update_service_capabilities(service_name, 'host3', - host3_volume_capabs) + host3_volume_capabs, + None, None) # Make sure dictionary isn't re-assigned self.assertEqual(service_states, self.host_manager.service_states) - # Make sure original dictionary wasn't copied - self.assertEqual(1, host1_volume_capabs['timestamp']) - host1_volume_capabs['timestamp'] = 31337 + host1_volume_capabs['timestamp'] = timestamp host2_volume_capabs['timestamp'] = 31338 host3_volume_capabs['timestamp'] = 31339 @@ -150,7 +156,7 @@ class HostManagerTestCase(test.TestCase): # S0: update_service_capabilities() self.host_manager.update_service_capabilities(service_name, 'host1', - capab1) + capab1, None, None) self.assertDictMatch(dict(dict(timestamp=31337), **capab1), self.host_manager.service_states['host1']) @@ -168,7 +174,7 @@ class HostManagerTestCase(test.TestCase): # S1: update_service_capabilities() self.host_manager_1.update_service_capabilities(service_name, 'host1', - capab1) + capab1, None, None) self.assertDictMatch(dict(dict(timestamp=31339), **capab1), self.host_manager_1.service_states['host1']) @@ -208,7 +214,7 @@ class HostManagerTestCase(test.TestCase): # S0: update_service_capabilities() self.host_manager.update_service_capabilities(service_name, 'host1', - capab1) + capab1, None, None) self.assertDictMatch(dict(dict(timestamp=31340), **capab1), self.host_manager.service_states['host1']) @@ -219,7 +225,7 @@ class HostManagerTestCase(test.TestCase): # S1: update_service_capabilities() self.host_manager_1.update_service_capabilities(service_name, 'host1', - capab1) + capab1, None, None) self.assertDictMatch(dict(dict(timestamp=31341), **capab1), self.host_manager_1.service_states['host1']) @@ -292,7 +298,7 @@ class HostManagerTestCase(test.TestCase): # S0: update_service_capabilities() self.host_manager.update_service_capabilities(service_name, 'host1', - capab1) + capab1, None, None) self.assertDictMatch( dict(dict(timestamp=31340), **capab1), @@ -303,7 +309,7 @@ class HostManagerTestCase(test.TestCase): # S1: update_service_capabilities() self.host_manager_1.update_service_capabilities(service_name, 'host1', - capab1) + capab1, None, None) self.assertDictMatch(dict(dict(timestamp=31345), **capab1), self.host_manager_1.service_states['host1']) @@ -355,7 
+361,7 @@ class HostManagerTestCase(test.TestCase): # S0: update_service_capabilities() self.host_manager.update_service_capabilities(service_name, 'host1', - capab2) + capab2, None, None) self.assertDictMatch( dict(dict(timestamp=31340), **capab1), self.host_manager.service_states_last_update['host1']) @@ -378,7 +384,7 @@ class HostManagerTestCase(test.TestCase): # S1: update_service_capabilities() self.host_manager_1.update_service_capabilities(service_name, 'host1', - capab2) + capab2, None, None) self.assertDictMatch(dict(dict(timestamp=31348), **capab2), self.host_manager_1.service_states['host1']) @@ -452,7 +458,7 @@ class HostManagerTestCase(test.TestCase): # S0: update_service_capabilities() self.host_manager.update_service_capabilities(service_name, 'host1', - capab2) + capab2, None, None) self.assertDictMatch( dict(dict(timestamp=31349), **capab2), self.host_manager.service_states_last_update['host1']) @@ -462,7 +468,7 @@ class HostManagerTestCase(test.TestCase): # S1: update_service_capabilities() self.host_manager_1.update_service_capabilities(service_name, 'host1', - capab2) + capab2, None, None) self.assertDictMatch( dict(dict(timestamp=31348), **capab2), @@ -490,19 +496,23 @@ class HostManagerTestCase(test.TestCase): self.host_manager = host_manager.HostManager() self.assertFalse(self.host_manager.has_all_capabilities()) - host1_volume_capabs = dict(free_capacity_gb=4321, timestamp=1) - host2_volume_capabs = dict(free_capacity_gb=5432, timestamp=1) - host3_volume_capabs = dict(free_capacity_gb=6543, timestamp=1) + timestamp = jsonutils.to_primitive(datetime.utcnow()) + host1_volume_capabs = dict(free_capacity_gb=4321) + host2_volume_capabs = dict(free_capacity_gb=5432) + host3_volume_capabs = dict(free_capacity_gb=6543) service_name = 'volume' self.host_manager.update_service_capabilities(service_name, 'host1', - host1_volume_capabs) + host1_volume_capabs, + None, timestamp) self.assertFalse(self.host_manager.has_all_capabilities()) self.host_manager.update_service_capabilities(service_name, 'host2', - host2_volume_capabs) + host2_volume_capabs, + None, timestamp) self.assertFalse(self.host_manager.has_all_capabilities()) self.host_manager.update_service_capabilities(service_name, 'host3', - host3_volume_capabs) + host3_volume_capabs, + None, timestamp) self.assertTrue(self.host_manager.has_all_capabilities()) @mock.patch('cinder.db.service_get_all') @@ -532,7 +542,7 @@ class HostManagerTestCase(test.TestCase): mocked_service_states = { 'host1': dict(volume_backend_name='AAA', total_capacity_gb=512, free_capacity_gb=200, - timestamp=None, reserved_percentage=0), + timestamp=dates[1], reserved_percentage=0), } _mock_service_get_all.return_value = services @@ -547,24 +557,93 @@ class HostManagerTestCase(test.TestCase): mocked_service_states): self.host_manager.update_service_capabilities(service_name, 'host1', - host_volume_capabs) + host_volume_capabs, + None, None) res = self.host_manager.get_pools(context) self.assertEqual(1, len(res)) self.assertEqual(dates[1], res[0]['capabilities']['timestamp']) self.host_manager.update_service_capabilities(service_name, 'host1', - host_volume_capabs) + host_volume_capabs, + None, None) res = self.host_manager.get_pools(context) self.assertEqual(1, len(res)) self.assertEqual(dates[2], res[0]['capabilities']['timestamp']) + @mock.patch('cinder.objects.Service.is_up', True) + def test_get_all_host_states_cluster(self): + """Test get_all_host_states when we have clustered services. 
+ + Confirm that clustered services are grouped and that only the latest + of the capability reports is relevant. + """ + ctxt = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True) + + cluster_name = 'cluster' + db.cluster_create(ctxt, {'name': cluster_name, + 'binary': constants.VOLUME_BINARY}) + + services = ( + db.service_create(ctxt, + {'host': 'clustered_host_1', + 'topic': constants.VOLUME_TOPIC, + 'binary': constants.VOLUME_BINARY, + 'cluster_name': cluster_name, + 'created_at': timeutils.utcnow()}), + # Even if this service is disabled, since it belongs to an enabled + # cluster, it's not really disabled. + db.service_create(ctxt, + {'host': 'clustered_host_2', + 'topic': constants.VOLUME_TOPIC, + 'binary': constants.VOLUME_BINARY, + 'disabled': True, + 'cluster_name': cluster_name, + 'created_at': timeutils.utcnow()}), + db.service_create(ctxt, + {'host': 'clustered_host_3', + 'topic': constants.VOLUME_TOPIC, + 'binary': constants.VOLUME_BINARY, + 'cluster_name': cluster_name, + 'created_at': timeutils.utcnow()}), + db.service_create(ctxt, + {'host': 'non_clustered_host', + 'topic': constants.VOLUME_TOPIC, + 'binary': constants.VOLUME_BINARY, + 'created_at': timeutils.utcnow()}), + # This service has no capabilities + db.service_create(ctxt, + {'host': 'no_capabilities_host', + 'topic': constants.VOLUME_TOPIC, + 'binary': constants.VOLUME_BINARY, + 'created_at': timeutils.utcnow()}), + ) + + capabilities = ((1, {'free_capacity_gb': 1000}), + # This is the capacity that will be selected for the + # cluster because is the one with the latest timestamp. + (3, {'free_capacity_gb': 2000}), + (2, {'free_capacity_gb': 3000}), + (1, {'free_capacity_gb': 4000})) + + for i in range(len(capabilities)): + self.host_manager.update_service_capabilities( + 'volume', services[i].host, capabilities[i][1], + services[i].cluster_name, capabilities[i][0]) + + res = self.host_manager.get_all_host_states(ctxt) + result = {(s.cluster_name or s.host, s.free_capacity_gb) for s in res} + expected = {(cluster_name + '#_pool0', 2000), + ('non_clustered_host#_pool0', 4000)} + self.assertSetEqual(expected, result) + @mock.patch('cinder.db.service_get_all') @mock.patch('cinder.objects.service.Service.is_up', new_callable=mock.PropertyMock) def test_get_all_host_states(self, _mock_service_is_up, _mock_service_get_all): context = 'fake_context' + timestamp = datetime.utcnow() topic = constants.VOLUME_TOPIC services = [ @@ -596,15 +675,15 @@ class HostManagerTestCase(test.TestCase): service_states = { 'host1': dict(volume_backend_name='AAA', total_capacity_gb=512, free_capacity_gb=200, - timestamp=None, reserved_percentage=0, + timestamp=timestamp, reserved_percentage=0, provisioned_capacity_gb=312), 'host2': dict(volume_backend_name='BBB', total_capacity_gb=256, free_capacity_gb=100, - timestamp=None, reserved_percentage=0, + timestamp=timestamp, reserved_percentage=0, provisioned_capacity_gb=156), 'host3': dict(volume_backend_name='CCC', total_capacity_gb=10000, free_capacity_gb=700, - timestamp=None, reserved_percentage=0, + timestamp=timestamp, reserved_percentage=0, provisioned_capacity_gb=9300), } # First test: service.is_up is always True, host5 is disabled, @@ -665,6 +744,7 @@ class HostManagerTestCase(test.TestCase): def test_get_pools(self, _mock_service_is_up, _mock_service_get_all): context = 'fake_context' + timestamp = datetime.utcnow() services = [ dict(id=1, host='host1', topic='volume', disabled=False, @@ -678,15 +758,15 @@ class HostManagerTestCase(test.TestCase): mocked_service_states = 
{ 'host1': dict(volume_backend_name='AAA', total_capacity_gb=512, free_capacity_gb=200, - timestamp=None, reserved_percentage=0, + timestamp=timestamp, reserved_percentage=0, provisioned_capacity_gb=312), 'host2@back1': dict(volume_backend_name='BBB', total_capacity_gb=256, free_capacity_gb=100, - timestamp=None, reserved_percentage=0, + timestamp=timestamp, reserved_percentage=0, provisioned_capacity_gb=156), 'host2@back2': dict(volume_backend_name='CCC', total_capacity_gb=10000, free_capacity_gb=700, - timestamp=None, reserved_percentage=0, + timestamp=timestamp, reserved_percentage=0, provisioned_capacity_gb=9300), } @@ -706,7 +786,7 @@ class HostManagerTestCase(test.TestCase): { 'name': 'host1#AAA', 'capabilities': { - 'timestamp': None, + 'timestamp': timestamp, 'volume_backend_name': 'AAA', 'free_capacity_gb': 200, 'driver_version': None, @@ -719,7 +799,7 @@ class HostManagerTestCase(test.TestCase): { 'name': 'host2@back1#BBB', 'capabilities': { - 'timestamp': None, + 'timestamp': timestamp, 'volume_backend_name': 'BBB', 'free_capacity_gb': 100, 'driver_version': None, @@ -732,7 +812,7 @@ class HostManagerTestCase(test.TestCase): { 'name': 'host2@back2#CCC', 'capabilities': { - 'timestamp': None, + 'timestamp': timestamp, 'volume_backend_name': 'CCC', 'free_capacity_gb': 700, 'driver_version': None, @@ -887,7 +967,7 @@ class HostStateTestCase(test.TestCase): """Test case for HostState class.""" def test_update_from_volume_capability_nopool(self): - fake_host = host_manager.HostState('host1') + fake_host = host_manager.HostState('host1', None) self.assertIsNone(fake_host.free_capacity_gb) volume_capability = {'total_capacity_gb': 1024, @@ -922,7 +1002,7 @@ class HostStateTestCase(test.TestCase): self.assertRaises(KeyError, lambda: fake_host.pools['pool0']) def test_update_from_volume_capability_with_pools(self): - fake_host = host_manager.HostState('host1') + fake_host = host_manager.HostState('host1', None) self.assertIsNone(fake_host.free_capacity_gb) capability = { 'volume_backend_name': 'Local iSCSI', @@ -1014,7 +1094,7 @@ class HostStateTestCase(test.TestCase): fake_host.pools['3rd pool'].provisioned_capacity_gb) def test_update_from_volume_infinite_capability(self): - fake_host = host_manager.HostState('host1') + fake_host = host_manager.HostState('host1', None) self.assertIsNone(fake_host.free_capacity_gb) volume_capability = {'total_capacity_gb': 'infinite', @@ -1035,7 +1115,7 @@ class HostStateTestCase(test.TestCase): fake_host.pools['_pool0'].free_capacity_gb) def test_update_from_volume_unknown_capability(self): - fake_host = host_manager.HostState('host1') + fake_host = host_manager.HostState('host1', None) self.assertIsNone(fake_host.free_capacity_gb) volume_capability = {'total_capacity_gb': 'infinite', @@ -1056,7 +1136,7 @@ class HostStateTestCase(test.TestCase): fake_host.pools['_pool0'].free_capacity_gb) def test_update_from_empty_volume_capability(self): - fake_host = host_manager.HostState('host1') + fake_host = host_manager.HostState('host1', None) vol_cap = {'timestamp': None} @@ -1076,7 +1156,7 @@ class PoolStateTestCase(test.TestCase): """Test case for HostState class.""" def test_update_from_volume_capability(self): - fake_pool = host_manager.PoolState('host1', None, 'pool0') + fake_pool = host_manager.PoolState('host1', None, None, 'pool0') self.assertIsNone(fake_pool.free_capacity_gb) volume_capability = {'total_capacity_gb': 1024, diff --git a/cinder/tests/unit/scheduler/test_rpcapi.py b/cinder/tests/unit/scheduler/test_rpcapi.py index 
d3f68f89ba4..3ffbe2b7b2a 100644 --- a/cinder/tests/unit/scheduler/test_rpcapi.py +++ b/cinder/tests/unit/scheduler/test_rpcapi.py @@ -17,6 +17,7 @@ Unit Tests for cinder.scheduler.rpcapi """ +import ddt import mock from cinder import context @@ -27,6 +28,7 @@ from cinder.tests.unit import fake_constants from cinder.tests.unit import fake_volume +@ddt.ddt class SchedulerRpcAPITestCase(test.TestCase): def setUp(self): @@ -75,14 +77,20 @@ class SchedulerRpcAPITestCase(test.TestCase): for kwarg, value in self.fake_kwargs.items(): self.assertEqual(expected_msg[kwarg], value) - def test_update_service_capabilities(self): + @ddt.data('3.0', '3.3') + @mock.patch('oslo_messaging.RPCClient.can_send_version') + def test_update_service_capabilities(self, version, can_send_version): + can_send_version.side_effect = lambda x: x == version self._test_scheduler_api('update_service_capabilities', rpc_method='cast', service_name='fake_name', host='fake_host', - capabilities='fake_capabilities', + cluster_name='cluster_name', + capabilities={}, fanout=True, - version='3.0') + version=version, + timestamp='123') + can_send_version.assert_called_once_with('3.3') def test_create_volume(self): volume = fake_volume.fake_volume_obj(self.context) @@ -135,17 +143,18 @@ class SchedulerRpcAPITestCase(test.TestCase): version='3.0') create_worker_mock.assert_called_once() - def test_migrate_volume_to_host(self): + @mock.patch('oslo_messaging.RPCClient.can_send_version') + def test_migrate_volume(self, can_send_version): volume = fake_volume.fake_volume_obj(self.context) create_worker_mock = self.mock_object(volume, 'create_worker') - self._test_scheduler_api('migrate_volume_to_host', + self._test_scheduler_api('migrate_volume', rpc_method='cast', - host='host', - force_host_copy=True, + backend='host', + force_copy=True, request_spec='fake_request_spec', filter_properties='filter_properties', volume=volume, - version='3.0') + version='3.3') create_worker_mock.assert_not_called() def test_retype(self): diff --git a/cinder/tests/unit/scheduler/test_scheduler.py b/cinder/tests/unit/scheduler/test_scheduler.py index b5902dfab91..9e4b6f8a1e1 100644 --- a/cinder/tests/unit/scheduler/test_scheduler.py +++ b/cinder/tests/unit/scheduler/test_scheduler.py @@ -103,7 +103,7 @@ class SchedulerManagerTestCase(test.TestCase): self.manager.update_service_capabilities(self.context, service_name=service, host=host) - _mock_update_cap.assert_called_once_with(service, host, {}) + _mock_update_cap.assert_called_once_with(service, host, {}, None, None) @mock.patch('cinder.scheduler.driver.Scheduler.' 
'update_service_capabilities') @@ -117,7 +117,8 @@ class SchedulerManagerTestCase(test.TestCase): service_name=service, host=host, capabilities=capabilities) - _mock_update_cap.assert_called_once_with(service, host, capabilities) + _mock_update_cap.assert_called_once_with(service, host, capabilities, + None, None) @mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume') @mock.patch('cinder.message.api.API.create') @@ -164,6 +165,19 @@ class SchedulerManagerTestCase(test.TestCase): request_spec_obj, {}) self.assertFalse(_mock_sleep.called) + @mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume') + @mock.patch('eventlet.sleep') + def test_create_volume_set_worker(self, _mock_sleep, _mock_sched_create): + """Make sure that the worker is created when creating a volume.""" + volume = tests_utils.create_volume(self.context, status='creating') + + request_spec = {'volume_id': volume.id} + + self.manager.create_volume(self.context, volume, + request_spec=request_spec, + filter_properties={}) + volume.set_worker.assert_called_once_with() + @mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume') @mock.patch('cinder.scheduler.driver.Scheduler.is_ready') @mock.patch('eventlet.sleep') @@ -326,6 +340,13 @@ class SchedulerManagerTestCase(test.TestCase): self.manager.driver = original_driver + def test_do_cleanup(self): + vol = tests_utils.create_volume(self.context, status='creating') + self.manager._do_cleanup(self.context, vol) + + vol.refresh() + self.assertEqual('error', vol.status) + class SchedulerTestCase(test.TestCase): """Test case for base scheduler driver class.""" @@ -346,9 +367,9 @@ class SchedulerTestCase(test.TestCase): host = 'fake_host' capabilities = {'fake_capability': 'fake_value'} self.driver.update_service_capabilities(service_name, host, - capabilities) + capabilities, None) _mock_update_cap.assert_called_once_with(service_name, host, - capabilities) + capabilities, None) @mock.patch('cinder.scheduler.host_manager.HostManager.' 
'has_all_capabilities', return_value=False) @@ -387,8 +408,10 @@ class SchedulerDriverModuleTestCase(test.TestCase): volume = fake_volume.fake_volume_obj(self.context) _mock_volume_get.return_value = volume - driver.volume_update_db(self.context, volume.id, 'fake_host') + driver.volume_update_db(self.context, volume.id, 'fake_host', + 'fake_cluster') scheduled_at = volume.scheduled_at.replace(tzinfo=None) _mock_vol_update.assert_called_once_with( self.context, volume.id, {'host': 'fake_host', + 'cluster_name': 'fake_cluster', 'scheduled_at': scheduled_at}) diff --git a/cinder/tests/unit/test_db_api.py b/cinder/tests/unit/test_db_api.py index fd5b2ad7f07..1598fa12f11 100644 --- a/cinder/tests/unit/test_db_api.py +++ b/cinder/tests/unit/test_db_api.py @@ -167,6 +167,39 @@ class DBAPIServiceTestCase(BaseTest): real_service1 = db.service_get(self.ctxt, host='host1', topic='topic1') self._assertEqualObjects(service1, real_service1) + def test_service_get_all_disabled_by_cluster(self): + values = [ + # Enabled services + {'host': 'host1', 'binary': 'b1', 'disabled': False}, + {'host': 'host2', 'binary': 'b1', 'disabled': False, + 'cluster_name': 'enabled_cluster'}, + {'host': 'host3', 'binary': 'b1', 'disabled': True, + 'cluster_name': 'enabled_cluster'}, + + # Disabled services + {'host': 'host4', 'binary': 'b1', 'disabled': True}, + {'host': 'host5', 'binary': 'b1', 'disabled': False, + 'cluster_name': 'disabled_cluster'}, + {'host': 'host6', 'binary': 'b1', 'disabled': True, + 'cluster_name': 'disabled_cluster'}, + ] + + db.cluster_create(self.ctxt, {'name': 'enabled_cluster', + 'binary': 'b1', + 'disabled': False}), + db.cluster_create(self.ctxt, {'name': 'disabled_cluster', + 'binary': 'b1', + 'disabled': True}), + services = [self._create_service(vals) for vals in values] + + enabled = db.service_get_all(self.ctxt, disabled=False) + disabled = db.service_get_all(self.ctxt, disabled=True) + + self.assertSetEqual({s.host for s in services[:3]}, + {s.host for s in enabled}) + self.assertSetEqual({s.host for s in services[3:]}, + {s.host for s in disabled}) + def test_service_get_all(self): expired = (datetime.datetime.utcnow() - datetime.timedelta(seconds=CONF.service_down_time + 1)) diff --git a/cinder/tests/unit/test_volume.py b/cinder/tests/unit/test_volume.py index 6f949e1bfb8..5016fa271ae 100644 --- a/cinder/tests/unit/test_volume.py +++ b/cinder/tests/unit/test_volume.py @@ -344,20 +344,19 @@ class VolumeTestCase(base.BaseVolumeTestCase): def test_init_host_added_to_cluster(self, cg_include_mock, vol_include_mock, vol_get_all_mock, snap_get_all_mock): - self.mock_object(self.volume, 'cluster', mock.sentinel.cluster) + cluster = str(mock.sentinel.cluster) + self.mock_object(self.volume, 'cluster', cluster) self.volume.init_host(added_to_cluster=True, service_id=self.service_id) - vol_include_mock.assert_called_once_with(mock.ANY, - mock.sentinel.cluster, + vol_include_mock.assert_called_once_with(mock.ANY, cluster, host=self.volume.host) - cg_include_mock.assert_called_once_with(mock.ANY, - mock.sentinel.cluster, + cg_include_mock.assert_called_once_with(mock.ANY, cluster, host=self.volume.host) vol_get_all_mock.assert_called_once_with( - mock.ANY, filters={'cluster_name': mock.sentinel.cluster}) + mock.ANY, filters={'cluster_name': cluster}) snap_get_all_mock.assert_called_once_with( - mock.ANY, search_opts={'cluster_name': mock.sentinel.cluster}) + mock.ANY, search_opts={'cluster_name': cluster}) @mock.patch('cinder.objects.service.Service.get_minimum_rpc_version') 
@mock.patch('cinder.objects.service.Service.get_minimum_obj_version') @@ -4785,7 +4784,7 @@ class VolumeMigrationTestCase(base.BaseVolumeTestCase): self.assertEqual('newhost', volume.host) self.assertEqual('success', volume.migration_status) - def _fake_create_volume(self, ctxt, volume, host, req_spec, filters, + def _fake_create_volume(self, ctxt, volume, req_spec, filters, allow_reschedule=True): return db.volume_update(ctxt, volume['id'], {'status': self.expected_status}) @@ -4880,7 +4879,7 @@ class VolumeMigrationTestCase(base.BaseVolumeTestCase): nova_api, create_volume, save): def fake_create_volume(*args, **kwargs): - context, volume, host, request_spec, filter_properties = args + context, volume, request_spec, filter_properties = args fake_db = mock.Mock() task = create_volume_manager.ExtractVolumeSpecTask(fake_db) specs = task.execute(context, volume, {}) @@ -4916,7 +4915,7 @@ class VolumeMigrationTestCase(base.BaseVolumeTestCase): migrate_volume_completion, nova_api, create_volume, save): def fake_create_volume(*args, **kwargs): - context, volume, host, request_spec, filter_properties = args + context, volume, request_spec, filter_properties = args fake_db = mock.Mock() task = create_volume_manager.ExtractVolumeSpecTask(fake_db) specs = task.execute(context, volume, {}) diff --git a/cinder/tests/unit/test_volume_rpcapi.py b/cinder/tests/unit/test_volume_rpcapi.py index 7cb308b9a4c..01fc9ef332b 100644 --- a/cinder/tests/unit/test_volume_rpcapi.py +++ b/cinder/tests/unit/test_volume_rpcapi.py @@ -26,6 +26,7 @@ from cinder.common import constants from cinder import context from cinder import db from cinder import objects +from cinder.objects import base as ovo_base from cinder.objects import fields from cinder import test from cinder.tests.unit.backup import fake_backup @@ -158,8 +159,12 @@ class VolumeRpcAPITestCase(test.TestCase): if 'dest_host' in expected_msg: dest_host = expected_msg.pop('dest_host') dest_host_dict = {'host': dest_host.host, + 'cluster_name': dest_host.cluster_name, 'capabilities': dest_host.capabilities} expected_msg['host'] = dest_host_dict + if 'force_copy' in expected_msg: + expected_msg['force_host_copy'] = expected_msg.pop('force_copy') + if 'new_volume' in expected_msg: volume = expected_msg['new_volume'] expected_msg['new_volume_id'] = volume['id'] @@ -229,26 +234,11 @@ class VolumeRpcAPITestCase(test.TestCase): self.assertEqual(expected_arg, arg) for kwarg, value in self.fake_kwargs.items(): - if isinstance(value, objects.Snapshot): - expected_snapshot = expected_msg[kwarg].obj_to_primitive() - snapshot = value.obj_to_primitive() - self.assertEqual(expected_snapshot, snapshot) - elif isinstance(value, objects.ConsistencyGroup): - expected_cg = expected_msg[kwarg].obj_to_primitive() - cg = value.obj_to_primitive() - self.assertEqual(expected_cg, cg) - elif isinstance(value, objects.CGSnapshot): - expected_cgsnapshot = expected_msg[kwarg].obj_to_primitive() - cgsnapshot = value.obj_to_primitive() - self.assertEqual(expected_cgsnapshot, cgsnapshot) - elif isinstance(value, objects.Volume): - expected_volume = expected_msg[kwarg].obj_to_primitive() - volume = value.obj_to_primitive() - self.assertDictEqual(expected_volume, volume) - elif isinstance(value, objects.Backup): - expected_backup = expected_msg[kwarg].obj_to_primitive() - backup = value.obj_to_primitive() - self.assertEqual(expected_backup, backup) + if isinstance(value, ovo_base.CinderObject): + expected = expected_msg[kwarg].obj_to_primitive() + primitive = value.obj_to_primitive() + 
self.assertEqual(expected, primitive) + else: self.assertEqual(expected_msg[kwarg], value) @@ -328,8 +318,7 @@ class VolumeRpcAPITestCase(test.TestCase): def test_create_consistencygroup(self): self._test_volume_api('create_consistencygroup', rpc_method='cast', - group=self.fake_cg, host='fake_host1', - version='3.0') + group=self.fake_cg, version='3.0') def test_delete_consistencygroup(self): self._test_volume_api('delete_consistencygroup', rpc_method='cast', @@ -358,7 +347,6 @@ class VolumeRpcAPITestCase(test.TestCase): self._test_volume_api('create_volume', rpc_method='cast', volume=self.fake_volume_obj, - host='fake_host1', request_spec='fake_request_spec', filter_properties='fake_properties', allow_reschedule=True, @@ -540,7 +528,13 @@ class VolumeRpcAPITestCase(test.TestCase): '-8ffd-0800200c9a66', version='3.0') - def test_extend_volume(self): + def _change_cluster_name(self, resource, cluster_name): + resource.cluster_name = cluster_name + resource.obj_reset_changes() + + @ddt.data(None, 'mycluster') + def test_extend_volume(self, cluster_name): + self._change_cluster_name(self.fake_volume_obj, cluster_name) self._test_volume_api('extend_volume', rpc_method='cast', volume=self.fake_volume_obj, @@ -548,10 +542,12 @@ class VolumeRpcAPITestCase(test.TestCase): reservations=self.fake_reservations, version='3.0') - def test_migrate_volume(self): + @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True) + def test_migrate_volume(self, can_send_version): class FakeHost(object): def __init__(self): self.host = 'host' + self.cluster_name = 'cluster_name' self.capabilities = {} dest_host = FakeHost() self._test_volume_api('migrate_volume', @@ -559,7 +555,7 @@ class VolumeRpcAPITestCase(test.TestCase): volume=self.fake_volume_obj, dest_host=dest_host, force_host_copy=True, - version='3.0') + version='3.5') def test_migrate_volume_completion(self): self._test_volume_api('migrate_volume_completion', @@ -569,10 +565,12 @@ class VolumeRpcAPITestCase(test.TestCase): error=False, version='3.0') - def test_retype(self): + @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True) + def test_retype(self, can_send_version): class FakeHost(object): def __init__(self): self.host = 'host' + self.cluster_name = 'cluster_name' self.capabilities = {} dest_host = FakeHost() self._test_volume_api('retype', @@ -583,7 +581,7 @@ class VolumeRpcAPITestCase(test.TestCase): migration_policy='never', reservations=self.fake_reservations, old_reservations=self.fake_reservations, - version='3.0') + version='3.5') def test_manage_existing(self): self._test_volume_api('manage_existing', @@ -685,8 +683,7 @@ class VolumeRpcAPITestCase(test.TestCase): def test_create_group(self): self._test_group_api('create_group', rpc_method='cast', - group=self.fake_group, host='fake_host1', - version='3.0') + group=self.fake_group, version='3.0') def test_delete_group(self): self._test_group_api('delete_group', rpc_method='cast', diff --git a/cinder/volume/api.py b/cinder/volume/api.py index 82e88d02f3a..0c85334e6cc 100644 --- a/cinder/volume/api.py +++ b/cinder/volume/api.py @@ -1344,32 +1344,47 @@ class API(base.Base): resource=volume) @wrap_check_policy - def migrate_volume(self, context, volume, host, force_host_copy, + def migrate_volume(self, context, volume, host, cluster_name, force_copy, lock_volume): - """Migrate the volume to the specified host.""" - # Make sure the host is in the list of available hosts + """Migrate the volume to the specified host or cluster.""" elevated = 
context.elevated() - topic = constants.VOLUME_TOPIC - services = objects.ServiceList.get_all_by_topic( - elevated, topic, disabled=False) - found = False - svc_host = volume_utils.extract_host(host, 'backend') - for service in services: - if service.is_up and service.host == svc_host: - found = True - break - if not found: - msg = _('No available service named %s') % host + + # If we received a request to migrate to a host + # Look for the service - must be up and enabled + svc_host = host and volume_utils.extract_host(host, 'backend') + svc_cluster = cluster_name and volume_utils.extract_host(cluster_name, + 'backend') + # NOTE(geguileo): Only svc_host or svc_cluster is set, so when we get + # a service from the DB we are getting either one specific service from + # a host or any service from a cluster that is up, which means that the + # cluster itself is also up. + try: + svc = objects.Service.get_by_id(elevated, None, is_up=True, + topic=constants.VOLUME_TOPIC, + host=svc_host, disabled=False, + cluster_name=svc_cluster, + backend_match_level='pool') + except exception.ServiceNotFound: + msg = _('No available service named %s') % cluster_name or host LOG.error(msg) raise exception.InvalidHost(reason=msg) + # Even if we were requested to do a migration to a host, if the host is + # in a cluster we will do a cluster migration. + cluster_name = svc.cluster_name # Build required conditions for conditional update expected = {'status': ('available', 'in-use'), 'migration_status': self.AVAILABLE_MIGRATION_STATUS, 'replication_status': (None, 'disabled'), 'consistencygroup_id': (None, ''), - 'group_id': (None, ''), - 'host': db.Not(host)} + 'group_id': (None, '')} + + # We want to make sure that the migration is to another host or + # another cluster. + if cluster_name: + expected['cluster_name'] = db.Not(cluster_name) + else: + expected['host'] = db.Not(host) filters = [~db.volume_has_snapshots_filter()] @@ -1392,8 +1407,8 @@ class API(base.Base): if not result: msg = _('Volume %s status must be available or in-use, must not ' 'be migrating, have snapshots, be replicated, be part of ' - 'a group and destination host must be different than the ' - 'current host') % {'vol_id': volume.id} + 'a group and destination host/cluster must be different ' + 'than the current one') % {'vol_id': volume.id} LOG.error(msg) raise exception.InvalidVolume(reason=msg) @@ -1406,11 +1421,11 @@ class API(base.Base): request_spec = {'volume_properties': volume, 'volume_type': volume_type, 'volume_id': volume.id} - self.scheduler_rpcapi.migrate_volume_to_host(context, - volume, - host, - force_host_copy, - request_spec) + self.scheduler_rpcapi.migrate_volume(context, + volume, + cluster_name or host, + force_copy, + request_spec) LOG.info(_LI("Migrate volume request issued successfully."), resource=volume) @@ -1556,19 +1571,31 @@ class API(base.Base): LOG.info(_LI("Retype volume request issued successfully."), resource=volume) - def _get_service_by_host(self, context, host, resource='volume'): + def _get_service_by_host_cluster(self, context, host, cluster_name, + resource='volume'): elevated = context.elevated() + + svc_cluster = cluster_name and volume_utils.extract_host(cluster_name, + 'backend') + svc_host = host and volume_utils.extract_host(host, 'backend') + + # NOTE(geguileo): Only svc_host or svc_cluster is set, so when we get + # a service from the DB we are getting either one specific service from + # a host or any service that is up from a cluster, which means that the + # cluster itself is also up. 
try: - svc_host = volume_utils.extract_host(host, 'backend') - service = objects.Service.get_by_args( - elevated, svc_host, 'cinder-volume') + service = objects.Service.get_by_id(elevated, None, host=svc_host, + binary='cinder-volume', + cluster_name=svc_cluster) except exception.ServiceNotFound: with excutils.save_and_reraise_exception(): LOG.error(_LE('Unable to find service: %(service)s for ' - 'given host: %(host)s.'), - {'service': constants.VOLUME_BINARY, 'host': host}) + 'given host: %(host)s and cluster %(cluster)s.'), + {'service': constants.VOLUME_BINARY, 'host': host, + 'cluster': cluster_name}) - if service.disabled: + if service.disabled and (not service.cluster_name or + service.cluster.disabled): LOG.error(_LE('Unable to manage existing %s on a disabled ' 'service.'), resource) raise exception.ServiceUnavailable() @@ -1580,15 +1607,16 @@ class API(base.Base): return service - def manage_existing(self, context, host, ref, name=None, description=None, - volume_type=None, metadata=None, + def manage_existing(self, context, host, cluster_name, ref, name=None, + description=None, volume_type=None, metadata=None, availability_zone=None, bootable=False): if volume_type and 'extra_specs' not in volume_type: extra_specs = volume_types.get_volume_type_extra_specs( volume_type['id']) volume_type['extra_specs'] = extra_specs - service = self._get_service_by_host(context, host) + service = self._get_service_by_host_cluster(context, host, + cluster_name) if availability_zone is None: availability_zone = service.availability_zone @@ -1597,7 +1625,8 @@ class API(base.Base): 'context': context, 'name': name, 'description': description, - 'host': host, + 'host': service.host, + 'cluster_name': service.cluster_name, 'ref': ref, 'volume_type': volume_type, 'metadata': metadata, @@ -1626,7 +1655,7 @@ class API(base.Base): def get_manageable_volumes(self, context, host, marker=None, limit=None, offset=None, sort_keys=None, sort_dirs=None): - self._get_service_by_host(context, host) + self._get_service_by_host_cluster(context, host, None) return self.volume_rpcapi.get_manageable_volumes(context, host, marker, limit, offset, sort_keys, @@ -1635,18 +1664,21 @@ class API(base.Base): def manage_existing_snapshot(self, context, ref, volume, name=None, description=None, metadata=None): - service = self._get_service_by_host(context, volume.host, 'snapshot') + service = self._get_service_by_host_cluster(context, volume.host, + volume.cluster_name, + 'snapshot') + snapshot_object = self.create_snapshot_in_db(context, volume, name, description, True, metadata, None, commit_quota=False) - self.volume_rpcapi.manage_existing_snapshot(context, snapshot_object, - ref, service.host) + self.volume_rpcapi.manage_existing_snapshot( + context, snapshot_object, ref, service.service_topic_queue) return snapshot_object def get_manageable_snapshots(self, context, host, marker=None, limit=None, offset=None, sort_keys=None, sort_dirs=None): - self._get_service_by_host(context, host, resource='snapshot') + self._get_service_by_host_cluster(context, host, None, 'snapshot') return self.volume_rpcapi.get_manageable_snapshots(context, host, marker, limit, offset, sort_keys, diff --git a/cinder/volume/driver.py b/cinder/volume/driver.py index a7379e4be31..b154d6a4cf7 100644 --- a/cinder/volume/driver.py +++ b/cinder/volume/driver.py @@ -339,6 +339,7 @@ class BaseVD(object): # NOTE(vish): db is set by Manager self.db = kwargs.get('db') self.host = kwargs.get('host') + self.cluster_name = kwargs.get('cluster_name') 
self.configuration = kwargs.get('configuration', None) if self.configuration: diff --git a/cinder/volume/flows/api/create_volume.py b/cinder/volume/flows/api/create_volume.py index c4f14eac3d0..f21255d84d6 100644 --- a/cinder/volume/flows/api/create_volume.py +++ b/cinder/volume/flows/api/create_volume.py @@ -703,13 +703,13 @@ class VolumeCastTask(flow_utils.CinderTask): self.db = db def _cast_create_volume(self, context, request_spec, filter_properties): - source_volid = request_spec['source_volid'] - source_replicaid = request_spec['source_replicaid'] + source_volume_ref = None + source_volid = (request_spec['source_volid'] or + request_spec['source_replicaid']) volume = request_spec['volume'] snapshot_id = request_spec['snapshot_id'] image_id = request_spec['image_id'] cgroup_id = request_spec['consistencygroup_id'] - host = None cgsnapshot_id = request_spec['cgsnapshot_id'] group_id = request_spec['group_id'] if cgroup_id: @@ -734,19 +734,11 @@ class VolumeCastTask(flow_utils.CinderTask): # snapshot resides instead of passing it through the scheduler, so # snapshot can be copied to the new volume. snapshot = objects.Snapshot.get_by_id(context, snapshot_id) - source_volume_ref = objects.Volume.get_by_id(context, - snapshot.volume_id) - host = source_volume_ref.host + source_volume_ref = snapshot.volume elif source_volid: - source_volume_ref = objects.Volume.get_by_id(context, - source_volid) - host = source_volume_ref.host - elif source_replicaid: - source_volume_ref = objects.Volume.get_by_id(context, - source_replicaid) - host = source_volume_ref.host + source_volume_ref = objects.Volume.get_by_id(context, source_volid) - if not host: + if not source_volume_ref: # Cast to the scheduler and let it handle whatever is needed # to select the target host for this volume. self.scheduler_rpcapi.create_volume( @@ -759,14 +751,14 @@ class VolumeCastTask(flow_utils.CinderTask): else: # Bypass the scheduler and send the request directly to the volume # manager. 
- volume.host = host + volume.host = source_volume_ref.host + volume.cluster_name = source_volume_ref.cluster_name volume.scheduled_at = timeutils.utcnow() volume.save() if not cgsnapshot_id: self.volume_rpcapi.create_volume( context, volume, - volume.host, request_spec, filter_properties, allow_reschedule=False) diff --git a/cinder/volume/flows/api/manage_existing.py b/cinder/volume/flows/api/manage_existing.py index a683d17261c..ca23c006e09 100644 --- a/cinder/volume/flows/api/manage_existing.py +++ b/cinder/volume/flows/api/manage_existing.py @@ -37,7 +37,8 @@ class EntryCreateTask(flow_utils.CinderTask): def __init__(self, db): requires = ['availability_zone', 'description', 'metadata', - 'name', 'host', 'bootable', 'volume_type', 'ref'] + 'name', 'host', 'cluster_name', 'bootable', 'volume_type', + 'ref'] super(EntryCreateTask, self).__init__(addons=[ACTION], requires=requires) self.db = db @@ -62,6 +63,7 @@ class EntryCreateTask(flow_utils.CinderTask): 'display_description': kwargs.pop('description'), 'display_name': kwargs.pop('name'), 'host': kwargs.pop('host'), + 'cluster_name': kwargs.pop('cluster_name'), 'availability_zone': kwargs.pop('availability_zone'), 'volume_type_id': volume_type_id, 'metadata': kwargs.pop('metadata') or {}, diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py index f70c03f73b9..45e5e05c215 100644 --- a/cinder/volume/manager.py +++ b/cinder/volume/manager.py @@ -222,6 +222,7 @@ class VolumeManager(manager.CleanableManager, configuration=self.configuration, db=self.db, host=self.host, + cluster_name=self.cluster, is_vol_db_empty=vol_db_empty, active_backend_id=curr_active_backend_id) @@ -548,6 +549,12 @@ class VolumeManager(manager.CleanableManager, """ return self.driver.initialized + def _set_resource_host(self, resource): + """Set the host field on the DB to our own when we are clustered.""" + if resource.is_clustered and resource.host != self.host: + resource.host = self.host + resource.save() + @objects.Volume.set_workers def create_volume(self, context, volume, request_spec=None, filter_properties=None, allow_reschedule=True): @@ -555,6 +562,9 @@ class VolumeManager(manager.CleanableManager, # Log about unsupported drivers utils.log_unsupported_driver_warning(self.driver) + # Make sure the host in the DB matches our own when clustered + self._set_resource_host(volume) + context_elevated = context.elevated() if filter_properties is None: filter_properties = {} @@ -1683,12 +1693,13 @@ class VolumeManager(manager.CleanableManager, remote=src_remote, attach_encryptor=attach_encryptor) - def _migrate_volume_generic(self, ctxt, volume, host, new_type_id): + def _migrate_volume_generic(self, ctxt, volume, backend, new_type_id): rpcapi = volume_rpcapi.VolumeAPI() # Create new volume on remote host tmp_skip = {'snapshot_id', 'source_volid'} - skip = self._VOLUME_CLONE_SKIP_PROPERTIES | tmp_skip | {'host'} + skip = self._VOLUME_CLONE_SKIP_PROPERTIES | tmp_skip | {'host', + 'cluster_name'} new_vol_values = {k: volume[k] for k in set(volume.keys()) - skip} if new_type_id: new_vol_values['volume_type_id'] = new_type_id @@ -1700,15 +1711,16 @@ class VolumeManager(manager.CleanableManager, new_volume = objects.Volume( context=ctxt, - host=host['host'], + host=backend['host'], + cluster_name=backend.get('cluster_name'), status='creating', attach_status=fields.VolumeAttachStatus.DETACHED, migration_status='target:%s' % volume['id'], **new_vol_values ) new_volume.create() - rpcapi.create_volume(ctxt, new_volume, host['host'], - None, None, 
allow_reschedule=False) + rpcapi.create_volume(ctxt, new_volume, None, None, + allow_reschedule=False) # Wait for new_volume to become ready starttime = time.time() @@ -1720,13 +1732,13 @@ class VolumeManager(manager.CleanableManager, tries += 1 now = time.time() if new_volume.status == 'error': - msg = _("failed to create new_volume on destination host") + msg = _("failed to create new_volume on destination") self._clean_temporary_volume(ctxt, volume, new_volume, clean_db_only=True) raise exception.VolumeMigrationFailed(reason=msg) elif now > deadline: - msg = _("timeout creating new_volume on destination host") + msg = _("timeout creating new_volume on destination") self._clean_temporary_volume(ctxt, volume, new_volume, clean_db_only=True) @@ -1931,6 +1943,7 @@ class VolumeManager(manager.CleanableManager, host) if moved: updates = {'host': host['host'], + 'cluster_name': host.get('cluster_name'), 'migration_status': 'success', 'previous_status': volume.status} if status_update: @@ -1948,8 +1961,7 @@ class VolumeManager(manager.CleanableManager, volume.save() if not moved: try: - self._migrate_volume_generic(ctxt, volume, host, - new_type_id) + self._migrate_volume_generic(ctxt, volume, host, new_type_id) except Exception: with excutils.save_and_reraise_exception(): updates = {'migration_status': 'error'} @@ -2238,17 +2250,22 @@ class VolumeManager(manager.CleanableManager, # Call driver to try and change the type retype_model_update = None - # NOTE(jdg): Check to see if the destination host is the same - # as the current. If it's not don't call the driver.retype - # method, otherwise drivers that implement retype may report - # success, but it's invalid in the case of a migrate. + # NOTE(jdg): Check to see if the destination host or cluster (depending + # if it's the volume is in a clustered backend or not) is the same as + # the current. If it's not don't call the driver.retype method, + # otherwise drivers that implement retype may report success, but it's + # invalid in the case of a migrate. # We assume that those that support pools do this internally # so we strip off the pools designation + if (not retyped and not diff.get('encryption') and - vol_utils.hosts_are_equivalent(self.driver.host, - host['host'])): + ((not host.get('cluster_name') and + vol_utils.hosts_are_equivalent(self.driver.host, + host['host'])) or + (vol_utils.hosts_are_equivalent(self.driver.cluster_name, + host.get('cluster_name'))))): try: new_type = volume_types.get_volume_type(context, new_type_id) ret = self.driver.retype(context, @@ -2311,6 +2328,7 @@ class VolumeManager(manager.CleanableManager, else: model_update = {'volume_type_id': new_type_id, 'host': host['host'], + 'cluster_name': host.get('cluster_name'), 'status': status_update['status']} if retype_model_update: model_update.update(retype_model_update) @@ -2407,6 +2425,9 @@ class VolumeManager(manager.CleanableManager, def _create_group(self, context, group, is_generic_group=True): context = context.elevated() + # Make sure the host in the DB matches our own when clustered + self._set_resource_host(group) + status = fields.GroupStatus.AVAILABLE model_update = None diff --git a/cinder/volume/rpcapi.py b/cinder/volume/rpcapi.py index a1498fa228d..368660db368 100644 --- a/cinder/volume/rpcapi.py +++ b/cinder/volume/rpcapi.py @@ -115,9 +115,10 @@ class VolumeAPI(rpc.RPCAPI): get_backup_device(). 3.3 - Adds support for sending objects over RPC in attach_volume(). 3.4 - Adds support for sending objects over RPC in detach_volume(). 
+ 3.5 - Adds support for cluster in retype and migrate_volume """ - RPC_API_VERSION = '3.4' + RPC_API_VERSION = '3.5' RPC_DEFAULT_VERSION = '3.0' TOPIC = constants.VOLUME_TOPIC BINARY = 'cinder-volume' @@ -125,10 +126,10 @@ class VolumeAPI(rpc.RPCAPI): def _get_cctxt(self, host=None, version=None, **kwargs): if host is not None: kwargs['server'] = utils.get_volume_rpc_host(host) - return super(VolumeAPI, self)._get_cctxt(version, **kwargs) + return super(VolumeAPI, self)._get_cctxt(version=version, **kwargs) - def create_consistencygroup(self, ctxt, group, host): - cctxt = self._get_cctxt(host) + def create_consistencygroup(self, ctxt, group): + cctxt = self._get_cctxt(group.service_topic_queue) cctxt.cast(ctxt, 'create_consistencygroup', group=group) def delete_consistencygroup(self, ctxt, group): @@ -145,7 +146,7 @@ class VolumeAPI(rpc.RPCAPI): def create_consistencygroup_from_src(self, ctxt, group, cgsnapshot=None, source_cg=None): - cctxt = self._get_cctxt(group.host) + cctxt = self._get_cctxt(group.service_topic_queue) cctxt.cast(ctxt, 'create_consistencygroup_from_src', group=group, cgsnapshot=cgsnapshot, @@ -159,9 +160,9 @@ class VolumeAPI(rpc.RPCAPI): cctxt = self._get_cctxt(cgsnapshot.service_topic_queue) cctxt.cast(ctxt, 'delete_cgsnapshot', cgsnapshot=cgsnapshot) - def create_volume(self, ctxt, volume, host, request_spec, - filter_properties, allow_reschedule=True): - cctxt = self._get_cctxt(host) + def create_volume(self, ctxt, volume, request_spec, filter_properties, + allow_reschedule=True): + cctxt = self._get_cctxt(volume.service_topic_queue) cctxt.cast(ctxt, 'create_volume', request_spec=request_spec, filter_properties=filter_properties, @@ -239,35 +240,48 @@ class VolumeAPI(rpc.RPCAPI): new_user=new_user, new_project=new_project) def extend_volume(self, ctxt, volume, new_size, reservations): - cctxt = self._get_cctxt(volume.host) + cctxt = self._get_cctxt(volume.service_topic_queue) cctxt.cast(ctxt, 'extend_volume', volume=volume, new_size=new_size, reservations=reservations) def migrate_volume(self, ctxt, volume, dest_host, force_host_copy): - host_p = {'host': dest_host.host, - 'capabilities': dest_host.capabilities} - cctxt = self._get_cctxt(volume.host) - cctxt.cast(ctxt, 'migrate_volume', volume=volume, host=host_p, + backend_p = {'host': dest_host.host, + 'cluster_name': dest_host.cluster_name, + 'capabilities': dest_host.capabilities} + + version = '3.5' + if not self.client.can_send_version(version): + version = '3.0' + del backend_p['cluster_name'] + + cctxt = self._get_cctxt(volume.service_topic_queue, version) + cctxt.cast(ctxt, 'migrate_volume', volume=volume, host=backend_p, force_host_copy=force_host_copy) def migrate_volume_completion(self, ctxt, volume, new_volume, error): - cctxt = self._get_cctxt(volume.host) + cctxt = self._get_cctxt(volume.service_topic_queue) return cctxt.call(ctxt, 'migrate_volume_completion', volume=volume, new_volume=new_volume, error=error,) def retype(self, ctxt, volume, new_type_id, dest_host, migration_policy='never', reservations=None, old_reservations=None): - host_p = {'host': dest_host.host, - 'capabilities': dest_host.capabilities} - cctxt = self._get_cctxt(volume.host) + backend_p = {'host': dest_host.host, + 'cluster_name': dest_host.cluster_name, + 'capabilities': dest_host.capabilities} + version = '3.5' + if not self.client.can_send_version(version): + version = '3.0' + del backend_p['cluster_name'] + + cctxt = self._get_cctxt(volume.service_topic_queue, version) cctxt.cast(ctxt, 'retype', volume=volume, 
new_type_id=new_type_id, - host=host_p, migration_policy=migration_policy, + host=backend_p, migration_policy=migration_policy, reservations=reservations, old_reservations=old_reservations) def manage_existing(self, ctxt, volume, ref): - cctxt = self._get_cctxt(volume.host) + cctxt = self._get_cctxt(volume.service_topic_queue) cctxt.cast(ctxt, 'manage_existing', ref=ref, volume=volume) def update_migrated_volume(self, ctxt, volume, new_volume, @@ -334,8 +348,8 @@ class VolumeAPI(rpc.RPCAPI): limit=limit, offset=offset, sort_keys=sort_keys, sort_dirs=sort_dirs) - def create_group(self, ctxt, group, host): - cctxt = self._get_cctxt(host) + def create_group(self, ctxt, group): + cctxt = self._get_cctxt(group.service_topic_queue) cctxt.cast(ctxt, 'create_group', group=group) def delete_group(self, ctxt, group): @@ -349,7 +363,7 @@ class VolumeAPI(rpc.RPCAPI): def create_group_from_src(self, ctxt, group, group_snapshot=None, source_group=None): - cctxt = self._get_cctxt(group.host) + cctxt = self._get_cctxt(group.service_topic_queue) cctxt.cast(ctxt, 'create_group_from_src', group=group, group_snapshot=group_snapshot, source_group=source_group) diff --git a/cinder/volume/utils.py b/cinder/volume/utils.py index 4f43a75f1a2..d87502079a8 100644 --- a/cinder/volume/utils.py +++ b/cinder/volume/utils.py @@ -752,6 +752,9 @@ def matching_backend_name(src_volume_type, volume_type): def hosts_are_equivalent(host_1, host_2): + # In case host_1 or host_2 are None + if not (host_1 and host_2): + return host_1 == host_2 return extract_host(host_1) == extract_host(host_2)
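
Closing note on the volume/utils.py hunk above: a small standalone illustration of the None-safe comparison that hosts_are_equivalent now performs. The extract_host below is a simplified stand-in for cinder.volume.utils.extract_host at its default 'backend' level (here it only strips a '#pool' suffix), so this is a sketch of the behaviour rather than the real helper.

    def extract_host(host):
        # Simplified stand-in: keep only the 'host@backend' part,
        # dropping any '#pool' suffix.
        return host.partition('#')[0]

    def hosts_are_equivalent(host_1, host_2):
        # If either side is None/empty, they are only equivalent when
        # both are; this avoids calling extract_host() on None.
        if not (host_1 and host_2):
            return host_1 == host_2
        return extract_host(host_1) == extract_host(host_2)

    assert hosts_are_equivalent('host@lvm#pool', 'host@lvm')
    assert hosts_are_equivalent(None, None)
    assert not hosts_are_equivalent('host@lvm', None)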