Implement filter scheduler

In order to do more sophisticated scheduling (e.g. schedule based on volume
type), filter scheduler is introduced. Several changes are made to make this
possible, some of them are similar to the counterpart in Nova:

- add HostState class to host_manager in order to store volume capabilities
- implement get_volume_stats() method of iSCSIDriver as an example to
demonstrate how volume backend driver reports capabilities as well as status
- add scheduler_options.py and 'scheduler_json_config_location' flag to be
allow loading json configuration file for scheduler at run time
- port common filters/weights from oslo
- add capacity weigher (scheduler/weights/capacity.py) for picking up
target volume backend by weighing free capacity of the host. The default
behavior is to spread volumes across hosts; by changing the
'capacity_weight_multiplier' to negative number, volume placing behavior will
become stacking.
- add capacity filter which filters those hosts have insufficient storage space
to serve the request.
- add 'reserved_percentage' config option to indicate how much space is
reserved. Capacity reservation is needed when volume resize is enabled.
- add 'publish_service_capabilities()' method to volume RPC API to allow
scheduler to have volume service report capabilities. This bumps volume RPC
API to version 1.2
- remove 'volume_force_update_capabilities' config option, volume status will be
report to scheduler in every checking.

The implication of this change to storage/backend driver developer:
- implementation of get_volume_stats() of the driver is now a *MUST*, filter
scheduler heavily relies on the status/capabilities reported by backend driver
to makeplacement decision.  To ensure Cinder works seamlessly on the storage
system, driver should at least report following capabilities/status:
----------------------+---------------------------+---------------------------
  Capability/Status   |      Description          |         Example
----------------------+---------------------------+---------------------------
 'volume_backend_name'| back-end name, string     | 'Example_Storage_Backend'
----------------------+---------------------------+---------------------------
  'vendor_name'       | vendor name, string       | 'OpenStackCinder'
----------------------+---------------------------+---------------------------
  'driver_version'    | version, string           |  '1.0a'
----------------------+---------------------------+---------------------------
  'storage_protocol'  | supported protocols,      | 'iSCSI', 'RBD', 'FC', 'NFS'
                      | string or list of strings | ['iSCSI', 'NFS', 'FC']
----------------------+---------------------------+---------------------------
  'total_capacity_gb' | capacity in GB, integer   |  102400
----------------------+---------------------------+---------------------------
  'free_capacity_gb'  | available capacity in GB, |  1000
                      | integer                   |
----------------------+---------------------------+---------------------------
'reserved_percentage' | reserved space in         |  0, 10
                      | percentage, integer       |
----------------------+---------------------------+---------------------------

The implication of this change to Cinder administrator:
- the default setting for filter scheduler should work well with the benefits
of:
  * being able to fully utilize capacity of backends (driver now has to report
  actul total space and space utilization and scheduler uses these info) not
  limited by the 'max_gigabytes' config option any more;
  * being able to choose placement policy between spreading & stacking for
  volume creation (by modifying the 'capacity_weight_multiplier' in
  CapacityWeigher)
- with filter scheduler, Cinder is now able to utilize the advanced features/
capabilities provided by different storage back-ends via: defining different
volume types with proper extra_specs. Volume types can be considered as sets
of back-end capabilities requirement.
 For example, a volume type which has 'storage_protocol':'FC' key/value pair
definition in its extra_spec can only be served by those back-ends who report
they support FiberChannel protocol. Another example is volume type has 'QoS'
requirement can only be served by back-ends support QoS.

Note/TODO:
* Currently scheduler makes its decision based on the status and capabilities
information reported by volume nodes, and these information is stored in memory
of scheduler process. More sophisticated way may be add on table in DB to
record status/capabilities of all volume nodes, like Nova does for compute nodes.

implement bp volume-type-scheduler

DocImpact

Change-Id: I296b3727db8de0d4cf085fac602d122a7b474842
This commit is contained in:
Zhiteng Huang 2012-10-06 00:08:09 +08:00
parent 9e3a254f6d
commit 643f9169c4
30 changed files with 1934 additions and 70 deletions

View File

@ -314,6 +314,14 @@ class HostNotFound(NotFound):
message = _("Host %(host)s could not be found.")
class SchedulerHostFilterNotFound(NotFound):
message = _("Scheduler Host Filter %(filter_name)s could not be found.")
class SchedulerHostWeigherNotFound(NotFound):
message = _("Scheduler Host Weigher %(weigher_name)s could not be found.")
class HostBinaryNotFound(NotFound):
message = _("Could not find binary %(binary)s on host %(host)s.")

View File

@ -0,0 +1,71 @@
# Copyright (c) 2011-2012 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Filter support
"""
import inspect
from stevedore import extension
class BaseFilter(object):
"""Base class for all filter classes."""
def _filter_one(self, obj, filter_properties):
"""Return True if it passes the filter, False otherwise.
Override this in a subclass.
"""
return True
def filter_all(self, filter_obj_list, filter_properties):
"""Yield objects that pass the filter.
Can be overriden in a subclass, if you need to base filtering
decisions on all objects. Otherwise, one can just override
_filter_one() to filter a single object.
"""
for obj in filter_obj_list:
if self._filter_one(obj, filter_properties):
yield obj
class BaseFilterHandler(object):
""" Base class to handle loading filter classes.
This class should be subclassed where one needs to use filters.
"""
def __init__(self, filter_class_type, filter_namespace):
self.namespace = filter_namespace
self.filter_class_type = filter_class_type
self.filter_manager = extension.ExtensionManager(filter_namespace)
def _is_correct_class(self, obj):
"""Return whether an object is a class of the correct type and
is not prefixed with an underscore.
"""
return (inspect.isclass(obj) and
not obj.__name__.startswith('_') and
issubclass(obj, self.filter_class_type))
def get_all_classes(self):
return [x.plugin for x in self.filter_manager
if self._is_correct_class(x.plugin)]
def get_filtered_objects(self, filter_classes, objs,
filter_properties):
for filter_cls in filter_classes:
objs = filter_cls().filter_all(objs, filter_properties)
return list(objs)

View File

@ -0,0 +1,41 @@
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Scheduler host filters
"""
from cinder.openstack.common import log as logging
from cinder.openstack.common.scheduler import filter
LOG = logging.getLogger(__name__)
class BaseHostFilter(filter.BaseFilter):
"""Base class for host filters."""
def _filter_one(self, obj, filter_properties):
"""Return True if the object passes the filter, otherwise False."""
return self.host_passes(obj, filter_properties)
def host_passes(self, host_state, filter_properties):
"""Return True if the HostState passes the filter, otherwise False.
Override this in a subclass.
"""
raise NotImplementedError()
class HostFilterHandler(filter.BaseFilterHandler):
def __init__(self, namespace):
super(HostFilterHandler, self).__init__(BaseHostFilter, namespace)

View File

@ -0,0 +1,30 @@
# Copyright (c) 2011-2012 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinder.openstack.common.scheduler import filters
class AvailabilityZoneFilter(filters.BaseHostFilter):
"""Filters Hosts by availability zone."""
def host_passes(self, host_state, filter_properties):
spec = filter_properties.get('request_spec', {})
props = spec.get('resource_properties', [])
availability_zone = props.get('availability_zone')
if availability_zone:
return availability_zone == host_state.service['availability_zone']
return True

View File

@ -0,0 +1,63 @@
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinder.openstack.common import log as logging
from cinder.openstack.common.scheduler import filters
from cinder.openstack.common.scheduler.filters import extra_specs_ops
LOG = logging.getLogger(__name__)
class CapabilitiesFilter(filters.BaseHostFilter):
"""HostFilter to work with resource (instance & volume) type records."""
def _satisfies_extra_specs(self, capabilities, resource_type):
"""Check that the capabilities provided by the services
satisfy the extra specs associated with the instance type"""
extra_specs = resource_type.get('extra_specs', [])
if not extra_specs:
return True
for key, req in extra_specs.iteritems():
# Either not scope format, or in capabilities scope
scope = key.split(':')
if len(scope) > 1 and scope[0] != "capabilities":
continue
elif scope[0] == "capabilities":
del scope[0]
cap = capabilities
for index in range(0, len(scope)):
try:
cap = cap.get(scope[index], None)
except AttributeError:
return False
if cap is None:
return False
if not extra_specs_ops.match(cap, req):
return False
return True
def host_passes(self, host_state, filter_properties):
"""Return a list of hosts that can create instance_type."""
# Note(zhiteng) Currently only Cinder and Nova are using
# this filter, so the resource type is either instance or
# volume.
resource_type = filter_properties.get('resource_type')
if not self._satisfies_extra_specs(host_state.capabilities,
resource_type):
return False
return True

View File

@ -0,0 +1,68 @@
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import operator
# 1. The following operations are supported:
# =, s==, s!=, s>=, s>, s<=, s<, <in>, <or>, ==, !=, >=, <=
# 2. Note that <or> is handled in a different way below.
# 3. If the first word in the extra_specs is not one of the operators,
# it is ignored.
_op_methods = {'=': lambda x, y: float(x) >= float(y),
'<in>': lambda x, y: y in x,
'==': lambda x, y: float(x) == float(y),
'!=': lambda x, y: float(x) != float(y),
'>=': lambda x, y: float(x) >= float(y),
'<=': lambda x, y: float(x) <= float(y),
's==': operator.eq,
's!=': operator.ne,
's<': operator.lt,
's<=': operator.le,
's>': operator.gt,
's>=': operator.ge}
def match(value, req):
words = req.split()
op = method = None
if words:
op = words.pop(0)
method = _op_methods.get(op)
if op != '<or>' and not method:
return value == req
if value is None:
return False
if op == '<or>': # Ex: <or> v1 <or> v2 <or> v3
while True:
if words.pop(0) == value:
return True
if not words:
break
op = words.pop(0) # remove a keyword <or>
if not words:
break
return False
try:
if words and method(value, words[0]):
return True
except ValueError:
pass
return False

View File

@ -0,0 +1,150 @@
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import operator
from cinder.openstack.common import jsonutils
from cinder.openstack.common.scheduler import filters
class JsonFilter(filters.BaseHostFilter):
"""Host Filter to allow simple JSON-based grammar for
selecting hosts.
"""
def _op_compare(self, args, op):
"""Returns True if the specified operator can successfully
compare the first item in the args with all the rest. Will
return False if only one item is in the list.
"""
if len(args) < 2:
return False
if op is operator.contains:
bad = not args[0] in args[1:]
else:
bad = [arg for arg in args[1:]
if not op(args[0], arg)]
return not bool(bad)
def _equals(self, args):
"""First term is == all the other terms."""
return self._op_compare(args, operator.eq)
def _less_than(self, args):
"""First term is < all the other terms."""
return self._op_compare(args, operator.lt)
def _greater_than(self, args):
"""First term is > all the other terms."""
return self._op_compare(args, operator.gt)
def _in(self, args):
"""First term is in set of remaining terms"""
return self._op_compare(args, operator.contains)
def _less_than_equal(self, args):
"""First term is <= all the other terms."""
return self._op_compare(args, operator.le)
def _greater_than_equal(self, args):
"""First term is >= all the other terms."""
return self._op_compare(args, operator.ge)
def _not(self, args):
"""Flip each of the arguments."""
return [not arg for arg in args]
def _or(self, args):
"""True if any arg is True."""
return any(args)
def _and(self, args):
"""True if all args are True."""
return all(args)
commands = {
'=': _equals,
'<': _less_than,
'>': _greater_than,
'in': _in,
'<=': _less_than_equal,
'>=': _greater_than_equal,
'not': _not,
'or': _or,
'and': _and,
}
def _parse_string(self, string, host_state):
"""Strings prefixed with $ are capability lookups in the
form '$variable' where 'variable' is an attribute in the
HostState class. If $variable is a dictionary, you may
use: $variable.dictkey
"""
if not string:
return None
if not string.startswith("$"):
return string
path = string[1:].split(".")
obj = getattr(host_state, path[0], None)
if obj is None:
return None
for item in path[1:]:
obj = obj.get(item, None)
if obj is None:
return None
return obj
def _process_filter(self, query, host_state):
"""Recursively parse the query structure."""
if not query:
return True
cmd = query[0]
method = self.commands[cmd]
cooked_args = []
for arg in query[1:]:
if isinstance(arg, list):
arg = self._process_filter(arg, host_state)
elif isinstance(arg, basestring):
arg = self._parse_string(arg, host_state)
if arg is not None:
cooked_args.append(arg)
result = method(self, cooked_args)
return result
def host_passes(self, host_state, filter_properties):
"""Return a list of hosts that can fulfill the requirements
specified in the query.
"""
# TODO(zhiteng) Add description for filter_properties structure
# and scheduler_hints.
try:
query = filter_properties['scheduler_hints']['query']
except KeyError:
query = None
if not query:
return True
# NOTE(comstud): Not checking capabilities or service for
# enabled/disabled so that a provided json filter can decide
result = self._process_filter(jsonutils.loads(query), host_state)
if isinstance(result, list):
# If any succeeded, include the host
result = any(result)
if result:
# Filter it out.
return True
return False

View File

@ -0,0 +1,91 @@
# Copyright (c) 2011-2012 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Pluggable Weighing support
"""
import inspect
from stevedore import extension
class WeighedObject(object):
"""Object with weight information."""
def __init__(self, obj, weight):
self.obj = obj
self.weight = weight
def __repr__(self):
return "<WeighedObject '%s': %s>" % (self.obj, self.weight)
class BaseWeigher(object):
"""Base class for pluggable weighers."""
def _weight_multiplier(self):
"""How weighted this weigher should be. Normally this would
be overriden in a subclass based on a config value.
"""
return 1.0
def _weigh_object(self, obj, weight_properties):
"""Override in a subclass to specify a weight for a specific
object.
"""
return 0.0
def weigh_objects(self, weighed_obj_list, weight_properties):
"""Weigh multiple objects. Override in a subclass if you need
need access to all objects in order to manipulate weights.
"""
constant = self._weight_multiplier()
for obj in weighed_obj_list:
obj.weight += (constant *
self._weigh_object(obj.obj, weight_properties))
class BaseWeightHandler(object):
object_class = WeighedObject
def __init__(self, weighed_object_type, weight_namespace):
self.namespace = weight_namespace
self.weighed_object_type = weighed_object_type
self.weight_manager = extension.ExtensionManager(weight_namespace)
def _is_correct_class(self, obj):
"""Return whether an object is a class of the correct type and
is not prefixed with an underscore.
"""
return (inspect.isclass(obj) and
not obj.__name__.startswith('_') and
issubclass(obj, self.weighed_object_type))
def get_all_classes(self):
return [x.plugin for x in self.weight_manager
if self._is_correct_class(x.plugin)]
def get_weighed_objects(self, weigher_classes, obj_list,
weighing_properties):
"""Return a sorted (highest score first) list of WeighedObjects."""
if not obj_list:
return []
weighed_objs = [self.object_class(obj, 0.0) for obj in obj_list]
for weigher_cls in weigher_classes:
weigher = weigher_cls()
weigher.weigh_objects(weighed_objs, weighing_properties)
return sorted(weighed_objs, key=lambda x: x.weight, reverse=True)

View File

@ -0,0 +1,45 @@
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Scheduler host weights
"""
from cinder.openstack.common.scheduler import weight
class WeighedHost(weight.WeighedObject):
def to_dict(self):
return {
'weight': self.weight,
'host': self.obj.host,
}
def __repr__(self):
return ("WeighedHost [host: %s, weight: %s]" %
(self.obj.host, self.weight))
class BaseHostWeigher(weight.BaseWeigher):
"""Base class for host weights."""
pass
class HostWeightHandler(weight.BaseWeightHandler):
object_class = WeighedHost
def __init__(self, namespace):
super(HostWeightHandler, self).__init__(BaseHostWeigher, namespace)

View File

@ -0,0 +1,130 @@
# Copyright (c) 2011 Intel Corporation
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The FilterScheduler is for creating volumes.
You can customize this scheduler by specifying your own volume Filters and
Weighing Functions.
"""
import operator
from cinder import exception
from cinder import flags
from cinder.openstack.common import importutils
from cinder.openstack.common import log as logging
from cinder.scheduler import driver
from cinder.scheduler import scheduler_options
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
class FilterScheduler(driver.Scheduler):
"""Scheduler that can be used for filtering and weighing."""
def __init__(self, *args, **kwargs):
super(FilterScheduler, self).__init__(*args, **kwargs)
self.cost_function_cache = None
self.options = scheduler_options.SchedulerOptions()
def schedule(self, context, topic, method, *args, **kwargs):
"""The schedule() contract requires we return the one
best-suited host for this request.
"""
self._schedule(context, topic, *args, **kwargs)
def _get_configuration_options(self):
"""Fetch options dictionary. Broken out for testing."""
return self.options.get_configuration()
def populate_filter_properties(self, request_spec, filter_properties):
"""Stuff things into filter_properties. Can be overridden in a
subclass to add more data.
"""
vol = request_spec['volume_properties']
filter_properties['size'] = vol['size']
filter_properties['availability_zone'] = vol.get('availability_zone')
filter_properties['user_id'] = vol.get('user_id')
filter_properties['metadata'] = vol.get('metadata')
def schedule_create_volume(self, context, request_spec, filter_properties):
weighed_host = self._schedule(context, request_spec,
filter_properties)
if not weighed_host:
raise exception.NoValidHost(reason="")
host = weighed_host.obj.host
volume_id = request_spec['volume_id']
snapshot_id = request_spec['snapshot_id']
image_id = request_spec['image_id']
updated_volume = driver.volume_update_db(context, volume_id, host)
self.volume_rpcapi.create_volume(context, updated_volume, host,
snapshot_id, image_id)
def _schedule(self, context, request_spec, filter_properties=None):
"""Returns a list of hosts that meet the required specs,
ordered by their fitness.
"""
elevated = context.elevated()
volume_properties = request_spec['volume_properties']
# Since Nova is using mixed filters from Oslo and it's own, which
# takes 'resource_XX' and 'instance_XX' as input respectively, copying
# 'instance_XX' to 'resource_XX' will make both filters happy.
resource_properties = volume_properties.copy()
volume_type = request_spec.get("volume_type", None)
resource_type = request_spec.get("volume_type", None)
request_spec.update({'resource_properties': resource_properties})
config_options = self._get_configuration_options()
if filter_properties is None:
filter_properties = {}
filter_properties.update({'context': context,
'request_spec': request_spec,
'config_options': config_options,
'volume_type': volume_type,
'resource_type': resource_type})
self.populate_filter_properties(request_spec,
filter_properties)
# Find our local list of acceptable hosts by filtering and
# weighing our options. we virtually consume resources on
# it so subsequent selections can adjust accordingly.
# Note: remember, we are using an iterator here. So only
# traverse this list once.
hosts = self.host_manager.get_all_host_states(elevated)
# Filter local hosts based on requirements ...
hosts = self.host_manager.get_filtered_hosts(hosts,
filter_properties)
if not hosts:
return None
LOG.debug(_("Filtered %(hosts)s") % locals())
# weighted_host = WeightedHost() ... the best
# host for the job.
weighed_hosts = self.host_manager.get_weighed_hosts(hosts,
filter_properties)
best_host = weighed_hosts[0]
LOG.debug(_("Choosing %(best_host)s") % locals())
best_host.obj.consume_from_volume(volume_properties)
return best_host

View File

@ -0,0 +1,14 @@
# Copyright (c) 2013 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,44 @@
# Copyright (c) 2012 Intel
# Copyright (c) 2012 OpenStack, LLC.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import math
from cinder.openstack.common import log as logging
from cinder.openstack.common.scheduler import filters
LOG = logging.getLogger(__name__)
class CapacityFilter(filters.BaseHostFilter):
"""CapacityFilter filters based on volume host's capacity utilization."""
def host_passes(self, host_state, filter_properties):
"""Return True if host has sufficient capacity."""
volume_size = filter_properties.get('size')
if not host_state.free_capacity_gb:
# Fail Safe
LOG.warning(_("Free capacity not set;"
"volume node info collection broken."))
return False
reserved = float(host_state.reserved_percentage) / 100
free = math.floor(host_state.free_capacity_gb * (1 - reserved))
return free >= volume_size

View File

@ -17,20 +17,261 @@
Manage hosts in the current zone.
"""
# FIXME(ja): this code was written only for compute. re-implement for volumes
import UserDict
from cinder import db
from cinder import exception
from cinder import flags
from cinder.openstack.common import cfg
from cinder.openstack.common import log as logging
from cinder.openstack.common.scheduler import filters
from cinder.openstack.common.scheduler import weights
from cinder.openstack.common import timeutils
from cinder import utils
host_manager_opts = [
cfg.ListOpt('scheduler_default_filters',
default=[
'AvailabilityZoneFilter',
'CapacityFilter',
'CapabilitiesFilter'
],
help='Which filter class names to use for filtering hosts '
'when not specified in the request.'),
cfg.ListOpt('scheduler_default_weighers',
default=[
'CapacityWeigher'
],
help='Which weigher class names to use for weighing hosts.')
]
FLAGS = flags.FLAGS
FLAGS.register_opts(host_manager_opts)
LOG = logging.getLogger(__name__)
class ReadOnlyDict(UserDict.IterableUserDict):
"""A read-only dict."""
def __init__(self, source=None):
self.data = {}
self.update(source)
def __setitem__(self, key, item):
raise TypeError
def __delitem__(self, key):
raise TypeError
def clear(self):
raise TypeError
def pop(self, key, *args):
raise TypeError
def popitem(self):
raise TypeError
def update(self, source=None):
if source is None:
return
elif isinstance(source, UserDict.UserDict):
self.data = source.data
elif isinstance(source, type({})):
self.data = source
else:
raise TypeError
class HostState(object):
pass
"""Mutable and immutable information tracked for a host."""
def __init__(self, host, capabilities=None, service=None):
self.host = host
self.update_capabilities(capabilities, service)
self.volume_backend_name = None
self.vendor_name = None
self.driver_version = 0
self.storage_protocol = None
self.QoS_support = False
# Mutable available resources.
# These will change as resources are virtually "consumed".
self.total_capacity_gb = 0
self.free_capacity_gb = 0
self.reserved_percentage = 0
self.updated = None
def update_capabilities(self, capabilities=None, service=None):
# Read-only capability dicts
if capabilities is None:
capabilities = {}
self.capabilities = ReadOnlyDict(capabilities)
if service is None:
service = {}
self.service = ReadOnlyDict(service)
def update_from_volume_capability(self, capability):
"""Update information about a host from its volume_node info."""
if self.updated and self.updated > capability['timestamp']:
return
if capability:
self.volume_backend = capability.get('volume_backend_name', None)
self.vendor_name = capability.get('vendor_name', None)
self.driver_version = capability.get('driver_version', None)
self.storage_protocol = capability.get('storage_protocol', None)
self.QoS_support = capability.get('QoS_support', False)
self.total_capacity_gb = capability['total_capacity_gb']
self.free_capacity_gb = capability['free_capacity_gb']
self.reserved_percentage = capability['reserved_percentage']
self.updated = capability['timestamp']
def consume_from_volume(self, volume):
"""Incrementally update host state from an volume"""
volume_gb = volume['size']
self.free_capacity_gb -= volume_gb
self.updated = timeutils.utcnow()
def __repr__(self):
return ("host '%s': free_capacity_gb: %s" %
(self.host, self.free_capacity_gb))
class HostManager(object):
"""Base HostManager class."""
def get_host_list(self, *args):
pass
host_state_cls = HostState
def update_service_capabilities(self, *args):
pass
def __init__(self):
self.service_states = {} # { <host>: {<service>: {cap k : v}}}
self.host_state_map = {}
self.filter_handler = filters.HostFilterHandler('cinder.scheduler.'
'filters')
self.filter_classes = self.filter_handler.get_all_classes()
self.weight_handler = weights.HostWeightHandler('cinder.scheduler.'
'weights')
self.weight_classes = self.weight_handler.get_all_classes()
def get_service_capabilities(self, *args):
pass
def _choose_host_filters(self, filter_cls_names):
"""Since the caller may specify which filters to use we need
to have an authoritative list of what is permissible. This
function checks the filter names against a predefined set
of acceptable filters.
"""
if filter_cls_names is None:
filter_cls_names = FLAGS.scheduler_default_filters
if not isinstance(filter_cls_names, (list, tuple)):
filter_cls_names = [filter_cls_names]
good_filters = []
bad_filters = []
for filter_name in filter_cls_names:
found_class = False
for cls in self.filter_classes:
if cls.__name__ == filter_name:
found_class = True
good_filters.append(cls)
break
if not found_class:
bad_filters.append(filter_name)
if bad_filters:
msg = ", ".join(bad_filters)
raise exception.SchedulerHostFilterNotFound(filter_name=msg)
return good_filters
def _choose_host_weighers(self, weight_cls_names):
"""Since the caller may specify which weighers to use, we need
to have an authoritative list of what is permissible. This
function checks the weigher names against a predefined set
of acceptable weighers.
"""
if weight_cls_names is None:
weight_cls_names = FLAGS.scheduler_default_weighers
if not isinstance(weight_cls_names, (list, tuple)):
weight_cls_names = [weight_cls_names]
good_weighers = []
bad_weighers = []
for weigher_name in weight_cls_names:
found_class = False
for cls in self.weight_classes:
if cls.__name__ == weigher_name:
good_weighers.append(cls)
found_class = True
break
if not found_class:
bad_weighers.append(weigher_name)
if bad_weighers:
msg = ", ".join(bad_weighers)
raise exception.SchedulerHostWeigherNotFound(weigher_name=msg)
return good_weighers
def get_filtered_hosts(self, hosts, filter_properties,
filter_class_names=None):
"""Filter hosts and return only ones passing all filters"""
filter_classes = self._choose_host_filters(filter_class_names)
return self.filter_handler.get_filtered_objects(filter_classes,
hosts,
filter_properties)
def get_weighed_hosts(self, hosts, weight_properties,
weigher_class_names=None):
"""Weigh the hosts"""
weigher_classes = self._choose_host_weighers(weigher_class_names)
return self.weight_handler.get_weighed_objects(weigher_classes,
hosts,
weight_properties)
def update_service_capabilities(self, service_name, host, capabilities):
"""Update the per-service capabilities based on this notification."""
if service_name != 'volume':
LOG.debug(_('Ignoring %(service_name)s service update '
'from %(host)s'), locals())
return
LOG.debug(_("Received %(service_name)s service update from "
"%(host)s.") % locals())
# Copy the capabilities, so we don't modify the original dict
capab_copy = dict(capabilities)
capab_copy["timestamp"] = timeutils.utcnow() # Reported time
self.service_states[host] = capab_copy
def get_all_host_states(self, context):
"""Returns a dict of all the hosts the HostManager
knows about. Also, each of the consumable resources in HostState
are pre-populated and adjusted based on data in the db.
For example:
{'192.168.1.100': HostState(), ...}
"""
# Get resource usage across the available volume nodes:
topic = FLAGS.volume_topic
volume_services = db.service_get_all_by_topic(context, topic)
for service in volume_services:
if not utils.service_is_up(service) or service['disabled']:
LOG.warn(_("service is down or disabled."))
continue
host = service['host']
capabilities = self.service_states.get(host, None)
host_state = self.host_state_map.get(host)
if host_state:
# copy capabilities to host_state.capabilities
host_state.update_capabilities(capabilities,
dict(service.iteritems()))
else:
host_state = self.host_state_cls(host,
capabilities=capabilities,
service=
dict(service.iteritems()))
self.host_state_map[host] = host_state
# update host_state
host_state.update_from_volume_capability(capabilities)
return self.host_state_map.itervalues()

View File

@ -21,8 +21,7 @@
Scheduler Service
"""
import functools
from cinder import context
from cinder import db
from cinder import exception
from cinder import flags
@ -32,14 +31,15 @@ from cinder.openstack.common import excutils
from cinder.openstack.common import importutils
from cinder.openstack.common import log as logging
from cinder.openstack.common.notifier import api as notifier
from cinder.volume import rpcapi as volume_rpcapi
LOG = logging.getLogger(__name__)
scheduler_driver_opt = cfg.StrOpt(
'scheduler_driver',
default='cinder.scheduler.simple.SimpleScheduler',
help='Default driver to use for the scheduler')
scheduler_driver_opt = cfg.StrOpt('scheduler_driver',
default='cinder.scheduler.simple.'
'SimpleScheduler',
help='Default scheduler driver to use')
FLAGS = flags.FLAGS
FLAGS.register_opt(scheduler_driver_opt)
@ -56,6 +56,10 @@ class SchedulerManager(manager.Manager):
self.driver = importutils.import_object(scheduler_driver)
super(SchedulerManager, self).__init__(*args, **kwargs)
def init_host(self):
ctxt = context.get_admin_context()
self.request_service_capabilities(ctxt)
def get_host_list(self, context):
"""Get a list of hosts from the HostManager."""
return self.driver.get_host_list()
@ -130,3 +134,6 @@ class SchedulerManager(manager.Manager):
notifier.notify(context, notifier.publisher_id("scheduler"),
'scheduler.' + method, notifier.ERROR, payload)
def request_service_capabilities(self, context):
volume_rpcapi.VolumeAPI().publish_service_capabilities(context)

View File

@ -0,0 +1,105 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
SchedulerOptions monitors a local .json file for changes and loads
it if needed. This file is converted to a data structure and passed
into the filtering and weighing functions which can use it for
dynamic configuration.
"""
import datetime
import json
import os
from cinder import flags
from cinder.openstack.common import cfg
from cinder.openstack.common import log as logging
from cinder.openstack.common import timeutils
scheduler_json_config_location_opt = cfg.StrOpt(
'scheduler_json_config_location',
default='',
help='Absolute path to scheduler configuration JSON file.')
FLAGS = flags.FLAGS
FLAGS.register_opt(scheduler_json_config_location_opt)
LOG = logging.getLogger(__name__)
class SchedulerOptions(object):
"""
SchedulerOptions monitors a local .json file for changes and loads it
if needed. This file is converted to a data structure and passed into
the filtering and weighing functions which can use it for dynamic
configuration.
"""
def __init__(self):
super(SchedulerOptions, self).__init__()
self.data = {}
self.last_modified = None
self.last_checked = None
def _get_file_handle(self, filename):
"""Get file handle. Broken out for testing."""
return open(filename)
def _get_file_timestamp(self, filename):
"""Get the last modified datetime. Broken out for testing."""
try:
return os.path.getmtime(filename)
except os.error, e:
LOG.exception(_("Could not stat scheduler options file "
"%(filename)s: '%(e)s'"), locals())
raise
def _load_file(self, handle):
"""Decode the JSON file. Broken out for testing."""
try:
return json.load(handle)
except ValueError, e:
LOG.exception(_("Could not decode scheduler options: "
"'%(e)s'") % locals())
return {}
def _get_time_now(self):
"""Get current UTC. Broken out for testing."""
return timeutils.utcnow()
def get_configuration(self, filename=None):
"""Check the json file for changes and load it if needed."""
if not filename:
filename = FLAGS.scheduler_json_config_location
if not filename:
return self.data
if self.last_checked:
now = self._get_time_now()
if now - self.last_checked < datetime.timedelta(minutes=5):
return self.data
last_modified = self._get_file_timestamp(filename)
if (not last_modified or not self.last_modified or
last_modified > self.last_modified):
self.data = self._load_file(self._get_file_handle(filename))
self.last_modified = last_modified
if not self.data:
self.data = {}
return self.data

View File

@ -0,0 +1,14 @@
# Copyright (c) 2013 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,50 @@
# Copyright (c) 2012 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Capacity Weigher. Weigh hosts by their available capacity.
The default is to spread volumes across all hosts evenly. If you prefer
stacking, you can set the 'capacity_weight_multiplier' option to a negative
number and the weighing has the opposite effect of the default.
"""
import math
from cinder import flags
from cinder.openstack.common import cfg
from cinder.openstack.common.scheduler import weights
capacity_weight_opts = [
cfg.FloatOpt('capacity_weight_multiplier',
default=1.0,
help='Multiplier used for weighing volume capacity. '
'Negative numbers mean to stack vs spread.'),
]
FLAGS = flags.FLAGS
FLAGS.register_opts(capacity_weight_opts)
class CapacityWeigher(weights.BaseHostWeigher):
def _weight_multiplier(self):
"""Override the weight multiplier."""
return FLAGS.capacity_weight_multiplier
def _weigh_object(self, host_state, weight_properties):
"""Higher weights win. We want spreading to be the default."""
reserved = float(host_state.reserved_percentage) / 100
free = math.floor(host_state.free_capacity_gb * (1 - reserved))
return free

View File

@ -16,45 +16,68 @@
Fakes For Scheduler tests.
"""
import mox
from cinder import db
from cinder.openstack.common import timeutils
from cinder.scheduler import filter_scheduler
from cinder.scheduler import host_manager
class FakeHostManager(host_manager.HostManager):
"""host1: free_ram_mb=1024-512-512=0, free_disk_gb=1024-512-512=0
host2: free_ram_mb=2048-512=1536 free_disk_gb=2048-512=1536
host3: free_ram_mb=4096-1024=3072 free_disk_gb=4096-1024=3072
host4: free_ram_mb=8192 free_disk_gb=8192"""
VOLUME_SERVICES = [
dict(id=1, host='host1', topic='volume', disabled=False,
availability_zone='zone1', updated_at=timeutils.utcnow()),
dict(id=2, host='host2', topic='volume', disabled=False,
availability_zone='zone1', updated_at=timeutils.utcnow()),
dict(id=3, host='host3', topic='volume', disabled=False,
availability_zone='zone2', updated_at=timeutils.utcnow()),
dict(id=4, host='host4', topic='volume', disabled=False,
availability_zone='zone3', updated_at=timeutils.utcnow()),
# service on host5 is disabled
dict(id=5, host='host5', topic='volume', disabled=True,
availability_zone='zone4', updated_at=timeutils.utcnow()),
]
class FakeFilterScheduler(filter_scheduler.FilterScheduler):
def __init__(self, *args, **kwargs):
super(FakeFilterScheduler, self).__init__(*args, **kwargs)
self.host_manager = host_manager.HostManager()
class FakeHostManager(host_manager.HostManager):
def __init__(self):
super(FakeHostManager, self).__init__()
self.service_states = {
'host1': {
'compute': {'host_memory_free': 1073741824},
},
'host2': {
'compute': {'host_memory_free': 2147483648},
},
'host3': {
'compute': {'host_memory_free': 3221225472},
},
'host4': {
'compute': {'host_memory_free': 999999999},
},
'host1': {'total_capacity_gb': 1024,
'free_capacity_gb': 1024,
'reserved_percentage': 10,
'timestamp': None},
'host2': {'total_capacity_gb': 2048,
'free_capacity_gb': 300,
'reserved_percentage': 10,
'timestamp': None},
'host3': {'total_capacity_gb': 512,
'free_capacity_gb': 512,
'reserved_percentage': 0,
'timestamp': None},
'host4': {'total_capacity_gb': 2048,
'free_capacity_gb': 200,
'reserved_percentage': 5,
'timestamp': None},
}
def get_host_list_from_db(self, context):
return [
('host1', dict(free_disk_gb=1024, free_ram_mb=1024)),
('host2', dict(free_disk_gb=2048, free_ram_mb=2048)),
('host3', dict(free_disk_gb=4096, free_ram_mb=4096)),
('host4', dict(free_disk_gb=8192, free_ram_mb=8192)),
]
class FakeHostState(host_manager.HostState):
def __init__(self, host, topic, attribute_dict):
super(FakeHostState, self).__init__(host, topic)
def __init__(self, host, attribute_dict):
super(FakeHostState, self).__init__(host)
for (key, val) in attribute_dict.iteritems():
setattr(self, key, val)
def mox_host_manager_db_calls(mock, context):
mock.StubOutWithMock(db, 'service_get_all_by_topic')
db.service_get_all_by_topic(mox.IgnoreArg(),
mox.IgnoreArg()).AndReturn(VOLUME_SERVICES)

View File

@ -0,0 +1,87 @@
# Copyright 2011-2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For Capacity Weigher.
"""
from cinder import context
from cinder.openstack.common.scheduler.weights import HostWeightHandler
from cinder import test
from cinder.tests.scheduler import fakes
class CapacityWeigherTestCase(test.TestCase):
def setUp(self):
super(CapacityWeigherTestCase, self).setUp()
self.host_manager = fakes.FakeHostManager()
self.weight_handler = HostWeightHandler('cinder.scheduler.weights')
self.weight_classes = self.weight_handler.get_all_classes()
def _get_weighed_host(self, hosts, weight_properties=None):
if weight_properties is None:
weight_properties = {}
return self.weight_handler.get_weighed_objects(self.weight_classes,
hosts,
weight_properties)[0]
def _get_all_hosts(self):
ctxt = context.get_admin_context()
fakes.mox_host_manager_db_calls(self.mox, ctxt)
self.mox.ReplayAll()
host_states = self.host_manager.get_all_host_states(ctxt)
self.mox.VerifyAll()
self.mox.ResetAll()
return host_states
def test_default_of_spreading_first(self):
hostinfo_list = self._get_all_hosts()
# host1: free_capacity_gb=1024, free=1024*(1-0.1)
# host2: free_capacity_gb=300, free=300*(1-0.1)
# host3: free_capacity_gb=512, free=512
# host4: free_capacity_gb=200, free=200*(1-0.05)
# so, host1 should win:
weighed_host = self._get_weighed_host(hostinfo_list)
self.assertEqual(weighed_host.weight, 921.0)
self.assertEqual(weighed_host.obj.host, 'host1')
def test_capacity_weight_multiplier1(self):
self.flags(capacity_weight_multiplier=-1.0)
hostinfo_list = self._get_all_hosts()
# host1: free_capacity_gb=1024, free=-1024*(1-0.1)
# host2: free_capacity_gb=300, free=-300*(1-0.1)
# host3: free_capacity_gb=512, free=-512
# host4: free_capacity_gb=200, free=-200*(1-0.05)
# so, host4 should win:
weighed_host = self._get_weighed_host(hostinfo_list)
self.assertEqual(weighed_host.weight, -190.0)
self.assertEqual(weighed_host.obj.host, 'host4')
def test_capacity_weight_multiplier2(self):
self.flags(capacity_weight_multiplier=2.0)
hostinfo_list = self._get_all_hosts()
# host1: free_capacity_gb=1024, free=1024*(1-0.1)*2
# host2: free_capacity_gb=300, free=300*(1-0.1)*2
# host3: free_capacity_gb=512, free=512*2
# host4: free_capacity_gb=200, free=200*(1-0.05)*2
# so, host1 should win:
weighed_host = self._get_weighed_host(hostinfo_list)
self.assertEqual(weighed_host.weight, 921.0 * 2)
self.assertEqual(weighed_host.obj.host, 'host1')

View File

@ -0,0 +1,107 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For Filter Scheduler.
"""
import mox
from cinder import context
from cinder import exception
from cinder.openstack.common.scheduler import weights
from cinder.scheduler import driver
from cinder.scheduler import filter_scheduler
from cinder.scheduler import host_manager
from cinder.tests.scheduler import fakes
from cinder.tests.scheduler import test_scheduler
def fake_get_filtered_hosts(hosts, filter_properties):
return list(hosts)
class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
"""Test case for Filter Scheduler."""
driver_cls = filter_scheduler.FilterScheduler
def test_create_volume_no_hosts(self):
"""
Ensure empty hosts & child_zones result in NoValidHosts exception.
"""
def _fake_empty_call_zone_method(*args, **kwargs):
return []
sched = fakes.FakeFilterScheduler()
fake_context = context.RequestContext('user', 'project')
request_spec = {'volume_properties': {'project_id': 1,
'size': 1},
'volume_type': {'name': 'LVM_iSCSI'},
'volume_id': ['fake-id1']}
self.assertRaises(exception.NoValidHost, sched.schedule_create_volume,
fake_context, request_spec, None)
def test_create_volume_non_admin(self):
"""Test creating an instance locally using run_instance, passing
a non-admin context. DB actions should work."""
self.was_admin = False
def fake_get(context, *args, **kwargs):
# make sure this is called with admin context, even though
# we're using user context below
self.was_admin = context.is_admin
return {}
sched = fakes.FakeFilterScheduler()
self.stubs.Set(sched.host_manager, 'get_all_host_states', fake_get)
fake_context = context.RequestContext('user', 'project')
request_spec = {'volume_properties': {'project_id': 1,
'size': 1},
'volume_type': {'name': 'LVM_iSCSI'},
'volume_id': ['fake-id1']}
self.assertRaises(exception.NoValidHost, sched.schedule_create_volume,
fake_context, request_spec, None)
self.assertTrue(self.was_admin)
def test_schedule_happy_day(self):
"""Make sure there's nothing glaringly wrong with _schedule()
by doing a happy day pass through."""
self.next_weight = 1.0
def _fake_weigh_objects(_self, functions, hosts, options):
self.next_weight += 2.0
host_state = hosts[0]
return [weights.WeighedHost(host_state, self.next_weight)]
sched = fakes.FakeFilterScheduler()
fake_context = context.RequestContext('user', 'project',
is_admin=True)
self.stubs.Set(sched.host_manager, 'get_filtered_hosts',
fake_get_filtered_hosts)
self.stubs.Set(weights.HostWeightHandler,
'get_weighed_objects', _fake_weigh_objects)
fakes.mox_host_manager_db_calls(self.mox, fake_context)
request_spec = {'volume_type': {'name': 'LVM_iSCSI'},
'volume_properties': {'project_id': 1,
'size': 1}}
self.mox.ReplayAll()
weighed_host = sched._schedule(fake_context, request_spec, {})
self.assertTrue(weighed_host.obj is not None)

View File

@ -0,0 +1,99 @@
# Copyright 2011 OpenStack LLC. # All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For Scheduler Host Filters.
"""
import httplib
import stubout
from cinder import context
from cinder import db
from cinder import exception
from cinder.openstack.common import jsonutils
from cinder.openstack.common.scheduler import filters
from cinder import test
from cinder.tests.scheduler import fakes
from cinder import utils
DATA = ''
def stub_out_https_backend(stubs):
"""
Stubs out the httplib.HTTPRequest.getresponse to return
faked-out data instead of grabbing actual contents of a resource
The stubbed getresponse() returns an iterator over
the data "I am a teapot, short and stout\n"
:param stubs: Set of stubout stubs
"""
class FakeHTTPResponse(object):
def read(self):
return DATA
def fake_do_request(self, *args, **kwargs):
return httplib.OK, FakeHTTPResponse()
class HostFiltersTestCase(test.TestCase):
"""Test case for host filters."""
def setUp(self):
super(HostFiltersTestCase, self).setUp()
self.stubs = stubout.StubOutForTesting()
stub_out_https_backend(self.stubs)
self.context = context.RequestContext('fake', 'fake')
self.json_query = jsonutils.dumps(
['and', ['>=', '$free_capacity_gb', 1024],
['>=', '$total_capacity_gb', 10 * 1024]])
# This has a side effect of testing 'get_filter_classes'
# when specifying a method (in this case, our standard filters)
filter_handler = filters.HostFilterHandler('cinder.scheduler.filters')
classes = filter_handler.get_all_classes()
self.class_map = {}
for cls in classes:
self.class_map[cls.__name__] = cls
def _stub_service_is_up(self, ret_value):
def fake_service_is_up(service):
return ret_value
self.stubs.Set(utils, 'service_is_up', fake_service_is_up)
def test_capacity_filter_passes(self):
self._stub_service_is_up(True)
filt_cls = self.class_map['CapacityFilter']()
filter_properties = {'size': 100}
service = {'disabled': False}
host = fakes.FakeHostState('host1',
{'free_capacity_gb': 200,
'updated_at': None,
'service': service})
self.assertTrue(filt_cls.host_passes(host, filter_properties))
def test_capacity_filter_fails(self):
self._stub_service_is_up(True)
filt_cls = self.class_map['CapacityFilter']()
filter_properties = {'size': 100}
service = {'disabled': False}
host = fakes.FakeHostState('host1',
{'free_capacity_gb': 120,
'reserved_percentage': 20,
'updated_at': None,
'service': service})
self.assertFalse(filt_cls.host_passes(host, filter_properties))

View File

@ -0,0 +1,176 @@
# Copyright (c) 2011 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For HostManager
"""
from cinder import db
from cinder import exception
from cinder import flags
from cinder.openstack.common.scheduler import filters
from cinder.openstack.common import timeutils
from cinder.scheduler import host_manager
from cinder import test
from cinder.tests.scheduler import fakes
FLAGS = flags.FLAGS
class FakeFilterClass1(filters.BaseHostFilter):
def host_passes(self, host_state, filter_properties):
pass
class FakeFilterClass2(filters.BaseHostFilter):
def host_passes(self, host_state, filter_properties):
pass
class HostManagerTestCase(test.TestCase):
"""Test case for HostManager class"""
def setUp(self):
super(HostManagerTestCase, self).setUp()
self.host_manager = host_manager.HostManager()
self.fake_hosts = [host_manager.HostState('fake_host%s' % x)
for x in xrange(1, 5)]
def test_choose_host_filters_not_found(self):
self.flags(scheduler_default_filters='FakeFilterClass3')
self.host_manager.filter_classes = [FakeFilterClass1,
FakeFilterClass2]
self.assertRaises(exception.SchedulerHostFilterNotFound,
self.host_manager._choose_host_filters, None)
def test_choose_host_filters(self):
self.flags(scheduler_default_filters=['FakeFilterClass2'])
self.host_manager.filter_classes = [FakeFilterClass1,
FakeFilterClass2]
# Test 'volume' returns 1 correct function
filter_classes = self.host_manager._choose_host_filters(None)
self.assertEqual(len(filter_classes), 1)
self.assertEqual(filter_classes[0].__name__, 'FakeFilterClass2')
def _mock_get_filtered_hosts(self, info, specified_filters=None):
self.mox.StubOutWithMock(self.host_manager, '_choose_host_filters')
info['got_objs'] = []
info['got_fprops'] = []
def fake_filter_one(_self, obj, filter_props):
info['got_objs'].append(obj)
info['got_fprops'].append(filter_props)
return True
self.stubs.Set(FakeFilterClass1, '_filter_one', fake_filter_one)
self.host_manager._choose_host_filters(specified_filters).AndReturn(
[FakeFilterClass1])
def _verify_result(self, info, result):
for x in info['got_fprops']:
self.assertEqual(x, info['expected_fprops'])
self.assertEqual(set(info['expected_objs']), set(info['got_objs']))
self.assertEqual(set(result), set(info['got_objs']))
def test_get_filtered_hosts(self):
fake_properties = {'moo': 1, 'cow': 2}
info = {'expected_objs': self.fake_hosts,
'expected_fprops': fake_properties}
self._mock_get_filtered_hosts(info)
self.mox.ReplayAll()
result = self.host_manager.get_filtered_hosts(self.fake_hosts,
fake_properties)
self._verify_result(info, result)
def test_update_service_capabilities(self):
service_states = self.host_manager.service_states
self.assertDictMatch(service_states, {})
self.mox.StubOutWithMock(timeutils, 'utcnow')
timeutils.utcnow().AndReturn(31337)
timeutils.utcnow().AndReturn(31338)
timeutils.utcnow().AndReturn(31339)
host1_volume_capabs = dict(free_capacity_gb=4321, timestamp=1)
host2_volume_capabs = dict(free_capacity_gb=5432, timestamp=1)
host3_volume_capabs = dict(free_capacity_gb=6543, timestamp=1)
self.mox.ReplayAll()
service_name = 'volume'
self.host_manager.update_service_capabilities(service_name, 'host1',
host1_volume_capabs)
self.host_manager.update_service_capabilities(service_name, 'host2',
host2_volume_capabs)
self.host_manager.update_service_capabilities(service_name, 'host3',
host3_volume_capabs)
# Make sure dictionary isn't re-assigned
self.assertEqual(self.host_manager.service_states, service_states)
# Make sure original dictionary wasn't copied
self.assertEqual(host1_volume_capabs['timestamp'], 1)
host1_volume_capabs['timestamp'] = 31337
host2_volume_capabs['timestamp'] = 31338
host3_volume_capabs['timestamp'] = 31339
expected = {'host1': host1_volume_capabs,
'host2': host2_volume_capabs,
'host3': host3_volume_capabs}
self.assertDictMatch(service_states, expected)
def test_get_all_host_states(self):
context = 'fake_context'
topic = FLAGS.volume_topic
self.mox.StubOutWithMock(db, 'service_get_all_by_topic')
self.mox.StubOutWithMock(host_manager.LOG, 'warn')
ret_services = fakes.VOLUME_SERVICES
db.service_get_all_by_topic(context, topic).AndReturn(ret_services)
# Disabled service
host_manager.LOG.warn("service is down or disabled.")
self.mox.ReplayAll()
self.host_manager.get_all_host_states(context)
host_state_map = self.host_manager.host_state_map
self.assertEqual(len(host_state_map), 4)
# Check that service is up
for i in xrange(4):
volume_node = fakes.VOLUME_SERVICES[i]
host = volume_node['host']
self.assertEqual(host_state_map[host].service,
volume_node)
class HostStateTestCase(test.TestCase):
"""Test case for HostState class"""
def test_update_from_volume_capability(self):
fake_host = host_manager.HostState('host1')
self.assertEqual(fake_host.free_capacity_gb, 0)
volume_capability = {'total_capacity_gb': 1024,
'free_capacity_gb': 512,
'reserved_percentage': 0,
'timestamp': None}
fake_host.update_from_volume_capability(volume_capability)
self.assertEqual(fake_host.free_capacity_gb, 512)

View File

@ -0,0 +1,138 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For PickledScheduler.
"""
import datetime
import StringIO
from cinder.openstack.common import jsonutils
from cinder.scheduler import scheduler_options
from cinder import test
class FakeSchedulerOptions(scheduler_options.SchedulerOptions):
def __init__(self, last_checked, now, file_old, file_now, data, filedata):
super(FakeSchedulerOptions, self).__init__()
# Change internals ...
self.last_modified = file_old
self.last_checked = last_checked
self.data = data
# For overrides ...
self._time_now = now
self._file_now = file_now
self._file_data = filedata
self.file_was_loaded = False
def _get_file_timestamp(self, filename):
return self._file_now
def _get_file_handle(self, filename):
self.file_was_loaded = True
return StringIO.StringIO(self._file_data)
def _get_time_now(self):
return self._time_now
class SchedulerOptionsTestCase(test.TestCase):
def test_get_configuration_first_time_no_flag(self):
last_checked = None
now = datetime.datetime(2012, 1, 1, 1, 1, 1)
file_old = None
file_now = datetime.datetime(2012, 1, 1, 1, 1, 1)
data = dict(a=1, b=2, c=3)
jdata = jsonutils.dumps(data)
fake = FakeSchedulerOptions(last_checked, now, file_old, file_now,
{}, jdata)
self.assertEquals({}, fake.get_configuration())
self.assertFalse(fake.file_was_loaded)
def test_get_configuration_first_time_empty_file(self):
last_checked = None
now = datetime.datetime(2012, 1, 1, 1, 1, 1)
file_old = None
file_now = datetime.datetime(2012, 1, 1, 1, 1, 1)
data = dict(a=1, b=2, c=3)
jdata = ""
fake = FakeSchedulerOptions(last_checked, now, file_old, file_now,
{}, jdata)
self.assertEquals({}, fake.get_configuration('foo.json'))
self.assertTrue(fake.file_was_loaded)
def test_get_configuration_first_time_happy_day(self):
last_checked = None
now = datetime.datetime(2012, 1, 1, 1, 1, 1)
file_old = None
file_now = datetime.datetime(2012, 1, 1, 1, 1, 1)
data = dict(a=1, b=2, c=3)
jdata = jsonutils.dumps(data)
fake = FakeSchedulerOptions(last_checked, now, file_old, file_now,
{}, jdata)
self.assertEquals(data, fake.get_configuration('foo.json'))
self.assertTrue(fake.file_was_loaded)
def test_get_configuration_second_time_no_change(self):
last_checked = datetime.datetime(2011, 1, 1, 1, 1, 1)
now = datetime.datetime(2012, 1, 1, 1, 1, 1)
file_old = datetime.datetime(2012, 1, 1, 1, 1, 1)
file_now = datetime.datetime(2012, 1, 1, 1, 1, 1)
data = dict(a=1, b=2, c=3)
jdata = jsonutils.dumps(data)
fake = FakeSchedulerOptions(last_checked, now, file_old, file_now,
data, jdata)
self.assertEquals(data, fake.get_configuration('foo.json'))
self.assertFalse(fake.file_was_loaded)
def test_get_configuration_second_time_too_fast(self):
last_checked = datetime.datetime(2011, 1, 1, 1, 1, 1)
now = datetime.datetime(2011, 1, 1, 1, 1, 2)
file_old = datetime.datetime(2012, 1, 1, 1, 1, 1)
file_now = datetime.datetime(2013, 1, 1, 1, 1, 1)
old_data = dict(a=1, b=2, c=3)
data = dict(a=11, b=12, c=13)
jdata = jsonutils.dumps(data)
fake = FakeSchedulerOptions(last_checked, now, file_old, file_now,
old_data, jdata)
self.assertEquals(old_data, fake.get_configuration('foo.json'))
self.assertFalse(fake.file_was_loaded)
def test_get_configuration_second_time_change(self):
last_checked = datetime.datetime(2011, 1, 1, 1, 1, 1)
now = datetime.datetime(2012, 1, 1, 1, 1, 1)
file_old = datetime.datetime(2012, 1, 1, 1, 1, 1)
file_now = datetime.datetime(2013, 1, 1, 1, 1, 1)
old_data = dict(a=1, b=2, c=3)
data = dict(a=11, b=12, c=13)
jdata = jsonutils.dumps(data)
fake = FakeSchedulerOptions(last_checked, now, file_old, file_now,
old_data, jdata)
self.assertEquals(data, fake.get_configuration('foo.json'))
self.assertTrue(fake.file_was_loaded)

View File

@ -30,6 +30,7 @@ from cinder import flags
from cinder.image import image_utils
from cinder.openstack.common import cfg
from cinder.openstack.common import log as logging
from cinder.openstack.common import timeutils
from cinder import utils
from cinder.volume import iscsi
@ -61,7 +62,11 @@ volume_opts = [
help='use this ip for iscsi'),
cfg.IntOpt('iscsi_port',
default=3260,
help='The port that the iSCSI daemon is listening on'), ]
help='The port that the iSCSI daemon is listening on'),
cfg.IntOpt('reserved_percentage',
default=0,
help='The percentage of backend capacity is reserved'),
]
FLAGS = flags.FLAGS
FLAGS.register_opts(volume_opts)
@ -73,6 +78,7 @@ class VolumeDriver(object):
# NOTE(vish): db is set by Manager
self.db = None
self.set_execute(execute)
self._stats = {}
def set_execute(self, execute):
self._execute = execute
@ -619,6 +625,49 @@ class ISCSIDriver(VolumeDriver):
def terminate_connection(self, volume, connector, **kwargs):
pass
def get_volume_stats(self, refresh=False):
"""Get volume status.
If 'refresh' is True, run update the stats first."""
if refresh:
self._update_volume_status()
return self._stats
def _update_volume_status(self):
"""Retrieve status info from volume group."""
LOG.debug(_("Updating volume status"))
data = {}
# Note(zhiteng): These information are driver/backend specific,
# each driver may define these values in its own config options
# or fetch from driver specific configuration file.
data["volume_backend_name"] = 'LVM_iSCSI'
data["vendor_name"] = 'Open Source'
data["driver_version"] = '1.0'
data["storage_protocol"] = 'iSCSI'
data['total_capacity_gb'] = 0
data['free_capacity_gb'] = 0
data['reserved_percentage'] = FLAGS.reserved_percentage
data['QoS_support'] = False
try:
out, err = self._execute('vgs', '--noheadings', '--nosuffix',
'--unit=G', '-o', 'name,size,free',
FLAGS.volume_group, run_as_root=True)
except exception.ProcessExecutionError as exc:
LOG.error(_("Error retrieving volume status: "), exc.stderr)
out = False
if out:
volume = out.split()
data['total_capacity_gb'] = float(volume[1])
data['free_capacity_gb'] = float(volume[2])
self._stats = data
def copy_image_to_volume(self, context, volume, image_service, image_id):
"""Fetch the image from image_service and write it to the volume."""
image_utils.fetch_to_raw(context,

View File

@ -61,9 +61,7 @@ volume_manager_opts = [
cfg.StrOpt('volume_driver',
default='cinder.volume.driver.ISCSIDriver',
help='Driver to use for volume creation'),
cfg.BoolOpt('volume_force_update_capabilities',
default=False,
help='if True will force update capabilities on each check'), ]
]
FLAGS = flags.FLAGS
FLAGS.register_opts(volume_manager_opts)
@ -103,7 +101,7 @@ MAPPING = {
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
RPC_API_VERSION = '1.1'
RPC_API_VERSION = '1.2'
def __init__(self, volume_driver=None, *args, **kwargs):
"""Load the driver from the one specified in args, or from flags."""
@ -120,7 +118,6 @@ class VolumeManager(manager.SchedulerDependentManager):
# NOTE(vish): Implementation specific db handling is done
# by the driver.
self.driver.db = self.db
self._last_volume_stats = []
def init_host(self):
"""Do any initialization that needs to be run if this is a
@ -144,6 +141,9 @@ class VolumeManager(manager.SchedulerDependentManager):
LOG.info(_('Resuming delete on volume: %s') % volume['id'])
self.delete_volume(ctxt, volume['id'])
# collect and publish service capabilities
self.publish_service_capabilities(ctxt)
def create_volume(self, context, volume_id, snapshot_id=None,
image_id=None, source_volid=None):
"""Creates and exports the volume."""
@ -490,33 +490,19 @@ class VolumeManager(manager.SchedulerDependentManager):
volume_ref = self.db.volume_get(context, volume_id)
self.driver.terminate_connection(volume_ref, connector, force=force)
def _volume_stats_changed(self, stat1, stat2):
if FLAGS.volume_force_update_capabilities:
return True
if len(stat1) != len(stat2):
return True
for (k, v) in stat1.iteritems():
if (k, v) not in stat2.iteritems():
return True
return False
@manager.periodic_task
def _report_driver_status(self, context):
LOG.info(_("Updating volume status"))
volume_stats = self.driver.get_volume_stats(refresh=True)
if volume_stats:
LOG.info(_("Checking volume capabilities"))
# This will grab info about the host and queue it
# to be sent to the Schedulers.
self.update_service_capabilities(volume_stats)
if self._volume_stats_changed(self._last_volume_stats,
volume_stats):
LOG.info(_("New capabilities found: %s"), volume_stats)
self._last_volume_stats = volume_stats
# This will grab info about the host and queue it
# to be sent to the Schedulers.
self.update_service_capabilities(self._last_volume_stats)
else:
# avoid repeating fanouts
self.update_service_capabilities(None)
def publish_service_capabilities(self, context):
""" Collect driver status and then publish """
self._report_driver_status(context)
self._publish_service_capabilities(context)
def _reset_stats(self):
LOG.info(_("Clear capabilities"))

View File

@ -34,6 +34,7 @@ class VolumeAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
1.0 - Initial version.
1.1 - Adds clone volume option to create_volume.
1.2 - Add publish_service_capabilities() method.
'''
BASE_RPC_API_VERSION = '1.0'
@ -114,3 +115,7 @@ class VolumeAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
topic=rpc.queue_get_for(ctxt,
self.topic,
volume['host']))
def publish_service_capabilities(self, ctxt):
self.fanout_cast(ctxt, self.make_msg('publish_service_capabilities'),
version='1.2')

View File

@ -1,7 +1,7 @@
[DEFAULT]
# The list of modules to copy from openstack-common
modules=cfg,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,rpc,timeutils,log,setup,notifier,context,network_utils,policy,uuidutils,lockutils,fileutils,gettextutils
modules=cfg,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,rpc,timeutils,log,setup,notifier,context,network_utils,policy,uuidutils,lockutils,fileutils,gettextutils,scheduler,scheduler.filters,scheduler.weights
# The base module to hold the copy of openstack.common
base=cinder

View File

@ -23,6 +23,23 @@ from cinder import version
requires = common_setup.parse_requirements()
filters = [
"AvailabilityZoneFilter = "
"cinder.openstack.common.scheduler.filters."
"availability_zone_filter:AvailabilityZoneFilter",
"CapabilitiesFilter = "
"cinder.openstack.common.scheduler.filters."
"capabilities_filter:CapabilitiesFilter",
"CapacityFilter = "
"cinder.scheduler.filters.capacity_filter:CapacityFilter",
"JsonFilter = "
"cinder.openstack.common.scheduler.filters.json_filter:JsonFilter",
]
weights = [
"CapacityWeigher = cinder.scheduler.weights.capacity:CapacityWeigher",
]
setuptools.setup(
name='cinder',
version=version.canonical_version_string(),
@ -43,6 +60,10 @@ setuptools.setup(
cmdclass=common_setup.get_cmdclass(),
packages=setuptools.find_packages(exclude=['bin', 'smoketests']),
install_requires=requires,
entry_points={
'cinder.scheduler.filters': filters,
'cinder.scheduler.weights': weights,
},
include_package_data=True,
test_suite='nose.collector',
setup_requires=['setuptools_git>=0.4'],

View File

@ -12,6 +12,7 @@ greenlet>=0.3.1
PasteDeploy==1.5.0
paste
sqlalchemy-migrate>=0.7.2
stevedore>=0.8.0
suds==0.4
paramiko
Babel>=0.9.6