Workload balance migration strategy implementation

This is one of the algorithms of the Intel thermal PoC.
It is based on the VM workloads of the hypervisors.

Change-Id: I45ab0cf0f05786e6f68025bdd315f38381900a68
blueprint: workload-balance-migration-strategy
junjie huang
2016-03-13 18:02:27 +00:00
parent dcb5c1f9fc
commit 30bdf29002
5 changed files with 572 additions and 1 deletion


@@ -51,6 +51,7 @@ watcher_strategies =
outlet_temp_control = watcher.decision_engine.strategy.strategies.outlet_temp_control:OutletTempControl
vm_workload_consolidation = watcher.decision_engine.strategy.strategies.vm_workload_consolidation:VMWorkloadConsolidation
workload_stabilization = watcher.decision_engine.strategy.strategies.workload_stabilization:WorkloadStabilization
workload_balance = watcher.decision_engine.strategy.strategies.workload_balance:WorkloadBalance
watcher_actions =
migrate = watcher.applier.actions.migration:Migrate


@@ -0,0 +1,324 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 Intel Corp
#
# Authors: Junjie-Huang <junjie.huang@intel.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from oslo_log import log
from watcher._i18n import _, _LE, _LI, _LW
from watcher.common import exception as wexc
from watcher.decision_engine.model import resource
from watcher.decision_engine.model import vm_state
from watcher.decision_engine.strategy.strategies import base
from watcher.metrics_engine.cluster_history import ceilometer as ceil
LOG = log.getLogger(__name__)
class WorkloadBalance(base.BaseStrategy):
"""[PoC]Workload balance using live migration
*Description*
It is a migration strategy based on the VM workload of physical
servers. It generates solutions to move a workload whenever a server's
CPU utilization % is higher than the specified threshold.
The VM to be moved should make the host close to average workload
of all hypervisors.
*Requirements*
* Hardware: compute node should use the same physical CPUs
* Software: Ceilometer component ceilometer-agent-compute running
in each compute node, and Ceilometer API can report such telemetry
"cpu_util" successfully.
* You must have at least 2 physical compute nodes to run this strategy
*Limitations*
- This is a proof of concept that is not meant to be used in production
- We cannot forecast how many servers should be migrated. This is the
reason why we only plan a single virtual machine migration at a time.
So it's better to use this algorithm with `CONTINUOUS` audits.
- It assume that live migrations are possible
"""
# The meter to report CPU utilization % of VM in ceilometer
METER_NAME = "cpu_util"
# Unit: %, value range is [0, 100]
# TODO(Junjie): make it configurable
THRESHOLD = 25.0
# choose 300 seconds as the default duration of meter aggregation
# TODO(Junjie): make it configurable
PERIOD = 300
MIGRATION = "migrate"
def __init__(self, osc=None):
"""Using live migration
:param osc: an OpenStackClients object
"""
super(WorkloadBalance, self).__init__(osc)
# the migration plan will be triggered when the CPU utilization %
# reaches the threshold
# TODO(Junjie): Threshold should be configurable for each audit
self.threshold = self.THRESHOLD
self._meter = self.METER_NAME
self._ceilometer = None
self._period = self.PERIOD
@property
def ceilometer(self):
if self._ceilometer is None:
self._ceilometer = ceil.CeilometerClusterHistory(osc=self.osc)
return self._ceilometer
@ceilometer.setter
def ceilometer(self, c):
self._ceilometer = c
@classmethod
def get_name(cls):
return "workload_balance"
@classmethod
def get_display_name(cls):
return _("workload balance migration strategy")
@classmethod
def get_translatable_display_name(cls):
return "workload balance migration strategy"
@classmethod
def get_goal_name(cls):
return "WORKLOAD_OPTIMIZATION"
@classmethod
def get_goal_display_name(cls):
return _("Workload optimization")
@classmethod
def get_translatable_goal_display_name(cls):
return "Workload optimization"
def calculate_used_resource(self, model, hypervisor, cap_cores, cap_mem,
cap_disk):
'''Calculate the used vcpus, memory and disk based on VM flavors'''
vms = model.get_mapping().get_node_vms(hypervisor)
vcpus_used = 0
memory_mb_used = 0
disk_gb_used = 0
for vm_id in vms:
vm = model.get_vm_from_id(vm_id)
vcpus_used += cap_cores.get_capacity(vm)
memory_mb_used += cap_mem.get_capacity(vm)
disk_gb_used += cap_disk.get_capacity(vm)
return vcpus_used, memory_mb_used, disk_gb_used
def choose_vm_to_migrate(self, model, hosts, avg_workload, workload_cache):
"""pick up an active vm instance to migrate from provided hosts
:param model: it's the origin_model passed from 'execute' function
:param hosts: the array of dict which contains hypervisor object
:param avg_workload: the average workload value of all hypervisors
:param workload_cache: the map contains vm to workload mapping
"""
for hvmap in hosts:
source_hypervisor = hvmap['hv']
source_vms = model.get_mapping().get_node_vms(source_hypervisor)
if source_vms:
delta_workload = hvmap['workload'] - avg_workload
min_delta = 1000000
instance_id = None
for vm_id in source_vms:
try:
# consider only active VMs as migration candidates
vm = model.get_vm_from_id(vm_id)
if vm.state != vm_state.VMState.ACTIVE.value:
LOG.debug("VM not active, skipped: %s",
vm.uuid)
continue
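# current_delta is how far the host would still be above the
# average workload after moving this VM away; keep the candidate
# that brings the host closest to the average without going below it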
current_delta = delta_workload - workload_cache[vm_id]
if 0 <= current_delta < min_delta:
min_delta = current_delta
instance_id = vm_id
except wexc.InstanceNotFound:
LOG.error(_LE("VM not found Error: %s"), vm_id)
if instance_id:
return source_hypervisor, model.get_vm_from_id(instance_id)
else:
LOG.info(_LI("VM not found from hypervisor: %s"),
source_hypervisor.uuid)
def filter_destination_hosts(self, model, hosts, vm_to_migrate,
avg_workload, workload_cache):
'''Only return hosts with sufficient available resources'''
cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores)
cap_disk = model.get_resource_from_id(resource.ResourceType.disk)
cap_mem = model.get_resource_from_id(resource.ResourceType.memory)
required_cores = cap_cores.get_capacity(vm_to_migrate)
required_disk = cap_disk.get_capacity(vm_to_migrate)
required_mem = cap_mem.get_capacity(vm_to_migrate)
# filter hypervisors without enough resource
destination_hosts = []
src_vm_workload = workload_cache[vm_to_migrate.uuid]
for hvmap in hosts:
host = hvmap['hv']
workload = hvmap['workload']
# calculate the available resources
cores_used, mem_used, disk_used = self.calculate_used_resource(
model, host, cap_cores, cap_mem, cap_disk)
cores_available = cap_cores.get_capacity(host) - cores_used
disk_available = cap_disk.get_capacity(host) - disk_used
mem_available = cap_mem.get_capacity(host) - mem_used
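# keep the host only if it has enough vCPU, disk and memory headroom
# for the VM, and taking on the VM's workload would still keep the
# host's CPU workload below the threshold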
if (cores_available >= required_cores and
disk_available >= required_disk and
mem_available >= required_mem and
(src_vm_workload + workload) < self.threshold / 100 *
cap_cores.get_capacity(host)):
destination_hosts.append(hvmap)
return destination_hosts
def group_hosts_by_cpu_util(self, model):
"""Calculate the workloads of each hypervisor
try to find out the hypervisors which have reached threshold
and the hypervisors which are under threshold.
and also calculate the average workload value of all hypervisors.
and also generate the VM workload map.
"""
hypervisors = model.get_all_hypervisors()
cluster_size = len(hypervisors)
if not hypervisors:
raise wexc.ClusterEmpty()
# get cpu cores capacity of hypervisors and vms
cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores)
overload_hosts = []
nonoverload_hosts = []
# total workload of the cluster, i.e. the total number of cores
# being utilized across all hypervisors
cluster_workload = 0.0
# use workload_cache to store the workload of VMs for reuse purpose
workload_cache = {}
for hypervisor_id in hypervisors:
hypervisor = model.get_hypervisor_from_id(hypervisor_id)
vms = model.get_mapping().get_node_vms(hypervisor)
hypervisor_workload = 0.0
for vm_id in vms:
vm = model.get_vm_from_id(vm_id)
try:
cpu_util = self.ceilometer.statistic_aggregation(
resource_id=vm_id,
meter_name=self._meter,
period=self._period,
aggregate='avg')
except Exception as e:
LOG.error(_LE("Can not get cpu_util: %s"), e.message)
continue
if cpu_util is None:
LOG.debug("%s: cpu_util is None", vm_id)
continue
vm_cores = cap_cores.get_capacity(vm)
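# cpu_util is a utilization percentage; weight it by the VM's vCPU
# count to get an absolute workload expressed in cores used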
workload_cache[vm_id] = cpu_util * vm_cores / 100
hypervisor_workload += workload_cache[vm_id]
LOG.debug("%s: cpu_util %f", vm_id, cpu_util)
hypervisor_cores = cap_cores.get_capacity(hypervisor)
hy_cpu_util = hypervisor_workload / hypervisor_cores * 100
cluster_workload += hypervisor_workload
hvmap = {'hv': hypervisor, "cpu_util": hy_cpu_util, 'workload':
hypervisor_workload}
if hy_cpu_util >= self.threshold:
# mark the hypervisor to release resources
overload_hosts.append(hvmap)
else:
nonoverload_hosts.append(hvmap)
avg_workload = cluster_workload / cluster_size
return overload_hosts, nonoverload_hosts, avg_workload, workload_cache
def execute(self, origin_model):
LOG.info(_LI("Initializing Workload Balance Strategy"))
if origin_model is None:
raise wexc.ClusterStateNotDefined()
current_model = origin_model
src_hypervisors, target_hypervisors, avg_workload, workload_cache = (
self.group_hosts_by_cpu_util(current_model))
if not src_hypervisors:
LOG.debug("No hosts require optimization")
return self.solution
if not target_hypervisors:
LOG.warning(_LW("No hosts current have CPU utilization under %s "
"percent, therefore there are no possible target "
"hosts for any migration"),
self.threshold)
return self.solution
# choose the server with largest cpu_util
src_hypervisors = sorted(src_hypervisors,
reverse=True,
key=lambda x: (x[self.METER_NAME]))
vm_to_migrate = self.choose_vm_to_migrate(current_model,
src_hypervisors,
avg_workload,
workload_cache)
if not vm_to_migrate:
return self.solution
source_hypervisor, vm_src = vm_to_migrate
# find the hosts that have enough resource for the VM to be migrated
destination_hosts = self.filter_destination_hosts(current_model,
target_hypervisors,
vm_src,
avg_workload,
workload_cache)
# sort the filtered result by workload
# pick up the lowest one as dest server
if not destination_hosts:
LOG.warning(_LW("No proper target host could be found; it might be "
"because there is not enough CPU, memory or disk"))
return self.solution
destination_hosts = sorted(destination_hosts,
key=lambda x: (x["cpu_util"]))
# always use the host with the lowest CPU utilization
mig_dst_hypervisor = destination_hosts[0]['hv']
# generate a solution to migrate the VM to the destination server
if current_model.get_mapping().migrate_vm(vm_src,
source_hypervisor,
mig_dst_hypervisor):
parameters = {'migration_type': 'live',
'src_hypervisor': source_hypervisor.uuid,
'dst_hypervisor': mig_dst_hypervisor.uuid}
self.solution.add_action(action_type=self.MIGRATION,
resource_id=vm_src.uuid,
input_parameters=parameters)
self.solution.model = current_model
return self.solution


@@ -235,3 +235,73 @@ class FakerModelCollector(base.BaseClusterModelCollector):
current_state_cluster.get_vm_from_id("VM_0"))
return current_state_cluster
def generate_scenario_6_with_2_hypervisors(self):
vms = []
root = modelroot.ModelRoot()
# number of nodes
count_node = 2
# define resources (CPU, memory, disk, ...)
mem = resource.Resource(resource.ResourceType.memory)
# 2199.954 Mhz
num_cores = resource.Resource(resource.ResourceType.cpu_cores)
disk = resource.Resource(resource.ResourceType.disk)
root.create_resource(mem)
root.create_resource(num_cores)
root.create_resource(disk)
for i in range(0, count_node):
node_uuid = "Node_{0}".format(i)
node = hypervisor.Hypervisor()
node.uuid = node_uuid
node.hostname = "hostname_{0}".format(i)
mem.set_capacity(node, 132)
disk.set_capacity(node, 250)
num_cores.set_capacity(node, 40)
root.add_hypervisor(node)
vm1 = modelvm.VM()
vm1.uuid = "VM_1"
mem.set_capacity(vm1, 2)
disk.set_capacity(vm1, 20)
num_cores.set_capacity(vm1, 10)
vms.append(vm1)
root.add_vm(vm1)
vm11 = modelvm.VM()
vm11.uuid = "73b09e16-35b7-4922-804e-e8f5d9b740fc"
mem.set_capacity(vm11, 2)
disk.set_capacity(vm11, 20)
num_cores.set_capacity(vm11, 10)
vms.append(vm11)
root.add_vm(vm11)
vm2 = modelvm.VM()
vm2.uuid = "VM_3"
mem.set_capacity(vm2, 2)
disk.set_capacity(vm2, 20)
num_cores.set_capacity(vm2, 10)
vms.append(vm2)
root.add_vm(vm2)
vm21 = modelvm.VM()
vm21.uuid = "VM_4"
mem.set_capacity(vm21, 2)
disk.set_capacity(vm21, 20)
num_cores.set_capacity(vm21, 10)
vms.append(vm21)
root.add_vm(vm21)
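# map the first two VMs to Node_0 and the other two to Node_1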
root.get_mapping().map(root.get_hypervisor_from_id("Node_0"),
root.get_vm_from_id(str(vm1.uuid)))
root.get_mapping().map(root.get_hypervisor_from_id("Node_0"),
root.get_vm_from_id(str(vm11.uuid)))
root.get_mapping().map(root.get_hypervisor_from_id("Node_1"),
root.get_vm_from_id(str(vm2.uuid)))
root.get_mapping().map(root.get_hypervisor_from_id("Node_1"),
root.get_vm_from_id(str(vm21.uuid)))
return root


@@ -44,6 +44,13 @@ class FakerMetricsCollector(object):
result = self.get_average_outlet_temperature(resource_id)
return result
def mock_get_statistics_wb(self, resource_id, meter_name, period,
aggregate='avg'):
result = 0
if meter_name == "cpu_util":
result = self.get_average_usage_vm_cpu_wb(resource_id)
return result
def get_average_outlet_temperature(self, uuid):
"""The average outlet temperature for host"""
mock = {}
@@ -52,7 +59,6 @@ class FakerMetricsCollector(object):
mock['Node_1'] = 100
if uuid not in mock.keys():
mock[uuid] = 100
return mock[str(uuid)]
def get_usage_node_ram(self, uuid):
@@ -109,6 +115,26 @@ class FakerMetricsCollector(object):
return float(mock[str(uuid)])
def get_average_usage_vm_cpu_wb(self, uuid):
"""The last VM CPU usage values to average
:param uuid:
:return:
"""
# static mock of per-VM cpu_util values, keyed by uuid
mock = {}
# node 0
mock['VM_1'] = 80
mock['73b09e16-35b7-4922-804e-e8f5d9b740fc'] = 50
# node 1
mock['VM_3'] = 20
mock['VM_4'] = 10
return float(mock[str(uuid)])
def get_average_usage_vm_cpu(self, uuid):
"""The last VM CPU usage values to average


@@ -0,0 +1,150 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 Intel Corp
#
# Authors: Junjie-Huang <junjie.huang@intel.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import collections
import mock
from watcher.applier.actions.loading import default
from watcher.common import exception
from watcher.decision_engine.model import model_root
from watcher.decision_engine.model import resource
from watcher.decision_engine.strategy.strategies import workload_balance
from watcher.tests import base
from watcher.tests.decision_engine.strategy.strategies \
import faker_cluster_state
from watcher.tests.decision_engine.strategy.strategies \
import faker_metrics_collector
class TestWorkloadBalance(base.BaseTestCase):
# fake metrics
fake_metrics = faker_metrics_collector.FakerMetricsCollector()
# fake cluster
fake_cluster = faker_cluster_state.FakerModelCollector()
def test_calc_used_res(self):
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
strategy = workload_balance.WorkloadBalance()
hypervisor = model.get_hypervisor_from_id('Node_0')
cap_cores = model.get_resource_from_id(resource.ResourceType.cpu_cores)
cap_mem = model.get_resource_from_id(resource.ResourceType.memory)
cap_disk = model.get_resource_from_id(resource.ResourceType.disk)
cores_used, mem_used, disk_used = strategy.calculate_used_resource(
model, hypervisor, cap_cores, cap_mem, cap_disk)
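# Node_0 hosts two VMs, each with 10 vCPUs, 2 GB of memory and
# 20 GB of disk, so the expected usage is (20, 4, 40)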
self.assertEqual((cores_used, mem_used, disk_used), (20, 4, 40))
def test_group_hosts_by_cpu_util(self):
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
strategy = workload_balance.WorkloadBalance()
strategy.threshold = 30
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
h1, h2, avg, w_map = strategy.group_hosts_by_cpu_util(model)
# print h1, h2, avg, w_map
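# with the mocked cpu_util values (80% and 50% on Node_0, 20% and 10%
# on Node_1) and 10 vCPUs per VM, Node_0 carries a workload of 13
# cores (32.5% of 40 cores, above the 30% threshold) and Node_1
# carries 3 cores, giving a cluster average of (13 + 3) / 2 = 8.0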
self.assertEqual(h1[0]['hv'].uuid, 'Node_0')
self.assertEqual(h2[0]['hv'].uuid, 'Node_1')
self.assertEqual(avg, 8.0)
def test_choose_vm_to_migrate(self):
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
strategy = workload_balance.WorkloadBalance()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
h1, h2, avg, w_map = strategy.group_hosts_by_cpu_util(model)
vm_to_mig = strategy.choose_vm_to_migrate(model, h1, avg, w_map)
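# Node_0 is 5 cores above the average (13 - 8); migrating the VM with
# a workload of 5 cores (50% of 10 vCPUs) brings it exactly to the
# average, so that VM is chosen rather than the 8-core one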
self.assertEqual(vm_to_mig[0].uuid, 'Node_0')
self.assertEqual(vm_to_mig[1].uuid,
"73b09e16-35b7-4922-804e-e8f5d9b740fc")
def test_choose_vm_notfound(self):
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
strategy = workload_balance.WorkloadBalance()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
h1, h2, avg, w_map = strategy.group_hosts_by_cpu_util(model)
vms = model.get_all_vms()
vms.clear()
vm_to_mig = strategy.choose_vm_to_migrate(model, h1, avg, w_map)
self.assertEqual(vm_to_mig, None)
def test_filter_destination_hosts(self):
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
strategy = workload_balance.WorkloadBalance()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
h1, h2, avg, w_map = strategy.group_hosts_by_cpu_util(model)
vm_to_mig = strategy.choose_vm_to_migrate(model, h1, avg, w_map)
dest_hosts = strategy.filter_destination_hosts(model, h2, vm_to_mig[1],
avg, w_map)
self.assertEqual(len(dest_hosts), 1)
self.assertEqual(dest_hosts[0]['hv'].uuid, 'Node_1')
def test_exception_model(self):
strategy = workload_balance.WorkloadBalance()
self.assertRaises(exception.ClusterStateNotDefined, strategy.execute,
None)
def test_exception_cluster_empty(self):
strategy = workload_balance.WorkloadBalance()
model = model_root.ModelRoot()
self.assertRaises(exception.ClusterEmpty, strategy.execute, model)
def test_execute_cluster_empty(self):
strategy = workload_balance.WorkloadBalance()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
model = model_root.ModelRoot()
self.assertRaises(exception.ClusterEmpty, strategy.execute, model)
def test_execute_no_workload(self):
strategy = workload_balance.WorkloadBalance()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
current_state_cluster = faker_cluster_state.FakerModelCollector()
model = current_state_cluster. \
generate_scenario_4_with_1_hypervisor_no_vm()
solution = strategy.execute(model)
self.assertEqual(solution.actions, [])
def test_execute(self):
strategy = workload_balance.WorkloadBalance()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
solution = strategy.execute(model)
actions_counter = collections.Counter(
[action.get('action_type') for action in solution.actions])
num_migrations = actions_counter.get("migrate", 0)
self.assertEqual(num_migrations, 1)
def test_check_parameters(self):
strategy = workload_balance.WorkloadBalance()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=self.fake_metrics.mock_get_statistics_wb)
model = self.fake_cluster.generate_scenario_6_with_2_hypervisors()
solution = strategy.execute(model)
loader = default.DefaultActionLoader()
for action in solution.actions:
loaded_action = loader.load(action['action_type'])
loaded_action.input_parameters = action['input_parameters']
loaded_action.validate_parameters()