Calculate virtual free capacity and notify

It calculates virtual free capacity for
pool and backend which reports definite
values. It will also notify various
capacity info to Ceilometer service.
The notification only occurs when
volume status is updated. It will give
users a bit knowledge about the current
storage usage.

As there are multiple schedulers and volumes, the
patch contains the logic only to send changed
capacity info to ceilometer as possible as. It adds
a new rpc call named notify_service_capabilities()
which fanout=false in volume manager. This means
that only one scheduler will receive it and send
the notification.

Co-Authored-By: Gorka Eguileor <geguileo@redhat.com>
Co-Authored-By: lisali <xiaoyan.li@intel.com>

DocImpact
Implements: blueprint capacity-headroom
Change-Id: Iff033d1b591fb3d9c0a5b9732c0c17d96ddbf712
This commit is contained in:
XinXiaohui 2016-03-16 09:16:40 +08:00 committed by lisali
parent c633a5bfd2
commit b67a416bb9
10 changed files with 801 additions and 2 deletions

View File

@ -174,6 +174,11 @@ class SchedulerDependentManager(Manager):
self.service_name, self.service_name,
self.host, self.host,
self.last_capabilities) self.last_capabilities)
self.scheduler_rpcapi.notify_service_capabilities(
context,
self.service_name,
self.host,
self.last_capabilities)
def _add_to_threadpool(self, func, *args, **kwargs): def _add_to_threadpool(self, func, *args, **kwargs):
self._tp.spawn_n(func, *args, **kwargs) self._tp.spawn_n(func, *args, **kwargs)

View File

@ -103,6 +103,12 @@ class Scheduler(object):
host, host,
capabilities) capabilities)
def notify_service_capabilities(self, service_name, host, capabilities):
"""Notify capability update from a service node."""
self.host_manager.notify_service_capabilities(service_name,
host,
capabilities)
def host_passes_filters(self, context, host, request_spec, def host_passes_filters(self, context, host, request_spec,
filter_properties): filter_properties):
"""Check if the specified host passes the filters.""" """Check if the specified host passes the filters."""

View File

@ -28,6 +28,7 @@ from cinder.common import constants
from cinder import context as cinder_context from cinder import context as cinder_context
from cinder import exception from cinder import exception
from cinder import objects from cinder import objects
from cinder import utils
from cinder.i18n import _LI, _LW from cinder.i18n import _LI, _LW
from cinder.scheduler import filters from cinder.scheduler import filters
from cinder.volume import utils as vol_utils from cinder.volume import utils as vol_utils
@ -345,6 +346,17 @@ class HostManager(object):
host_state_cls = HostState host_state_cls = HostState
REQUIRED_KEYS = frozenset([
'pool_name',
'total_capacity_gb',
'free_capacity_gb',
'allocated_capacity_gb',
'provisioned_capacity_gb',
'thin_provisioning_support',
'thick_provisioning_support',
'max_over_subscription_ratio',
'reserved_percentage'])
def __init__(self): def __init__(self):
self.service_states = {} # { <host>: {<service>: {cap k : v}}} self.service_states = {} # { <host>: {<service>: {cap k : v}}}
self.host_state_map = {} self.host_state_map = {}
@ -358,6 +370,7 @@ class HostManager(object):
self._no_capabilities_hosts = set() # Hosts having no capabilities self._no_capabilities_hosts = set() # Hosts having no capabilities
self._update_host_state_map(cinder_context.get_admin_context()) self._update_host_state_map(cinder_context.get_admin_context())
self.service_states_last_update = {}
def _choose_host_filters(self, filter_cls_names): def _choose_host_filters(self, filter_cls_names):
"""Return a list of available filter names. """Return a list of available filter names.
@ -441,6 +454,24 @@ class HostManager(object):
# Copy the capabilities, so we don't modify the original dict # Copy the capabilities, so we don't modify the original dict
capab_copy = dict(capabilities) capab_copy = dict(capabilities)
capab_copy["timestamp"] = timeutils.utcnow() # Reported time capab_copy["timestamp"] = timeutils.utcnow() # Reported time
# Set the default capabilities in case None is set.
capab_old = self.service_states.get(host, {"timestamp": 0})
capab_last_update = self.service_states_last_update.get(
host, {"timestamp": 0})
# If the capabilites are not changed and the timestamp is older,
# record the capabilities.
# There are cases: capab_old has the capabilities set,
# but the timestamp may be None in it. So does capab_last_update.
if (not self._get_updated_pools(capab_old, capab_copy)) and (
(not capab_old.get("timestamp")) or
(not capab_last_update.get("timestamp")) or
(capab_last_update["timestamp"] < capab_old["timestamp"])):
self.service_states_last_update[host] = capab_old
self.service_states[host] = capab_copy self.service_states[host] = capab_copy
LOG.debug("Received %(service_name)s service update from " LOG.debug("Received %(service_name)s service update from "
@ -450,6 +481,34 @@ class HostManager(object):
self._no_capabilities_hosts.discard(host) self._no_capabilities_hosts.discard(host)
def notify_service_capabilities(self, service_name, host, capabilities):
"""Notify the ceilometer with updated volume stats"""
if service_name != 'volume':
return
updated = []
capa_new = self.service_states.get(host, {})
timestamp = timeutils.utcnow()
# Compare the capabilities and timestamps to decide notifying
if not capa_new:
updated = self._get_updated_pools(capa_new, capabilities)
else:
if timestamp > self.service_states[host]["timestamp"]:
updated = self._get_updated_pools(self.service_states[host],
capabilities)
if not updated:
updated = self._get_updated_pools(
self.service_states_last_update.get(host, {}),
self.service_states.get(host, {}))
if updated:
capab_copy = dict(capabilities)
capab_copy["timestamp"] = timestamp
# If capabilities changes, notify and record the capabilities.
self.service_states_last_update[host] = capab_copy
self.get_usage_and_notify(capabilities, updated, host, timestamp)
def has_all_capabilities(self): def has_all_capabilities(self):
return len(self._no_capabilities_hosts) == 0 return len(self._no_capabilities_hosts) == 0
@ -533,3 +592,120 @@ class HostManager(object):
all_pools.append(new_pool) all_pools.append(new_pool)
return all_pools return all_pools
def get_usage_and_notify(self, capa_new, updated_pools, host, timestamp):
context = cinder_context.get_admin_context()
usage = self._get_usage(capa_new, updated_pools, host, timestamp)
self._notify_capacity_usage(context, usage)
def _get_usage(self, capa_new, updated_pools, host, timestamp):
pools = capa_new.get('pools')
usage = []
if pools and isinstance(pools, list):
backend_usage = dict(type='backend',
name_to_id=host,
total=0,
free=0,
allocated=0,
provisioned=0,
virtual_free=0,
reported_at=timestamp)
# Process the usage.
for pool in pools:
pool_usage = self._get_pool_usage(pool, host, timestamp)
if pool_usage:
backend_usage["total"] += pool_usage["total"]
backend_usage["free"] += pool_usage["free"]
backend_usage["allocated"] += pool_usage["allocated"]
backend_usage["provisioned"] += pool_usage["provisioned"]
backend_usage["virtual_free"] += pool_usage["virtual_free"]
# Only the updated pool is reported.
if pool in updated_pools:
usage.append(pool_usage)
usage.append(backend_usage)
return usage
def _get_pool_usage(self, pool, host, timestamp):
total = pool["total_capacity_gb"]
free = pool["free_capacity_gb"]
unknowns = ["unknown", "infinite", None]
if (total in unknowns) or (free in unknowns):
return {}
allocated = pool["allocated_capacity_gb"]
provisioned = pool["provisioned_capacity_gb"]
reserved = pool["reserved_percentage"]
ratio = pool["max_over_subscription_ratio"]
support = pool["thin_provisioning_support"]
virtual_free = utils.calculate_virtual_free_capacity(
total,
free,
provisioned,
support,
ratio,
reserved,
support)
pool_usage = dict(
type='pool',
name_to_id='#'.join([host, pool['pool_name']]),
total=float(total),
free=float(free),
allocated=float(allocated),
provisioned=float(provisioned),
virtual_free=float(virtual_free),
reported_at=timestamp)
return pool_usage
def _get_updated_pools(self, old_capa, new_capa):
# Judge if the capabilities should be reported.
new_pools = new_capa.get('pools', [])
if not new_pools:
return []
if isinstance(new_pools, list):
# If the volume_stats is not well prepared, don't notify.
if not all(
self.REQUIRED_KEYS.issubset(pool) for pool in new_pools):
return []
else:
LOG.debug("The reported capabilities are not well structured...")
return []
old_pools = old_capa.get('pools', [])
if not old_pools:
return new_pools
updated_pools = []
newpools = {}
oldpools = {}
for new_pool in new_pools:
newpools[new_pool['pool_name']] = new_pool
for old_pool in old_pools:
oldpools[old_pool['pool_name']] = old_pool
for key in newpools.keys():
if key in oldpools.keys():
for k in self.REQUIRED_KEYS:
if newpools[key][k] != oldpools[key][k]:
updated_pools.append(newpools[key])
break
else:
updated_pools.append(newpools[key])
return updated_pools
def _notify_capacity_usage(self, context, usage):
if usage:
for u in usage:
vol_utils.notify_about_capacity_usage(
context, u, u['type'], None, None)
LOG.debug("Publish storage capacity: %s.", usage)

View File

@ -88,6 +88,15 @@ class SchedulerManager(manager.Manager):
host, host,
capabilities) capabilities)
def notify_service_capabilities(self, context, service_name,
host, capabilities):
"""Process a capability update from a service node."""
if capabilities is None:
capabilities = {}
self.driver.notify_service_capabilities(service_name,
host,
capabilities)
def _wait_for_scheduler(self): def _wait_for_scheduler(self):
# NOTE(dulek): We're waiting for scheduler to announce that it's ready # NOTE(dulek): We're waiting for scheduler to announce that it's ready
# or CONF.periodic_interval seconds from service startup has passed. # or CONF.periodic_interval seconds from service startup has passed.

View File

@ -58,9 +58,10 @@ class SchedulerAPI(rpc.RPCAPI):
set to 2.3. set to 2.3.
3.0 - Remove 2.x compatibility 3.0 - Remove 2.x compatibility
3.1 - Adds notify_service_capabilities()
""" """
RPC_API_VERSION = '3.0' RPC_API_VERSION = '3.1'
RPC_DEFAULT_VERSION = '3.0' RPC_DEFAULT_VERSION = '3.0'
TOPIC = constants.SCHEDULER_TOPIC TOPIC = constants.SCHEDULER_TOPIC
BINARY = 'cinder-scheduler' BINARY = 'cinder-scheduler'
@ -139,3 +140,10 @@ class SchedulerAPI(rpc.RPCAPI):
cctxt.cast(ctxt, 'update_service_capabilities', cctxt.cast(ctxt, 'update_service_capabilities',
service_name=service_name, host=host, service_name=service_name, host=host,
capabilities=capabilities) capabilities=capabilities)
def notify_service_capabilities(self, ctxt, service_name,
host, capabilities):
cctxt = self._get_cctxt(version='3.1')
cctxt.cast(ctxt, 'notify_service_capabilities',
service_name=service_name, host=host,
capabilities=capabilities)

View File

@ -48,6 +48,8 @@ class HostManagerTestCase(test.TestCase):
self.host_manager = host_manager.HostManager() self.host_manager = host_manager.HostManager()
self.fake_hosts = [host_manager.HostState('fake_host%s' % x) self.fake_hosts = [host_manager.HostState('fake_host%s' % x)
for x in range(1, 5)] for x in range(1, 5)]
# For a second scheduler service.
self.host_manager_1 = host_manager.HostManager()
def test_choose_host_filters_not_found(self): def test_choose_host_filters_not_found(self):
self.flags(scheduler_default_filters='FakeFilterClass3') self.flags(scheduler_default_filters='FakeFilterClass3')
@ -85,12 +87,15 @@ class HostManagerTestCase(test.TestCase):
self.assertEqual(expected, mock_func.call_args_list) self.assertEqual(expected, mock_func.call_args_list)
self.assertEqual(set(self.fake_hosts), set(result)) self.assertEqual(set(self.fake_hosts), set(result))
@mock.patch('cinder.scheduler.host_manager.HostManager._get_updated_pools')
@mock.patch('oslo_utils.timeutils.utcnow') @mock.patch('oslo_utils.timeutils.utcnow')
def test_update_service_capabilities(self, _mock_utcnow): def test_update_service_capabilities(self, _mock_utcnow,
_mock_get_updated_pools):
service_states = self.host_manager.service_states service_states = self.host_manager.service_states
self.assertDictMatch({}, service_states) self.assertDictMatch({}, service_states)
_mock_utcnow.side_effect = [31337, 31338, 31339] _mock_utcnow.side_effect = [31337, 31338, 31339]
_mock_get_updated_pools.return_value = []
host1_volume_capabs = dict(free_capacity_gb=4321, timestamp=1) host1_volume_capabs = dict(free_capacity_gb=4321, timestamp=1)
host2_volume_capabs = dict(free_capacity_gb=5432, timestamp=1) host2_volume_capabs = dict(free_capacity_gb=5432, timestamp=1)
host3_volume_capabs = dict(free_capacity_gb=6543, timestamp=1) host3_volume_capabs = dict(free_capacity_gb=6543, timestamp=1)
@ -117,6 +122,355 @@ class HostManagerTestCase(test.TestCase):
'host3': host3_volume_capabs} 'host3': host3_volume_capabs}
self.assertDictMatch(expected, service_states) self.assertDictMatch(expected, service_states)
@mock.patch(
'cinder.scheduler.host_manager.HostManager.get_usage_and_notify')
@mock.patch('oslo_utils.timeutils.utcnow')
def test_update_and_notify_service_capabilities_case1(
self, _mock_utcnow,
_mock_get_usage_and_notify):
_mock_utcnow.side_effect = [31337, 31338, 31339]
service_name = 'volume'
capab1 = {'pools': [{
'pool_name': 'pool1', 'thick_provisioning_support': True,
'thin_provisioning_support': False, 'total_capacity_gb': 10,
'free_capacity_gb': 10, 'max_over_subscription_ratio': 1,
'provisioned_capacity_gb': 0, 'allocated_capacity_gb': 0,
'reserved_percentage': 0}]}
# Run 1:
# capa: capa1
# S0: update_service_capabilities()
# S0: notify_service_capabilities()
# S1: update_service_capabilities()
#
# notify capab1 to ceilometer by S0
#
# S0: update_service_capabilities()
self.host_manager.update_service_capabilities(service_name, 'host1',
capab1)
self.assertDictMatch(dict(dict(timestamp=31337), **capab1),
self.host_manager.service_states['host1'])
# S0: notify_service_capabilities()
self.host_manager.notify_service_capabilities(service_name, 'host1',
capab1)
self.assertDictMatch(dict(dict(timestamp=31337), **capab1),
self.host_manager.service_states['host1'])
self.assertDictMatch(
dict(dict(timestamp=31338), **capab1),
self.host_manager.service_states_last_update['host1'])
# notify capab1 to ceilometer by S0
self.assertTrue(1, _mock_get_usage_and_notify.call_count)
# S1: update_service_capabilities()
self.host_manager_1.update_service_capabilities(service_name, 'host1',
capab1)
self.assertDictMatch(dict(dict(timestamp=31339), **capab1),
self.host_manager_1.service_states['host1'])
@mock.patch(
'cinder.scheduler.host_manager.HostManager.get_usage_and_notify')
@mock.patch('oslo_utils.timeutils.utcnow')
def test_update_and_notify_service_capabilities_case2(
self, _mock_utcnow,
_mock_get_usage_and_notify):
_mock_utcnow.side_effect = [31340, 31341, 31342]
service_name = 'volume'
capab1 = {'pools': [{
'pool_name': 'pool1', 'thick_provisioning_support': True,
'thin_provisioning_support': False, 'total_capacity_gb': 10,
'free_capacity_gb': 10, 'max_over_subscription_ratio': 1,
'provisioned_capacity_gb': 0, 'allocated_capacity_gb': 0,
'reserved_percentage': 0}]}
self.host_manager.service_states['host1'] = (
dict(dict(timestamp=31337), **capab1))
self.host_manager.service_states_last_update['host1'] = (
dict(dict(timestamp=31338), **capab1))
self.host_manager_1.service_states['host1'] = (
dict(dict(timestamp=31339), **capab1))
# Run 2:
# capa: capa1
# S0: update_service_capabilities()
# S1: update_service_capabilities()
# S1: notify_service_capabilities()
#
# Don't notify capab1 to ceilometer.
# S0: update_service_capabilities()
self.host_manager.update_service_capabilities(service_name, 'host1',
capab1)
self.assertDictMatch(dict(dict(timestamp=31340), **capab1),
self.host_manager.service_states['host1'])
self.assertDictMatch(
dict(dict(timestamp=31338), **capab1),
self.host_manager.service_states_last_update['host1'])
# S1: update_service_capabilities()
self.host_manager_1.update_service_capabilities(service_name, 'host1',
capab1)
self.assertDictMatch(dict(dict(timestamp=31341), **capab1),
self.host_manager_1.service_states['host1'])
self.assertDictMatch(
dict(dict(timestamp=31339), **capab1),
self.host_manager_1.service_states_last_update['host1'])
# S1: notify_service_capabilities()
self.host_manager_1.notify_service_capabilities(service_name, 'host1',
capab1)
self.assertDictMatch(dict(dict(timestamp=31341), **capab1),
self.host_manager_1.service_states['host1'])
self.assertDictMatch(
self.host_manager_1.service_states_last_update['host1'],
dict(dict(timestamp=31339), **capab1))
# Don't notify capab1 to ceilometer.
self.assertTrue(1, _mock_get_usage_and_notify.call_count)
@mock.patch(
'cinder.scheduler.host_manager.HostManager.get_usage_and_notify')
@mock.patch('oslo_utils.timeutils.utcnow')
def test_update_and_notify_service_capabilities_case3(
self, _mock_utcnow,
_mock_get_usage_and_notify):
_mock_utcnow.side_effect = [31343, 31344, 31345]
service_name = 'volume'
capab1 = {'pools': [{
'pool_name': 'pool1', 'thick_provisioning_support': True,
'thin_provisioning_support': False, 'total_capacity_gb': 10,
'free_capacity_gb': 10, 'max_over_subscription_ratio': 1,
'provisioned_capacity_gb': 0, 'allocated_capacity_gb': 0,
'reserved_percentage': 0}]}
self.host_manager.service_states['host1'] = (
dict(dict(timestamp=31340), **capab1))
self.host_manager.service_states_last_update['host1'] = (
dict(dict(timestamp=31338), **capab1))
self.host_manager_1.service_states['host1'] = (
dict(dict(timestamp=31341), **capab1))
self.host_manager_1.service_states_last_update['host1'] = (
dict(dict(timestamp=31339), **capab1))
# Run 3:
# capa: capab1
# S0: notify_service_capabilities()
# S0: update_service_capabilities()
# S1: update_service_capabilities()
#
# Don't notify capab1 to ceilometer.
# S0: notify_service_capabilities()
self.host_manager.notify_service_capabilities(service_name, 'host1',
capab1)
self.assertDictMatch(
dict(dict(timestamp=31338), **capab1),
self.host_manager.service_states_last_update['host1'])
self.assertDictMatch(dict(dict(timestamp=31340), **capab1),
self.host_manager.service_states['host1'])
# Don't notify capab1 to ceilometer.
self.assertTrue(1, _mock_get_usage_and_notify.call_count)
# S0: update_service_capabilities()
self.host_manager.update_service_capabilities(service_name, 'host1',
capab1)
self.assertDictMatch(
dict(dict(timestamp=31340), **capab1),
self.host_manager.service_states_last_update['host1'])
self.assertDictMatch(dict(dict(timestamp=31344), **capab1),
self.host_manager.service_states['host1'])
# S1: update_service_capabilities()
self.host_manager_1.update_service_capabilities(service_name, 'host1',
capab1)
self.assertDictMatch(dict(dict(timestamp=31345), **capab1),
self.host_manager_1.service_states['host1'])
self.assertDictMatch(
dict(dict(timestamp=31341), **capab1),
self.host_manager_1.service_states_last_update['host1'])
@mock.patch(
'cinder.scheduler.host_manager.HostManager.get_usage_and_notify')
@mock.patch('oslo_utils.timeutils.utcnow')
def test_update_and_notify_service_capabilities_case4(
self, _mock_utcnow,
_mock_get_usage_and_notify):
_mock_utcnow.side_effect = [31346, 31347, 31348]
service_name = 'volume'
capab1 = {'pools': [{
'pool_name': 'pool1', 'thick_provisioning_support': True,
'thin_provisioning_support': False, 'total_capacity_gb': 10,
'free_capacity_gb': 10, 'max_over_subscription_ratio': 1,
'provisioned_capacity_gb': 0, 'allocated_capacity_gb': 0,
'reserved_percentage': 0}]}
self.host_manager.service_states['host1'] = (
dict(dict(timestamp=31344), **capab1))
self.host_manager.service_states_last_update['host1'] = (
dict(dict(timestamp=31340), **capab1))
self.host_manager_1.service_states['host1'] = (
dict(dict(timestamp=31345), **capab1))
self.host_manager_1.service_states_last_update['host1'] = (
dict(dict(timestamp=31341), **capab1))
capab2 = {'pools': [{
'pool_name': 'pool1', 'thick_provisioning_support': True,
'thin_provisioning_support': False, 'total_capacity_gb': 10,
'free_capacity_gb': 9, 'max_over_subscription_ratio': 1,
'provisioned_capacity_gb': 1, 'allocated_capacity_gb': 1,
'reserved_percentage': 0}]}
# Run 4:
# capa: capab2
# S0: update_service_capabilities()
# S1: notify_service_capabilities()
# S1: update_service_capabilities()
#
# notify capab2 to ceilometer.
# S0: update_service_capabilities()
self.host_manager.update_service_capabilities(service_name, 'host1',
capab2)
self.assertDictMatch(
dict(dict(timestamp=31340), **capab1),
self.host_manager.service_states_last_update['host1'])
self.assertDictMatch(dict(dict(timestamp=31346), **capab2),
self.host_manager.service_states['host1'])
# S1: notify_service_capabilities()
self.host_manager_1.notify_service_capabilities(service_name, 'host1',
capab2)
self.assertDictMatch(dict(dict(timestamp=31345), **capab1),
self.host_manager_1.service_states['host1'])
self.assertDictMatch(
dict(dict(timestamp=31347), **capab2),
self.host_manager_1.service_states_last_update['host1'])
# notify capab2 to ceilometer.
self.assertTrue(2, _mock_get_usage_and_notify.call_count)
# S1: update_service_capabilities()
self.host_manager_1.update_service_capabilities(service_name, 'host1',
capab2)
self.assertDictMatch(dict(dict(timestamp=31348), **capab2),
self.host_manager_1.service_states['host1'])
self.assertDictMatch(
dict(dict(timestamp=31347), **capab2),
self.host_manager_1.service_states_last_update['host1'])
@mock.patch(
'cinder.scheduler.host_manager.HostManager.get_usage_and_notify')
@mock.patch('oslo_utils.timeutils.utcnow')
def test_update_and_notify_service_capabilities_case5(
self, _mock_utcnow,
_mock_get_usage_and_notify):
_mock_utcnow.side_effect = [31349, 31350, 31351]
service_name = 'volume'
capab1 = {'pools': [{
'pool_name': 'pool1', 'thick_provisioning_support': True,
'thin_provisioning_support': False, 'total_capacity_gb': 10,
'free_capacity_gb': 10, 'max_over_subscription_ratio': 1,
'provisioned_capacity_gb': 0, 'allocated_capacity_gb': 0,
'reserved_percentage': 0}]}
capab2 = {'pools': [{
'pool_name': 'pool1', 'thick_provisioning_support': True,
'thin_provisioning_support': False, 'total_capacity_gb': 10,
'free_capacity_gb': 9, 'max_over_subscription_ratio': 1,
'provisioned_capacity_gb': 1, 'allocated_capacity_gb': 1,
'reserved_percentage': 0}]}
self.host_manager.service_states['host1'] = (
dict(dict(timestamp=31346), **capab2))
self.host_manager.service_states_last_update['host1'] = (
dict(dict(timestamp=31340), **capab1))
self.host_manager_1.service_states['host1'] = (
dict(dict(timestamp=31348), **capab2))
self.host_manager_1.service_states_last_update['host1'] = (
dict(dict(timestamp=31347), **capab2))
# Run 5:
# capa: capa2
# S0: notify_service_capabilities()
# S0: update_service_capabilities()
# S1: update_service_capabilities()
#
# This is the special case not handled.
# 1) capab is changed (from capab1 to capab2)
# 2) S1 has already notify the capab2 in Run 4.
# 3) S0 just got update_service_capabilities() in Run 4.
# 4) S0 got notify_service_capabilities() immediately in next run,
# here is Run 5.
# S0 has no ways to know whether other scheduler (here is S1) who
# has noitified the changed capab2 or not. S0 just thinks it's his
# own turn to notify the changed capab2.
# In this case, we have notified the same capabilities twice.
#
# S0: notify_service_capabilities()
self.host_manager.notify_service_capabilities(service_name, 'host1',
capab2)
self.assertDictMatch(
dict(dict(timestamp=31349), **capab2),
self.host_manager.service_states_last_update['host1'])
self.assertDictMatch(dict(dict(timestamp=31346), **capab2),
self.host_manager.service_states['host1'])
# S0 notify capab2 to ceilometer.
self.assertTrue(3, _mock_get_usage_and_notify.call_count)
# S0: update_service_capabilities()
self.host_manager.update_service_capabilities(service_name, 'host1',
capab2)
self.assertDictMatch(
dict(dict(timestamp=31349), **capab2),
self.host_manager.service_states_last_update['host1'])
self.assertDictMatch(dict(dict(timestamp=31350), **capab2),
self.host_manager.service_states['host1'])
# S1: update_service_capabilities()
self.host_manager_1.update_service_capabilities(service_name, 'host1',
capab2)
self.assertDictMatch(
dict(dict(timestamp=31348), **capab2),
self.host_manager_1.service_states_last_update['host1'])
self.assertDictMatch(dict(dict(timestamp=31351), **capab2),
self.host_manager_1.service_states['host1'])
@mock.patch('cinder.objects.service.Service.is_up', @mock.patch('cinder.objects.service.Service.is_up',
new_callable=mock.PropertyMock) new_callable=mock.PropertyMock)
@mock.patch('cinder.db.service_get_all') @mock.patch('cinder.db.service_get_all')
@ -397,6 +751,137 @@ class HostManagerTestCase(test.TestCase):
self.assertEqual(sorted(expected, key=sort_func), self.assertEqual(sorted(expected, key=sort_func),
sorted(res, key=sort_func)) sorted(res, key=sort_func))
def test_get_usage(self):
host = "host1@backend1"
timestamp = 40000
volume_stats1 = {'pools': [
{'pool_name': 'pool1',
'total_capacity_gb': 30.01,
'free_capacity_gb': 28.01,
'allocated_capacity_gb': 2.0,
'provisioned_capacity_gb': 2.0,
'max_over_subscription_ratio': 1.0,
'thin_provisioning_support': False,
'thick_provisioning_support': True,
'reserved_percentage': 5},
{'pool_name': 'pool2',
'total_capacity_gb': 20.01,
'free_capacity_gb': 18.01,
'allocated_capacity_gb': 2.0,
'provisioned_capacity_gb': 2.0,
'max_over_subscription_ratio': 2.0,
'thin_provisioning_support': True,
'thick_provisioning_support': False,
'reserved_percentage': 5}]}
updated_pools1 = [{'pool_name': 'pool1',
'total_capacity_gb': 30.01,
'free_capacity_gb': 28.01,
'allocated_capacity_gb': 2.0,
'provisioned_capacity_gb': 2.0,
'max_over_subscription_ratio': 1.0,
'thin_provisioning_support': False,
'thick_provisioning_support': True,
'reserved_percentage': 5},
{'pool_name': 'pool2',
'total_capacity_gb': 20.01,
'free_capacity_gb': 18.01,
'allocated_capacity_gb': 2.0,
'provisioned_capacity_gb': 2.0,
'max_over_subscription_ratio': 2.0,
'thin_provisioning_support': True,
'thick_provisioning_support': False,
'reserved_percentage': 5}]
volume_stats2 = {'pools': [
{'pool_name': 'pool1',
'total_capacity_gb': 30.01,
'free_capacity_gb': 28.01,
'allocated_capacity_gb': 2.0,
'provisioned_capacity_gb': 2.0,
'max_over_subscription_ratio': 2.0,
'thin_provisioning_support': True,
'thick_provisioning_support': False,
'reserved_percentage': 0},
{'pool_name': 'pool2',
'total_capacity_gb': 20.01,
'free_capacity_gb': 18.01,
'allocated_capacity_gb': 2.0,
'provisioned_capacity_gb': 2.0,
'max_over_subscription_ratio': 2.0,
'thin_provisioning_support': True,
'thick_provisioning_support': False,
'reserved_percentage': 5}]}
updated_pools2 = [{'pool_name': 'pool1',
'total_capacity_gb': 30.01,
'free_capacity_gb': 28.01,
'allocated_capacity_gb': 2.0,
'provisioned_capacity_gb': 2.0,
'max_over_subscription_ratio': 2.0,
'thin_provisioning_support': True,
'thick_provisioning_support': False,
'reserved_percentage': 0}]
expected1 = [
{"name_to_id": 'host1@backend1#pool1',
"type": "pool",
"total": 30.01,
"free": 28.01,
"allocated": 2.0,
"provisioned": 2.0,
"virtual_free": 27.01,
"reported_at": 40000},
{"name_to_id": 'host1@backend1#pool2',
"type": "pool",
"total": 20.01,
"free": 18.01,
"allocated": 2.0,
"provisioned": 2.0,
"virtual_free": 37.02,
"reported_at": 40000},
{"name_to_id": 'host1@backend1',
"type": "backend",
"total": 50.02,
"free": 46.02,
"allocated": 4.0,
"provisioned": 4.0,
"virtual_free": 64.03,
"reported_at": 40000}]
expected2 = [
{"name_to_id": 'host1@backend1#pool1',
"type": "pool",
"total": 30.01,
"free": 28.01,
"allocated": 2.0,
"provisioned": 2.0,
"virtual_free": 58.02,
"reported_at": 40000},
{"name_to_id": 'host1@backend1',
"type": "backend",
"total": 50.02,
"free": 46.02,
"allocated": 4.0,
"provisioned": 4.0,
"virtual_free": 95.04,
"reported_at": 40000}]
def sort_func(data):
return data['name_to_id']
res1 = self.host_manager._get_usage(volume_stats1,
updated_pools1, host, timestamp)
self.assertEqual(len(expected1), len(res1))
self.assertEqual(sorted(expected1, key=sort_func),
sorted(res1, key=sort_func))
res2 = self.host_manager._get_usage(volume_stats2,
updated_pools2, host, timestamp)
self.assertEqual(len(expected2), len(res2))
self.assertEqual(sorted(expected2, key=sort_func),
sorted(res2, key=sort_func))
class HostStateTestCase(test.TestCase): class HostStateTestCase(test.TestCase):
"""Test case for HostState class.""" """Test case for HostState class."""

View File

@ -96,6 +96,16 @@ class SchedulerRpcAPITestCase(test.TestCase):
version='3.0') version='3.0')
create_worker_mock.assert_called_once() create_worker_mock.assert_called_once()
def test_notify_service_capabilities(self):
capabilities = {'host': 'fake_host',
'total': '10.01', }
self._test_scheduler_api('notify_service_capabilities',
rpc_method='cast',
service_name='fake_name',
host='fake_host',
capabilities=capabilities,
version='3.1')
def test_create_volume_serialization(self): def test_create_volume_serialization(self):
volume = fake_volume.fake_volume_obj(self.context) volume = fake_volume.fake_volume_obj(self.context)
create_worker_mock = self.mock_object(volume, 'create_worker') create_worker_mock = self.mock_object(volume, 'create_worker')

View File

@ -400,6 +400,70 @@ class LVMVolumeDriverTestCase(test.TestCase):
bs = volume_utils._check_blocksize('ABM') bs = volume_utils._check_blocksize('ABM')
self.assertEqual('1M', bs) self.assertEqual('1M', bs)
@mock.patch('cinder.volume.utils._usage_from_capacity')
@mock.patch('cinder.volume.utils.CONF')
@mock.patch('cinder.volume.utils.rpc')
def test_notify_about_capacity_usage(self, mock_rpc,
mock_conf, mock_usage):
mock_conf.host = 'host1'
output = volume_utils.notify_about_capacity_usage(
mock.sentinel.context,
mock.sentinel.capacity,
'test_suffix')
self.assertIsNone(output)
mock_usage.assert_called_once_with(mock.sentinel.capacity)
mock_rpc.get_notifier.assert_called_once_with('capacity', 'host1')
mock_rpc.get_notifier.return_value.info.assert_called_once_with(
mock.sentinel.context,
'capacity.test_suffix',
mock_usage.return_value)
@mock.patch('cinder.volume.utils._usage_from_capacity')
@mock.patch('cinder.volume.utils.CONF')
@mock.patch('cinder.volume.utils.rpc')
def test_notify_about_capacity_usage_with_kwargs(self, mock_rpc, mock_conf,
mock_usage):
mock_conf.host = 'host1'
output = volume_utils.notify_about_capacity_usage(
mock.sentinel.context,
mock.sentinel.capacity,
'test_suffix',
extra_usage_info={'a': 'b', 'c': 'd'},
host='host2')
self.assertIsNone(output)
mock_usage.assert_called_once_with(mock.sentinel.capacity,
a='b', c='d')
mock_rpc.get_notifier.assert_called_once_with('capacity', 'host2')
mock_rpc.get_notifier.return_value.info.assert_called_once_with(
mock.sentinel.context,
'capacity.test_suffix',
mock_usage.return_value)
def test_usage_from_capacity(self):
test_capacity = {
'name_to_id': 'host1@backend1#pool1',
'type': 'pool',
'total': '10.01',
'free': '8.01',
'allocated': '2',
'provisioned': '2',
'virtual_free': '8.01',
'reported_at': '2014-12-11T10:10:00',
}
usage_info = volume_utils._usage_from_capacity(
test_capacity)
expected_capacity = {
'name_to_id': 'host1@backend1#pool1',
'total': '10.01',
'free': '8.01',
'allocated': '2',
'provisioned': '2',
'virtual_free': '8.01',
'reported_at': '2014-12-11T10:10:00',
}
self.assertEqual(expected_capacity, usage_info)
class OdirectSupportTestCase(test.TestCase): class OdirectSupportTestCase(test.TestCase):
@mock.patch('cinder.utils.execute') @mock.patch('cinder.utils.execute')

View File

@ -197,6 +197,37 @@ def notify_about_snapshot_usage(context, snapshot, event_suffix,
usage_info) usage_info)
def _usage_from_capacity(capacity, **extra_usage_info):
capacity_info = {
'name_to_id': capacity['name_to_id'],
'total': capacity['total'],
'free': capacity['free'],
'allocated': capacity['allocated'],
'provisioned': capacity['provisioned'],
'virtual_free': capacity['virtual_free'],
'reported_at': capacity['reported_at']
}
capacity_info.update(extra_usage_info)
return capacity_info
def notify_about_capacity_usage(context, capacity, suffix,
extra_usage_info=None, host=None):
if not host:
host = CONF.host
if not extra_usage_info:
extra_usage_info = {}
usage_info = _usage_from_capacity(capacity, **extra_usage_info)
rpc.get_notifier('capacity', host).info(context,
'capacity.%s' % suffix,
usage_info)
def notify_about_replication_usage(context, volume, suffix, def notify_about_replication_usage(context, volume, suffix,
extra_usage_info=None, host=None): extra_usage_info=None, host=None):
if not host: if not host:

View File

@ -0,0 +1,5 @@
---
features:
- Cinder is now collecting capacity data, including
virtual free capacity etc from the backends. A notification
which includes that data is periodically emitted.