removed volumes from scheduler
This commit is contained in:
parent 373bea0fe1
commit 49a5d0b461
@@ -58,19 +58,6 @@ class ChanceScheduler(driver.Scheduler):
         return hosts[int(random.random() * len(hosts))]

-    def schedule_create_volume(self, context, request_spec, filter_properties):
-        """Picks a host that is up at random."""
-        topic = FLAGS.volume_topic
-        host = self._schedule(context, topic, request_spec,
-                              filter_properties=filter_properties)
-        volume_id = request_spec['volume_id']
-        snapshot_id = request_spec['snapshot_id']
-        image_id = request_spec['image_id']
-
-        updated_volume = driver.volume_update_db(context, volume_id, host)
-        self.volume_rpcapi.create_volume(context, updated_volume, host,
-                                         snapshot_id, image_id)
-
     def schedule_create_share(self, context, request_spec, filter_properties):
         """Picks a host that is up at random."""
         topic = FLAGS.share_topic
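Note: the chance scheduler's random pick, kept here for the share path, is just a uniform draw. A minimal sketch of the equivalence (host names invented):

import random

# Hypothetical host list; any non-empty sequence behaves the same.
hosts = ['host1', 'host2', 'host3']

# What the code above does: scale a float in [0.0, 1.0) by len(hosts)
# and truncate, yielding a uniform index into the list.
picked = hosts[int(random.random() * len(hosts))]

# Equivalent, more idiomatic spelling:
picked = random.choice(hosts)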
@@ -29,7 +29,6 @@ from manila.openstack.common import importutils
 from manila.openstack.common import timeutils
 from manila.share import rpcapi as share_rpcapi
 from manila import utils
-from manila.volume import rpcapi as volume_rpcapi

 scheduler_driver_opts = [
     cfg.StrOpt('scheduler_host_manager',
@@ -37,7 +36,7 @@ scheduler_driver_opts = [
                help='The scheduler host manager class to use'),
     cfg.IntOpt('scheduler_max_attempts',
                default=3,
-               help='Maximum number of attempts to schedule an volume'),
+               help='Maximum number of attempts to schedule a share'),
 ]

 FLAGS = flags.FLAGS
@@ -54,16 +53,6 @@ def share_update_db(context, share_id, host):
     return db.share_update(context, share_id, values)


-def volume_update_db(context, volume_id, host):
-    '''Set the host and set the scheduled_at field of a volume.
-
-    :returns: A Volume with the updated fields set properly.
-    '''
-    now = timeutils.utcnow()
-    values = {'host': host, 'scheduled_at': now}
-    return db.volume_update(context, volume_id, values)
-
-
 class Scheduler(object):
     """The base class that all Scheduler classes should inherit from."""
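Note: the deleted volume_update_db mirrors share_update_db, which survives: both stamp the row with the chosen host and a scheduled_at time. A minimal sketch of that pattern with the db layer mocked out (fake_update is a stand-in, not a Manila API):

from datetime import datetime, timezone

def fake_update(row_id, values):
    # Stand-in for db.volume_update / db.share_update.
    return dict(values, id=row_id)

def update_db(row_id, host):
    # Record where the resource was placed and when it was scheduled.
    values = {'host': host, 'scheduled_at': datetime.now(timezone.utc)}
    return fake_update(row_id, values)

print(update_db('share-123', 'host1'))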
@@ -71,7 +60,6 @@ class Scheduler(object):
         self.host_manager = importutils.import_object(
             FLAGS.scheduler_host_manager)
         self.share_rpcapi = share_rpcapi.ShareAPI()
-        self.volume_rpcapi = volume_rpcapi.VolumeAPI()

     def get_host_list(self):
         """Get a list of hosts from the HostManager."""
@@ -100,10 +88,6 @@ class Scheduler(object):
         """Must override schedule method for scheduler to work."""
         raise NotImplementedError(_("Must implement a fallback schedule"))

-    def schedule_create_volume(self, context, request_spec, filter_properties):
-        """Must override schedule method for scheduler to work."""
-        raise NotImplementedError(_("Must implement schedule_create_volume"))
-
     def schedule_create_share(self, context, request_spec, filter_properties):
         """Must override schedule method for scheduler to work."""
         raise NotImplementedError(_("Must implement schedule_create_share"))
@@ -15,8 +15,8 @@
 # under the License.

 """
-The FilterScheduler is for creating volumes.
-You can customize this scheduler by specifying your own volume Filters and
+The FilterScheduler is for creating shares.
+You can customize this scheduler by specifying your own share Filters and
 Weighing Functions.
 """
@@ -52,42 +52,6 @@ class FilterScheduler(driver.Scheduler):
         """Fetch options dictionary. Broken out for testing."""
         return self.options.get_configuration()

-    def populate_filter_properties(self, request_spec, filter_properties):
-        """Stuff things into filter_properties. Can be overridden in a
-        subclass to add more data.
-        """
-        vol = request_spec['volume_properties']
-        filter_properties['size'] = vol['size']
-        filter_properties['availability_zone'] = vol.get('availability_zone')
-        filter_properties['user_id'] = vol.get('user_id')
-        filter_properties['metadata'] = vol.get('metadata')
-
-    def schedule_create_volume(self, context, request_spec, filter_properties):
-        weighed_host = self._schedule(context, request_spec,
-                                      filter_properties)
-
-        if not weighed_host:
-            raise exception.NoValidHost(reason="")
-
-        host = weighed_host.obj.host
-        volume_id = request_spec['volume_id']
-        snapshot_id = request_spec['snapshot_id']
-        image_id = request_spec['image_id']
-
-        updated_volume = driver.volume_update_db(context, volume_id, host)
-        self._post_select_populate_filter_properties(filter_properties,
-                                                     weighed_host.obj)
-
-        # context is not serializable
-        filter_properties.pop('context', None)
-
-        self.volume_rpcapi.create_volume(context, updated_volume, host,
-                                         request_spec=request_spec,
-                                         filter_properties=filter_properties,
-                                         allow_reschedule=True,
-                                         snapshot_id=snapshot_id,
-                                         image_id=image_id)
-
     def _post_select_populate_filter_properties(self, filter_properties,
                                                 host_state):
         """Add additional information to the filter properties after a host has
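Note: the removed schedule_create_volume has the same select/persist/cast shape the share path keeps: weigh hosts, raise NoValidHost when filtering leaves nothing, write the placement to the db, drop the non-serializable context, then cast to the winner. A condensed sketch of that control flow (every name here is a stand-in):

def schedule_and_cast(schedule, update_db, cast, request_spec, filter_properties):
    weighed_host = schedule(request_spec, filter_properties)
    if weighed_host is None:
        raise RuntimeError('NoValidHost')  # stand-in for exception.NoValidHost

    updated = update_db(request_spec['id'], weighed_host)

    # The real code pops 'context' because a context cannot cross RPC.
    filter_properties.pop('context', None)

    cast(weighed_host, updated, request_spec, filter_properties)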
@@ -115,108 +79,6 @@ class FilterScheduler(driver.Scheduler):
             raise exception.InvalidParameterValue(err=msg)
         return max_attempts

-    def _log_volume_error(self, volume_id, retry):
-        """If the request contained an exception from a previous volume
-        create operation, log it to aid debugging.
-        """
-        exc = retry.pop('exc', None)  # string-ified exception from volume
-        if not exc:
-            return  # no exception info from a previous attempt, skip
-
-        hosts = retry.get('hosts', None)
-        if not hosts:
-            return  # no previously attempted hosts, skip
-
-        last_host = hosts[-1]
-        msg = _("Error scheduling %(volume_id)s from last vol-service: "
-                "%(last_host)s : %(exc)s") % locals()
-        LOG.error(msg)
-
-    def _populate_retry(self, filter_properties, properties):
-        """Populate filter properties with history of retries for this
-        request. If maximum retries is exceeded, raise NoValidHost.
-        """
-        max_attempts = self.max_attempts
-        retry = filter_properties.pop('retry', {})
-
-        if max_attempts == 1:
-            # re-scheduling is disabled.
-            return
-
-        # retry is enabled, update attempt count:
-        if retry:
-            retry['num_attempts'] += 1
-        else:
-            retry = {
-                'num_attempts': 1,
-                'hosts': []  # list of volume service hosts tried
-            }
-        filter_properties['retry'] = retry
-
-        volume_id = properties.get('volume_id')
-        self._log_volume_error(volume_id, retry)
-
-        if retry['num_attempts'] > max_attempts:
-            msg = _("Exceeded max scheduling attempts %(max_attempts)d for "
-                    "volume %(volume_id)s") % locals()
-            raise exception.NoValidHost(reason=msg)
-
-    def _schedule(self, context, request_spec, filter_properties=None):
-        """Returns a list of hosts that meet the required specs,
-        ordered by their fitness.
-        """
-        elevated = context.elevated()
-
-        volume_properties = request_spec['volume_properties']
-        # Since Cinder is using mixed filters from Oslo and it's own, which
-        # takes 'resource_XX' and 'volume_XX' as input respectively, copying
-        # 'volume_XX' to 'resource_XX' will make both filters happy.
-        resource_properties = volume_properties.copy()
-        volume_type = request_spec.get("volume_type", None)
-        resource_type = request_spec.get("volume_type", None)
-        request_spec.update({'resource_properties': resource_properties})
-
-        config_options = self._get_configuration_options()
-
-        if filter_properties is None:
-            filter_properties = {}
-        self._populate_retry(filter_properties, resource_properties)
-
-        filter_properties.update({'context': context,
-                                  'request_spec': request_spec,
-                                  'config_options': config_options,
-                                  'volume_type': volume_type,
-                                  'resource_type': resource_type})
-
-        self.populate_filter_properties(request_spec,
-                                        filter_properties)
-
-        # Find our local list of acceptable hosts by filtering and
-        # weighing our options. we virtually consume resources on
-        # it so subsequent selections can adjust accordingly.
-
-        # Note: remember, we are using an iterator here. So only
-        # traverse this list once.
-        hosts = self.host_manager.get_all_host_states(elevated)
-
-        # Filter local hosts based on requirements ...
-        hosts = self.host_manager.get_filtered_hosts(hosts,
-                                                     filter_properties)
-        if not hosts:
-            return None
-
-        LOG.debug(_("Filtered %(hosts)s") % locals())
-        # weighted_host = WeightedHost() ... the best
-        # host for the job.
-        weighed_hosts = self.host_manager.get_weighed_hosts(hosts,
-                                                            filter_properties)
-        best_host = weighed_hosts[0]
-        LOG.debug(_("Choosing %(best_host)s") % locals())
-        best_host.obj.consume_from_volume(volume_properties)
-        return best_host
-
-    #NOTE(rushiagr): Methods for scheduling shares
-
     def schedule_create_share(self, context, request_spec, filter_properties):
         weighed_host = self._schedule_share(context,
                                             request_spec,
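Note: the retry bookkeeping deleted above lives on unchanged in the share path: each attempt bumps a counter carried inside filter_properties, and scheduling aborts once the configured maximum is exceeded. A self-contained sketch (max_attempts hard-coded for illustration):

def populate_retry(filter_properties, max_attempts=3):
    retry = filter_properties.pop('retry', {})
    if max_attempts == 1:
        return  # rescheduling disabled, nothing to track

    if retry:
        retry['num_attempts'] += 1
    else:
        retry = {'num_attempts': 1, 'hosts': []}  # hosts tried so far
    filter_properties['retry'] = retry

    if retry['num_attempts'] > max_attempts:
        raise RuntimeError('Exceeded max scheduling attempts')

props = {}
populate_retry(props)                   # first attempt
populate_retry(props)                   # one reschedule later
print(props['retry']['num_attempts'])   # -> 2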
@@ -91,7 +91,6 @@ class HostState(object):
         self.host = host
         self.update_capabilities(capabilities, service)

-        self.volume_backend_name = None
         self.share_backend_name = None
         self.vendor_name = None
         self.driver_version = 0
@@ -115,24 +114,6 @@ class HostState(object):
             service = {}
         self.service = ReadOnlyDict(service)

-    def update_from_volume_capability(self, capability):
-        """Update information about a host from its volume_node info."""
-        if capability:
-            if self.updated and self.updated > capability['timestamp']:
-                return
-
-            self.volume_backend = capability.get('volume_backend_name', None)
-            self.vendor_name = capability.get('vendor_name', None)
-            self.driver_version = capability.get('driver_version', None)
-            self.storage_protocol = capability.get('storage_protocol', None)
-            self.QoS_support = capability.get('QoS_support', False)
-
-            self.total_capacity_gb = capability['total_capacity_gb']
-            self.free_capacity_gb = capability['free_capacity_gb']
-            self.reserved_percentage = capability['reserved_percentage']
-
-            self.updated = capability['timestamp']
-
     def update_from_share_capability(self, capability):
         """Update information about a host from its volume_node info."""
         if capability:
@@ -151,24 +132,6 @@ class HostState(object):

         self.updated = capability['timestamp']

-    def consume_from_volume(self, volume):
-        """Incrementally update host state from an volume."""
-        volume_gb = volume['size']
-        if self.free_capacity_gb == 'infinite':
-            # There's virtually infinite space on back-end
-            pass
-        elif self.free_capacity_gb == 'unknown':
-            # Unable to determine the actual free space on back-end
-            pass
-        else:
-            self.free_capacity_gb -= volume_gb
-        self.updated = timeutils.utcnow()
-
     def __repr__(self):
         return ("host '%s': free_capacity_gb: %s" %
                 (self.host, self.free_capacity_gb))


 class HostManager(object):
     """Base HostManager class."""
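Note: consume_from_volume goes away, but its capacity bookkeeping is the same trick the share side uses: free_capacity_gb is either a number or one of the sentinel strings 'infinite' / 'unknown', and only a real number is decremented. A sketch:

def consume(free_capacity_gb, size_gb):
    # Sentinels mean capacity is unbounded or unreported; leave them alone.
    if free_capacity_gb in ('infinite', 'unknown'):
        return free_capacity_gb
    return free_capacity_gb - size_gb

print(consume(100, 10))         # -> 90
print(consume('infinite', 10))  # -> 'infinite'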
@@ -255,7 +218,7 @@ class HostManager(object):

     def update_service_capabilities(self, service_name, host, capabilities):
         """Update the per-service capabilities based on this notification."""
-        if service_name not in ('volume', 'share'):
+        if service_name not in ('share'):
             LOG.debug(_('Ignoring %(service_name)s service update '
                         'from %(host)s'), locals())
             return
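Note: ('share') is not a one-element tuple, just the string 'share' in parentheses, so the new check is a substring test rather than tuple membership. It happens to work for 'volume' (not a substring of 'share'), but a trailing comma would express the intent exactly:

print('volume' not in ('share'))   # True  -- correct, but by accident
print('har' not in ('share'))      # False -- 'har' is a substring of 'share'
print('har' not in ('share',))     # True  -- real tuple membership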
@@ -268,40 +231,6 @@ class HostManager(object):
         capab_copy["timestamp"] = timeutils.utcnow()  # Reported time
         self.service_states[host] = capab_copy

-    def get_all_host_states(self, context):
-        """Returns a dict of all the hosts the HostManager
-        knows about. Also, each of the consumable resources in HostState
-        are pre-populated and adjusted based on data in the db.
-
-        For example:
-        {'192.168.1.100': HostState(), ...}
-        """
-
-        # Get resource usage across the available volume nodes:
-        topic = FLAGS.volume_topic
-        volume_services = db.service_get_all_by_topic(context, topic)
-        for service in volume_services:
-            if not utils.service_is_up(service) or service['disabled']:
-                LOG.warn(_("service is down or disabled."))
-                continue
-            host = service['host']
-            capabilities = self.service_states.get(host, None)
-            host_state = self.host_state_map.get(host)
-            if host_state:
-                # copy capabilities to host_state.capabilities
-                host_state.update_capabilities(capabilities,
-                                               dict(service.iteritems()))
-            else:
-                host_state = self.host_state_cls(host,
-                                                 capabilities=capabilities,
-                                                 service=
-                                                 dict(service.iteritems()))
-                self.host_state_map[host] = host_state
-            # update host_state
-            host_state.update_from_volume_capability(capabilities)
-
-        return self.host_state_map.itervalues()
-
     def get_all_host_states_share(self, context):
         """Returns a dict of all the hosts the HostManager
         knows about. Also, each of the consumable resources in HostState
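Note: the dropped volume flavor of get_all_host_states and the surviving share flavor follow one pattern: walk the live services, skip down or disabled ones, and create-or-refresh one HostState per host. A compact sketch of that map maintenance (service records are invented dicts):

class HostState(object):
    def __init__(self, host, capabilities=None):
        self.host = host
        self.capabilities = capabilities or {}

    def update(self, capabilities):
        self.capabilities = capabilities or {}

host_state_map = {}
service_states = {'host1': {'free_capacity_gb': 100}}
services = [{'host': 'host1', 'disabled': False, 'up': True},
            {'host': 'host2', 'disabled': True, 'up': True}]

for service in services:
    if not service['up'] or service['disabled']:
        continue  # mirrors the "service is down or disabled" skip
    host = service['host']
    capabilities = service_states.get(host)
    state = host_state_map.get(host)
    if state:
        state.update(capabilities)  # refresh an existing entry in place
    else:
        host_state_map[host] = HostState(host, capabilities)

print(sorted(host_state_map))  # -> ['host1']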
@@ -33,7 +33,6 @@ from manila.openstack.common import importutils
 from manila.openstack.common import log as logging
 from manila.openstack.common.notifier import api as notifier
 from manila.share import rpcapi as share_rpcapi
-from manila.volume import rpcapi as volume_rpcapi

 LOG = logging.getLogger(__name__)
@@ -47,7 +46,7 @@ FLAGS.register_opt(scheduler_driver_opt)


 class SchedulerManager(manager.Manager):
-    """Chooses a host to create volumes."""
+    """Chooses a host to create shares."""

     RPC_API_VERSION = '1.3'
@@ -79,42 +78,6 @@ class SchedulerManager(manager.Manager):
                                                 host,
                                                 capabilities)

-    def create_volume(self, context, topic, volume_id, snapshot_id=None,
-                      image_id=None, request_spec=None,
-                      filter_properties=None):
-        try:
-            if request_spec is None:
-                # For RPC version < 1.2 backward compatibility
-                request_spec = {}
-                volume_ref = db.volume_get(context, volume_id)
-                size = volume_ref.get('size')
-                availability_zone = volume_ref.get('availability_zone')
-                volume_type_id = volume_ref.get('volume_type_id')
-                vol_type = db.volume_type_get(context, volume_type_id)
-                volume_properties = {'size': size,
-                                     'availability_zone': availability_zone,
-                                     'volume_type_id': volume_type_id}
-                request_spec.update(
-                    {'volume_id': volume_id,
-                     'snapshot_id': snapshot_id,
-                     'image_id': image_id,
-                     'volume_properties': volume_properties,
-                     'volume_type': dict(vol_type).iteritems()})
-
-            self.driver.schedule_create_volume(context, request_spec,
-                                               filter_properties)
-        except exception.NoValidHost as ex:
-            volume_state = {'volume_state': {'status': 'error'}}
-            self._set_volume_state_and_notify('create_volume',
-                                              volume_state,
-                                              context, ex, request_spec)
-        except Exception as ex:
-            with excutils.save_and_reraise_exception():
-                volume_state = {'volume_state': {'status': 'error'}}
-                self._set_volume_state_and_notify('create_volume',
-                                                  volume_state,
-                                                  context, ex, request_spec)
-
     def create_share(self, context, topic, share_id, snapshot_id=None,
                      request_spec=None, filter_properties=None):
         try:
@@ -151,28 +114,5 @@ class SchedulerManager(manager.Manager):
         notifier.notify(context, notifier.publisher_id("scheduler"),
                         'scheduler.' + method, notifier.ERROR, payload)

-    def _set_volume_state_and_notify(self, method, updates, context, ex,
-                                     request_spec):
-        LOG.error(_("Failed to schedule_%(method)s: %(ex)s") % locals())
-
-        volume_state = updates['volume_state']
-        properties = request_spec.get('volume_properties', {})
-
-        volume_id = request_spec.get('volume_id', None)
-
-        if volume_id:
-            db.volume_update(context, volume_id, volume_state)
-
-        payload = dict(request_spec=request_spec,
-                       volume_properties=properties,
-                       volume_id=volume_id,
-                       state=volume_state,
-                       method=method,
-                       reason=ex)
-
-        notifier.notify(context, notifier.publisher_id("scheduler"),
-                        'scheduler.' + method, notifier.ERROR, payload)
-
     def request_service_capabilities(self, context):
-        volume_rpcapi.VolumeAPI().publish_service_capabilities(context)
         share_rpcapi.ShareAPI().publish_service_capabilities(context)
@@ -45,20 +45,6 @@ class SchedulerAPI(manila.openstack.common.rpc.proxy.RpcProxy):
                 topic=FLAGS.scheduler_topic,
                 default_version=self.RPC_API_VERSION)

-    def create_volume(self, ctxt, topic, volume_id, snapshot_id=None,
-                      image_id=None, request_spec=None,
-                      filter_properties=None):
-        request_spec_p = jsonutils.to_primitive(request_spec)
-        return self.cast(ctxt, self.make_msg(
-            'create_volume',
-            topic=topic,
-            volume_id=volume_id,
-            snapshot_id=snapshot_id,
-            image_id=image_id,
-            request_spec=request_spec_p,
-            filter_properties=filter_properties),
-            version='1.2')
-
     def create_share(self, ctxt, topic, share_id, snapshot_id=None,
                      request_spec=None, filter_properties=None):
         request_spec_p = jsonutils.to_primitive(request_spec)
@@ -42,54 +42,6 @@ FLAGS.register_opts(simple_scheduler_opts)
 class SimpleScheduler(chance.ChanceScheduler):
     """Implements Naive Scheduler that tries to find least loaded host."""

-    def schedule_create_volume(self, context, request_spec, filter_properties):
-        """Picks a host that is up and has the fewest volumes."""
-        elevated = context.elevated()
-
-        volume_id = request_spec.get('volume_id')
-        snapshot_id = request_spec.get('snapshot_id')
-        image_id = request_spec.get('image_id')
-        volume_properties = request_spec.get('volume_properties')
-        volume_size = volume_properties.get('size')
-        availability_zone = volume_properties.get('availability_zone')
-
-        zone, host = None, None
-        if availability_zone:
-            zone, _x, host = availability_zone.partition(':')
-        if host and context.is_admin:
-            topic = FLAGS.volume_topic
-            service = db.service_get_by_args(elevated, host, topic)
-            if not utils.service_is_up(service):
-                raise exception.WillNotSchedule(host=host)
-            updated_volume = driver.volume_update_db(context, volume_id, host)
-            self.volume_rpcapi.create_volume(context,
-                                             updated_volume,
-                                             host,
-                                             snapshot_id,
-                                             image_id)
-            return None
-
-        results = db.service_get_all_volume_sorted(elevated)
-        if zone:
-            results = [(service, gigs) for (service, gigs) in results
-                       if service['availability_zone'] == zone]
-        for result in results:
-            (service, volume_gigabytes) = result
-            if volume_gigabytes + volume_size > FLAGS.max_gigabytes:
-                msg = _("Not enough allocatable volume gigabytes remaining")
-                raise exception.NoValidHost(reason=msg)
-            if utils.service_is_up(service) and not service['disabled']:
-                updated_volume = driver.volume_update_db(context, volume_id,
-                                                         service['host'])
-                self.volume_rpcapi.create_volume(context,
-                                                 updated_volume,
-                                                 service['host'],
-                                                 snapshot_id,
-                                                 image_id)
-                return None
-        msg = _("Is the appropriate service running?")
-        raise exception.NoValidHost(reason=msg)
-
     def schedule_create_share(self, context, request_spec, filter_properties):
         """Picks a host that is up and has the fewest shares."""
         #TODO(rushiagr) - pick only hosts that run shares
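Note: the removed SimpleScheduler method is a classic least-loaded walk: services arrive pre-sorted by allocated gigabytes, the loop aborts as soon as the next placement would cross max_gigabytes (everything later is at least as full), and the first live, enabled host wins. A sketch of the selection loop (data and limit invented):

MAX_GIGABYTES = 100  # stand-in for FLAGS.max_gigabytes

def pick_least_loaded(results, size):
    # results: (service, allocated_gb) pairs sorted by allocated_gb.
    for service, allocated_gb in results:
        if allocated_gb + size > MAX_GIGABYTES:
            raise RuntimeError('Not enough allocatable gigabytes remaining')
        if service['up'] and not service['disabled']:
            return service['host']
    raise RuntimeError('Is the appropriate service running?')

results = [({'host': 'host1', 'up': False, 'disabled': False}, 10),
           ({'host': 'host2', 'up': True, 'disabled': False}, 40)]
print(pick_least_loaded(results, 20))  # -> 'host2'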