From d433d6b3c8cf3547eb5813fbd6f06206d90a7872 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vincent=20Fran=C3=A7oise?= Date: Tue, 10 Jan 2017 16:06:06 +0100 Subject: [PATCH] Graph cluster model instead of mapping one In this changeset, I use https://review.openstack.org/#/c/362730/ as an example to make the existing ModelRoot fully graph-based. Change-Id: I3a1ec8674b885d75221035459233722c18972f67 Implements: blueprint graph-based-cluster-model --- requirements.txt | 2 + watcher/common/nova_helper.py | 9 + watcher/decision_engine/model/base.py | 4 + .../decision_engine/model/collector/nova.py | 363 +++++++++++++++--- watcher/decision_engine/model/element/base.py | 40 +- .../model/element/compute_resource.py | 36 +- .../decision_engine/model/element/instance.py | 22 +- watcher/decision_engine/model/element/node.py | 35 +- .../decision_engine/model/element/resource.py | 3 +- watcher/decision_engine/model/mapping.py | 101 ----- watcher/decision_engine/model/model_root.py | 250 ++++++------ .../model/notification/nova.py | 105 +++-- watcher/decision_engine/scope/default.py | 42 +- .../strategies/basic_consolidation.py | 36 +- .../strategies/outlet_temp_control.py | 47 +-- .../strategy/strategies/uniform_airflow.py | 44 +-- .../strategies/vm_workload_consolidation.py | 37 +- .../strategy/strategies/workload_balance.py | 32 +- .../strategies/workload_stabilization.py | 17 +- watcher/objects/fields.py | 4 + .../decision_engine/cluster/test_nova_cdmc.py | 24 +- .../decision_engine/model/data/scenario_1.xml | 80 ++-- .../model/data/scenario_1_with_metrics.xml | 8 +- .../model/data/scenario_2_with_metrics.xml | 20 +- .../model/data/scenario_3_with_2_nodes.xml | 8 +- .../model/data/scenario_3_with_metrics.xml | 12 +- .../scenario_4_with_1_node_no_instance.xml | 2 +- .../data/scenario_5_with_instance_disk_0.xml | 4 +- .../model/data/scenario_6_with_2_nodes.xml | 12 +- .../model/data/scenario_7_with_2_nodes.xml | 12 +- .../model/data/scenario_8_with_4_nodes.xml | 20 +- 
..._9_with_3_active_plus_1_disabled_nodes.xml | 20 +- .../model/faker_cluster_and_metrics.py | 7 +- .../model/faker_cluster_state.py | 100 ++--- .../notification/test_nova_notifications.py | 59 +-- .../decision_engine/model/test_disk_info.py | 33 -- .../decision_engine/model/test_element.py | 72 ++++ .../decision_engine/model/test_instance.py | 30 -- .../decision_engine/model/test_mapping.py | 113 ------ .../tests/decision_engine/model/test_model.py | 6 +- .../decision_engine/scope/test_default.py | 111 +++--- .../strategies/test_uniform_airflow.py | 2 +- .../test_vm_workload_consolidation.py | 6 +- .../strategies/test_workload_balance.py | 4 +- 44 files changed, 1001 insertions(+), 993 deletions(-) delete mode 100644 watcher/decision_engine/model/mapping.py delete mode 100644 watcher/tests/decision_engine/model/test_disk_info.py create mode 100644 watcher/tests/decision_engine/model/test_element.py delete mode 100644 watcher/tests/decision_engine/model/test_instance.py delete mode 100644 watcher/tests/decision_engine/model/test_mapping.py diff --git a/requirements.txt b/requirements.txt index 719bc03ac..6d4efbe1d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -41,3 +41,5 @@ stevedore>=1.17.1 # Apache-2.0 taskflow>=2.7.0 # Apache-2.0 WebOb>=1.6.0 # MIT WSME>=0.8 # MIT +networkx>=1.10 # BSD + diff --git a/watcher/common/nova_helper.py b/watcher/common/nova_helper.py index b6432f78f..a58575b0e 100644 --- a/watcher/common/nova_helper.py +++ b/watcher/common/nova_helper.py @@ -64,6 +64,15 @@ class NovaHelper(object): LOG.exception(exc) raise exception.ComputeNodeNotFound(name=node_hostname) + def get_instance_list(self): + return self.nova.servers.list(search_opts={'all_tenants': True}) + + def get_service(self, service_id): + return self.nova.services.find(id=service_id) + + def get_flavor(self, flavor_id): + return self.nova.flavors.get(flavor_id) + def get_aggregate_list(self): return self.nova.aggregates.list() diff --git 
a/watcher/decision_engine/model/base.py b/watcher/decision_engine/model/base.py index 629cd800a..8629d0530 100644 --- a/watcher/decision_engine/model/base.py +++ b/watcher/decision_engine/model/base.py @@ -34,3 +34,7 @@ class Model(object): @abc.abstractmethod def to_string(self): raise NotImplementedError() + + @abc.abstractmethod + def to_xml(self): + raise NotImplementedError() diff --git a/watcher/decision_engine/model/collector/nova.py b/watcher/decision_engine/model/collector/nova.py index 4fefee7f1..1b8559497 100644 --- a/watcher/decision_engine/model/collector/nova.py +++ b/watcher/decision_engine/model/collector/nova.py @@ -1,23 +1,21 @@ # -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Authors: Jean-Emile DARTOIS +# Copyright (c) 2017 Intel Innovation and Research Ireland Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from oslo_log import log +from watcher.common import exception from watcher.common import nova_helper from watcher.decision_engine.model.collector import base from watcher.decision_engine.model import element @@ -30,13 +28,12 @@ LOG = log.getLogger(__name__) class NovaClusterDataModelCollector(base.BaseClusterDataModelCollector): """Nova cluster data model collector - The Nova cluster data model collector creates an in-memory - representation of the resources exposed by the compute service. 
+ The Nova cluster data model collector creates an in-memory + representation of the resources exposed by the compute service. """ def __init__(self, config, osc=None): super(NovaClusterDataModelCollector, self).__init__(config, osc) - self.wrapper = nova_helper.NovaHelper(osc=self.osc) @property def notification_endpoints(self): @@ -62,49 +59,313 @@ class NovaClusterDataModelCollector(base.BaseClusterDataModelCollector): """Build the compute cluster data model""" LOG.debug("Building latest Nova cluster data model") - model = model_root.ModelRoot() - mem = element.Resource(element.ResourceType.memory) - num_cores = element.Resource(element.ResourceType.cpu_cores) - disk = element.Resource(element.ResourceType.disk) - disk_capacity = element.Resource(element.ResourceType.disk_capacity) - model.create_resource(mem) - model.create_resource(num_cores) - model.create_resource(disk) - model.create_resource(disk_capacity) + builder = ModelBuilder(self.osc) + return builder.execute() - flavor_cache = {} - nodes = self.wrapper.get_compute_node_list() - for n in nodes: - service = self.wrapper.nova.services.find(id=n.service['id']) - # create node in cluster_model_collector - node = element.ComputeNode(n.id) - node.uuid = service.host - node.hostname = n.hypervisor_hostname - # set capacity - mem.set_capacity(node, n.memory_mb) - disk.set_capacity(node, n.free_disk_gb) - disk_capacity.set_capacity(node, n.local_gb) - num_cores.set_capacity(node, n.vcpus) - node.state = n.state - node.status = n.status - model.add_node(node) - instances = self.wrapper.get_instances_by_node(str(service.host)) - for v in instances: - # create VM in cluster_model_collector - instance = element.Instance() - instance.uuid = v.id - # nova/nova/compute/instance_states.py - instance.state = getattr(v, 'OS-EXT-STS:vm_state') - # set capacity - self.wrapper.get_flavor_instance(v, flavor_cache) - mem.set_capacity(instance, v.flavor['ram']) - # FIXME: update all strategies to use disk_capacity - # for 
instances instead of disk - disk.set_capacity(instance, v.flavor['disk']) - disk_capacity.set_capacity(instance, v.flavor['disk']) - num_cores.set_capacity(instance, v.flavor['vcpus']) +class ModelBuilder(object): + """Build the graph-based model - model.map_instance(instance, node) + This model builder adds the following data: - return model + - Compute-related knowledge (Nova) + - TODO(v-francoise): Storage-related knowledge (Cinder) + - TODO(v-francoise): Network-related knowledge (Neutron) + + NOTE(v-francoise): This model builder is meant to be extended in the future + to also include both storage and network information respectively coming + from Cinder and Neutron. Some preliminary work has been done in this + direction in https://review.openstack.org/#/c/362730 but since we cannot + guarantee a sufficient level of consistency for either the storage or the + network part before the end of the Ocata cycle, this work has been + re-scheduled for Pike. In the meantime, all the associated code has been + commented out. + """ + def __init__(self, osc): + self.osc = osc + self.model = model_root.ModelRoot() + self.nova = osc.nova() + self.nova_helper = nova_helper.NovaHelper(osc=self.osc) + # self.neutron = osc.neutron() + # self.cinder = osc.cinder() + + def _add_physical_layer(self): + """Add the physical layer of the graph. + + This includes components which represent actual infrastructure + hardware. + """ + for cnode in self.nova_helper.get_compute_node_list(): + self.add_compute_node(cnode) + + def add_compute_node(self, node): + # Build and add base node. + compute_node = self.build_compute_node(node) + self.model.add_node(compute_node) + + # NOTE(v-francoise): we can encapsulate capabilities of the node + # (special instruction sets of CPUs) in the attributes; as well as + # sub-nodes can be added representing e.g. GPUs/Accelerators etc. + + # # Build & add disk, memory, network and cpu nodes. 
+ # disk_id, disk_node = self.build_disk_compute_node(base_id, node) + # self.add_node(disk_id, disk_node) + # mem_id, mem_node = self.build_memory_compute_node(base_id, node) + # self.add_node(mem_id, mem_node) + # net_id, net_node = self._build_network_compute_node(base_id) + # self.add_node(net_id, net_node) + # cpu_id, cpu_node = self.build_cpu_compute_node(base_id, node) + # self.add_node(cpu_id, cpu_node) + + # # Connect the base compute node to the dependant nodes. + # self.add_edges_from([(base_id, disk_id), (base_id, mem_id), + # (base_id, cpu_id), (base_id, net_id)], + # label="contains") + + def build_compute_node(self, node): + """Build a compute node from a Nova compute node + + :param node: A node hypervisor instance + :type node: :py:class:`~novaclient.v2.hypervisors.Hypervisor` + """ + # build up the compute node. + compute_service = self.nova_helper.get_service(node.service["id"]) + node_attributes = { + "id": node.id, + "human_id": None, # TODO(v-francoise): get rid of it + "uuid": compute_service.host, + "hostname": node.hypervisor_hostname, + "memory": node.memory_mb, + "disk": node.free_disk_gb, + "disk_capacity": node.local_gb, + "vcpus": node.vcpus, + "state": node.state, + "status": node.status} + + compute_node = element.ComputeNode(**node_attributes) + # compute_node = self._build_node("physical", "compute", "hypervisor", + # node_attributes) + return compute_node + + # def _build_network_compute_node(self, base_node): + # attributes = {} + # net_node = self._build_node("physical", "network", "NIC", attributes) + # net_id = "{}_network".format(base_node) + # return net_id, net_node + + # def build_disk_compute_node(self, base_node, compute): + # # Build disk node attributes. 
+ # disk_attributes = { + # "size_gb": compute.local_gb, + # "used_gb": compute.local_gb_used, + # "available_gb": compute.free_disk_gb} + # disk_node = self._build_node("physical", "storage", "disk", + # disk_attributes) + # disk_id = "{}_disk".format(base_node) + # return disk_id, disk_node + + # def build_memory_compute_node(self, base_node, compute): + # # Build memory node attributes. + # memory_attrs = {"size_mb": compute.memory_mb, + # "used_mb": compute.memory_mb_used, + # "available_mb": compute.free_ram_mb} + # memory_node = self._build_node("physical", "memory", "memory", + # memory_attrs) + # memory_id = "{}_memory".format(base_node) + # return memory_id, memory_node + + # def build_cpu_compute_node(self, base_node, compute): + # # Build CPU node attributes. + # cpu_attributes = {"vcpus": compute.vcpus, + # "vcpus_used": compute.vcpus_used, + # "info": jsonutils.loads(compute.cpu_info)} + # cpu_node = self._build_node("physical", "cpu", "cpu", cpu_attributes) + # cpu_id = "{}_cpu".format(base_node) + # return cpu_id, cpu_node + + # @staticmethod + # def _build_node(layer, category, node_type, attributes): + # return {"layer": layer, "category": category, "type": node_type, + # "attributes": attributes} + + def _add_virtual_layer(self): + """Add the virtual layer to the graph. + + This layer contains the virtual components of the infrastructure, + such as VMs. + """ + self._add_virtual_servers() + # self._add_virtual_network() + # self._add_virtual_storage() + + def _add_virtual_servers(self): + all_instances = self.nova_helper.get_instance_list() + for inst in all_instances: + # Add Node + instance = self._build_instance_node(inst) + self.model.add_instance(instance) + # Get the cnode_name uuid. 
+ cnode_uuid = getattr(inst, "OS-EXT-SRV-ATTR:host") + if cnode_uuid is None: + # The instance is not attached to any Compute node + continue + try: + # Nova compute node + # cnode = self.nova_helper.get_compute_node_by_hostname( + # cnode_uuid) + compute_node = self.model.get_node_by_uuid( + cnode_uuid) + # Connect the instance to its compute node + self.model.add_edge( + instance, compute_node, label='RUNS_ON') + except exception.ComputeNodeNotFound: + continue + + def _build_instance_node(self, instance): + """Build an instance node + + Create an instance node for the graph using nova and the + `server` nova object. + :param instance: Nova VM object. + :return: An instance node for the graph. + """ + flavor = self.nova_helper.get_flavor(instance.flavor["id"]) + instance_attributes = { + "uuid": instance.id, + "human_id": instance.human_id, + "memory": flavor.ram, + "disk": flavor.disk, + "disk_capacity": flavor.disk, + "vcpus": flavor.vcpus, + "state": getattr(instance, "OS-EXT-STS:vm_state")} + + # node_attributes = dict() + # node_attributes["layer"] = "virtual" + # node_attributes["category"] = "compute" + # node_attributes["type"] = "compute" + # node_attributes["attributes"] = instance_attributes + return element.Instance(**instance_attributes) + + # def _add_virtual_storage(self): + # try: + # volumes = self.cinder.volumes.list() + # except Exception: + # return + # for volume in volumes: + # volume_id, volume_node = self._build_storage_node(volume) + # self.add_node(volume_id, volume_node) + # host = self._get_volume_host_id(volume_node) + # self.add_edge(volume_id, host) + # # Add connections to an instance. 
+ # if volume_node['attributes']['attachments']: + # for attachment in volume_node['attributes']['attachments']: + # self.add_edge(volume_id, attachment['server_id'], + # label='ATTACHED_TO') + # volume_node['attributes'].pop('attachments') + + # def _add_virtual_network(self): + # try: + # routers = self.neutron.list_routers() + # except Exception: + # return + + # for network in self.neutron.list_networks()['networks']: + # self.add_node(*self._build_network(network)) + + # for router in routers['routers']: + # self.add_node(*self._build_router(router)) + + # router_interfaces, _, compute_ports = self._group_ports() + # for router_interface in router_interfaces: + # interface = self._build_router_interface(router_interface) + # router_interface_id = interface[0] + # router_interface_node = interface[1] + # router_id = interface[2] + # self.add_node(router_interface_id, router_interface_node) + # self.add_edge(router_id, router_interface_id) + # network_id = router_interface_node['attributes']['network_id'] + # self.add_edge(router_interface_id, network_id) + + # for compute_port in compute_ports: + # cp_id, cp_node, instance_id = self._build_compute_port_node( + # compute_port) + # self.add_node(cp_id, cp_node) + # self.add_edge(cp_id, vm_id) + # net_id = cp_node['attributes']['network_id'] + # self.add_edge(net_id, cp_id) + # # Connect port to physical node + # phys_net_node = "{}_network".format(cp_node['attributes'] + # ['binding:host_id']) + # self.add_edge(cp_id, phys_net_node) + + # def _get_volume_host_id(self, volume_node): + # host = volume_node['attributes']['os-vol-host-attr:host'] + # if host.find('@') != -1: + # host = host.split('@')[0] + # elif host.find('#') != -1: + # host = host.split('#')[0] + # return "{}_disk".format(host) + + # def _build_storage_node(self, volume_obj): + # volume = volume_obj.__dict__ + # volume["name"] = volume["id"] + # volume.pop("id") + # volume.pop("manager") + # node = self._build_node("virtual", "storage", 'volume', 
volume) + # return volume["name"], node + + # def _build_compute_port_node(self, compute_port): + # compute_port["name"] = compute_port["id"] + # compute_port.pop("id") + # nde_type = "{}_port".format( + # compute_port["device_owner"].split(":")[0]) + # compute_port.pop("device_owner") + # device_id = compute_port["device_id"] + # compute_port.pop("device_id") + # node = self._build_node("virtual", "network", nde_type, compute_port) + # return compute_port["name"], node, device_id + + # def _group_ports(self): + # router_interfaces = [] + # floating_ips = [] + # compute_ports = [] + # interface_types = ["network:router_interface", + # 'network:router_gateway'] + + # for port in self.neutron.list_ports()['ports']: + # if port['device_owner'] in interface_types: + # router_interfaces.append(port) + # elif port['device_owner'].startswith('compute:'): + # compute_ports.append(port) + # elif port['device_owner'] == 'network:floatingip': + # floating_ips.append(port) + + # return router_interfaces, floating_ips, compute_ports + + # def _build_router_interface(self, interface): + # interface["name"] = interface["id"] + # interface.pop("id") + # node_type = interface["device_owner"].split(":")[1] + # node = self._build_node("virtual", "network", node_type, interface) + # return interface["name"], node, interface["device_id"] + + # def _build_router(self, router): + # router_attrs = {"uuid": router['id'], + # "name": router['name'], + # "state": router['status']} + # node = self._build_node('virtual', 'network', 'router', router_attrs) + # return str(router['id']), node + + # def _build_network(self, network): + # node = self._build_node('virtual', 'network', 'network', network) + # return network['id'], node + + def execute(self): + """Instantiates the graph with the openstack cluster data. + + The graph is populated along 2 layers: virtual and physical. As each + new layer is built connections are made back to previous layers. 
+ """ + self._add_physical_layer() + self._add_virtual_layer() + return self.model diff --git a/watcher/decision_engine/model/element/base.py b/watcher/decision_engine/model/element/base.py index 63b6441d4..21cd5b2fa 100644 --- a/watcher/decision_engine/model/element/base.py +++ b/watcher/decision_engine/model/element/base.py @@ -17,13 +17,51 @@ # limitations under the License. import abc +import collections +from lxml import etree +from oslo_log import log import six +from watcher.objects import base +from watcher.objects import fields as wfields + +LOG = log.getLogger(__name__) + @six.add_metaclass(abc.ABCMeta) -class Element(object): +class Element(base.WatcherObject, base.WatcherObjectDictCompat): + + # Initial version + VERSION = '1.0' + + fields = {} + + def __init__(self, context=None, **kwargs): + for name, field in self.fields.items(): + # The idea here is to force the initialization of unspecified + # fields that have a default value + if (name not in kwargs and not field.nullable and + field.default != wfields.UnspecifiedDefault): + kwargs[name] = field.default + super(Element, self).__init__(context, **kwargs) @abc.abstractmethod def accept(self, visitor): raise NotImplementedError() + + def as_xml_element(self): + sorted_fieldmap = [] + for field in self.fields: + try: + value = str(self[field]) + sorted_fieldmap.append((field, value)) + except Exception as exc: + LOG.exception(exc) + + attrib = collections.OrderedDict(sorted_fieldmap) + + element_name = self.__class__.__name__ + instance_el = etree.Element(element_name, attrib=attrib) + + return instance_el diff --git a/watcher/decision_engine/model/element/compute_resource.py b/watcher/decision_engine/model/element/compute_resource.py index 326956fc7..4b0348a02 100644 --- a/watcher/decision_engine/model/element/compute_resource.py +++ b/watcher/decision_engine/model/element/compute_resource.py @@ -19,39 +19,15 @@ import abc import six from watcher.decision_engine.model.element import base +from 
watcher.objects import fields as wfields @six.add_metaclass(abc.ABCMeta) class ComputeResource(base.Element): - def __init__(self): - self._uuid = "" - self._human_id = "" - self._hostname = "" + VERSION = '1.0' - @property - def uuid(self): - return self._uuid - - @uuid.setter - def uuid(self, u): - self._uuid = u - - @property - def hostname(self): - return self._hostname - - @hostname.setter - def hostname(self, h): - self._hostname = h - - @property - def human_id(self): - return self._human_id - - @human_id.setter - def human_id(self, h): - self._human_id = h - - def __str__(self): - return "[{0}]".format(self.uuid) + fields = { + "uuid": wfields.StringField(), + "human_id": wfields.StringField(default=""), + } diff --git a/watcher/decision_engine/model/element/instance.py b/watcher/decision_engine/model/element/instance.py index 7bf08e86f..7cc0bc343 100644 --- a/watcher/decision_engine/model/element/instance.py +++ b/watcher/decision_engine/model/element/instance.py @@ -17,6 +17,8 @@ import enum from watcher.decision_engine.model.element import compute_resource +from watcher.objects import base +from watcher.objects import fields as wfields class InstanceState(enum.Enum): @@ -36,19 +38,17 @@ class InstanceState(enum.Enum): ERROR = 'error' +@base.WatcherObjectRegistry.register_if(False) class Instance(compute_resource.ComputeResource): - def __init__(self): - super(Instance, self).__init__() - self._state = InstanceState.ACTIVE.value + fields = { + "state": wfields.StringField(default=InstanceState.ACTIVE.value), + + "memory": wfields.NonNegativeIntegerField(), + "disk": wfields.IntegerField(), + "disk_capacity": wfields.NonNegativeIntegerField(), + "vcpus": wfields.NonNegativeIntegerField(), + } def accept(self, visitor): raise NotImplementedError() - - @property - def state(self): - return self._state - - @state.setter - def state(self, state): - self._state = state diff --git a/watcher/decision_engine/model/element/node.py 
b/watcher/decision_engine/model/element/node.py index fb77202bf..0a8d6924a 100644 --- a/watcher/decision_engine/model/element/node.py +++ b/watcher/decision_engine/model/element/node.py @@ -17,6 +17,8 @@ import enum from watcher.decision_engine.model.element import compute_resource +from watcher.objects import base +from watcher.objects import fields as wfields class ServiceState(enum.Enum): @@ -26,29 +28,20 @@ class ServiceState(enum.Enum): DISABLED = 'disabled' +@base.WatcherObjectRegistry.register_if(False) class ComputeNode(compute_resource.ComputeResource): - def __init__(self, id): - super(ComputeNode, self).__init__() - self.id = id - self._state = ServiceState.ONLINE.value - self._status = ServiceState.ENABLED.value + fields = { + "id": wfields.NonNegativeIntegerField(), + "hostname": wfields.StringField(), + "status": wfields.StringField(default=ServiceState.ENABLED.value), + "state": wfields.StringField(default=ServiceState.ONLINE.value), + + "memory": wfields.NonNegativeIntegerField(), + "disk": wfields.IntegerField(), + "disk_capacity": wfields.NonNegativeIntegerField(), + "vcpus": wfields.NonNegativeIntegerField(), + } def accept(self, visitor): raise NotImplementedError() - - @property - def state(self): - return self._state - - @state.setter - def state(self, state): - self._state = state - - @property - def status(self): - return self._status - - @status.setter - def status(self, s): - self._status = s diff --git a/watcher/decision_engine/model/element/resource.py b/watcher/decision_engine/model/element/resource.py index 9b0ecdc0f..720213ae5 100644 --- a/watcher/decision_engine/model/element/resource.py +++ b/watcher/decision_engine/model/element/resource.py @@ -20,7 +20,8 @@ from watcher.common import exception class ResourceType(enum.Enum): - cpu_cores = 'num_cores' + cpu_cores = 'vcpus' + vcpus = 'vcpus' memory = 'memory' disk = 'disk' disk_capacity = 'disk_capacity' diff --git a/watcher/decision_engine/model/mapping.py 
b/watcher/decision_engine/model/mapping.py deleted file mode 100644 index f370e6868..000000000 --- a/watcher/decision_engine/model/mapping.py +++ /dev/null @@ -1,101 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from oslo_concurrency import lockutils -from oslo_log import log - -from watcher._i18n import _LW - -LOG = log.getLogger(__name__) - - -class Mapping(object): - def __init__(self, model): - self.model = model - self.compute_node_mapping = {} - self.instance_mapping = {} - - def map(self, node, instance): - """Select the node where the instance is launched - - :param node: the node - :param instance: the virtual machine or instance - """ - with lockutils.lock(__name__): - # init first - if node.uuid not in self.compute_node_mapping.keys(): - self.compute_node_mapping[node.uuid] = set() - - # map node => instances - self.compute_node_mapping[node.uuid].add(instance.uuid) - - # map instance => node - self.instance_mapping[instance.uuid] = node.uuid - - def unmap(self, node, instance): - """Remove the instance from the node - - :param node: the node - :param instance: the virtual machine or instance - """ - self.unmap_by_uuid(node.uuid, instance.uuid) - - def unmap_by_uuid(self, node_uuid, instance_uuid): - """Remove the instance (by id) from the node (by id) - - :rtype : object - """ - with lockutils.lock(__name__): - if str(node_uuid) in self.compute_node_mapping: - 
self.compute_node_mapping[str(node_uuid)].remove( - str(instance_uuid)) - # remove instance - self.instance_mapping.pop(instance_uuid) - else: - LOG.warning( - _LW("Trying to delete the instance %(instance)s but it " - "was not found on node %(node)s") % - {'instance': instance_uuid, 'node': node_uuid}) - - def get_mapping(self): - return self.compute_node_mapping - - def get_node_from_instance(self, instance): - return self.get_node_by_instance_uuid(instance.uuid) - - def get_node_by_instance_uuid(self, instance_uuid): - """Getting host information from the guest instance - - :param instance: the uuid of the instance - :return: node - """ - return self.model.get_node_by_uuid( - self.instance_mapping[str(instance_uuid)]) - - def get_node_instances(self, node): - """Get the list of instances running on the node - - :param node: - :return: - """ - return self.get_node_instances_by_uuid(node.uuid) - - def get_node_instances_by_uuid(self, node_uuid): - if str(node_uuid) in self.compute_node_mapping.keys(): - return self.compute_node_mapping[str(node_uuid)] - else: - # empty - return set() diff --git a/watcher/decision_engine/model/model_root.py b/watcher/decision_engine/model/model_root.py index 3798b91e1..6c1c1d6a4 100644 --- a/watcher/decision_engine/model/model_root.py +++ b/watcher/decision_engine/model/model_root.py @@ -1,39 +1,40 @@ # -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com +# Copyright (c) 2016 Intel Innovation and Research Ireland Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. 
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import collections +""" +Openstack implementation of the cluster graph. +""" from lxml import etree +import networkx as nx +from oslo_concurrency import lockutils +from oslo_log import log import six from watcher._i18n import _ from watcher.common import exception -from watcher.common import utils from watcher.decision_engine.model import base from watcher.decision_engine.model import element -from watcher.decision_engine.model import mapping + +LOG = log.getLogger(__name__) -class ModelRoot(base.Model): +class ModelRoot(nx.DiGraph, base.Model): + """Cluster graph for an Openstack cluster.""" def __init__(self, stale=False): - self._nodes = utils.Struct() - self._instances = utils.Struct() - self.mapping = mapping.Mapping(self) - self.resource = utils.Struct() + super(ModelRoot, self).__init__() self.stale = stale def __nonzero__(self): @@ -41,35 +42,47 @@ class ModelRoot(base.Model): __bool__ = __nonzero__ - def assert_node(self, obj): + @staticmethod + def assert_node(obj): if not isinstance(obj, element.ComputeNode): raise exception.IllegalArgumentException( - message=_("'obj' argument type is not valid")) + message=_("'obj' argument type is not valid: %s") % type(obj)) - def assert_instance(self, obj): + @staticmethod + def assert_instance(obj): if not isinstance(obj, element.Instance): raise exception.IllegalArgumentException( message=_("'obj' argument type is not valid")) + @lockutils.synchronized("model_root") def add_node(self, node): self.assert_node(node) - self._nodes[node.uuid] = node + super(ModelRoot, self).add_node(node) + @lockutils.synchronized("model_root") def remove_node(self, node): self.assert_node(node) - if str(node.uuid) not in self._nodes: + try: + super(ModelRoot, self).remove_node(node) + except nx.NetworkXError as exc: + LOG.exception(exc) raise 
exception.ComputeNodeNotFound(name=node.uuid) - else: - del self._nodes[node.uuid] + @lockutils.synchronized("model_root") def add_instance(self, instance): self.assert_instance(instance) - self._instances[instance.uuid] = instance + try: + super(ModelRoot, self).add_node(instance) + except nx.NetworkXError as exc: + LOG.exception(exc) + raise exception.InstanceNotFound(name=instance.uuid) + @lockutils.synchronized("model_root") def remove_instance(self, instance): self.assert_instance(instance) - del self._instances[instance.uuid] + super(ModelRoot, self).remove_node(instance) + @lockutils.synchronized("model_root") def map_instance(self, instance, node): """Map a newly created instance to a node @@ -82,38 +95,25 @@ class ModelRoot(base.Model): instance = self.get_instance_by_uuid(instance) if isinstance(node, six.string_types): node = self.get_node_by_uuid(node) + self.assert_node(node) + self.assert_instance(instance) - self.add_instance(instance) - self.mapping.map(node, instance) + self.add_edge(instance, node) + @lockutils.synchronized("model_root") def unmap_instance(self, instance, node): - """Unmap an instance from a node - - :param instance: :py:class:`~.Instance` object or instance UUID - :type instance: str or :py:class:`~.Instance` - :param node: :py:class:`~.ComputeNode` object or node UUID - :type node: str or :py:class:`~.Instance` - """ if isinstance(instance, six.string_types): instance = self.get_instance_by_uuid(instance) if isinstance(node, six.string_types): node = self.get_node_by_uuid(node) - self.add_instance(instance) - self.mapping.unmap(node, instance) + self.remove_edge(instance, node) def delete_instance(self, instance, node=None): - if node is not None: - self.mapping.unmap(node, instance) - + self.assert_instance(instance) self.remove_instance(instance) - for resource in self.resource.values(): - try: - resource.unset_capacity(instance) - except KeyError: - pass - + @lockutils.synchronized("model_root") def migrate_instance(self, 
instance, source_node, destination_node): """Migrate single instance from source_node to destination_node @@ -122,96 +122,96 @@ class ModelRoot(base.Model): :param destination_node: :return: """ + self.assert_instance(instance) + self.assert_node(source_node) + self.assert_node(destination_node) + if source_node == destination_node: return False + # unmap - self.mapping.unmap(source_node, instance) + self.remove_edge(instance, source_node) # map - self.mapping.map(destination_node, instance) + self.add_edge(instance, destination_node) return True + @lockutils.synchronized("model_root") def get_all_compute_nodes(self): - return self._nodes + return {cn.uuid: cn for cn in self.nodes() + if isinstance(cn, element.ComputeNode)} - def get_node_by_uuid(self, node_uuid): - if str(node_uuid) not in self._nodes: - raise exception.ComputeNodeNotFound(name=node_uuid) - return self._nodes[str(node_uuid)] + @lockutils.synchronized("model_root") + def get_node_by_uuid(self, uuid): + for graph_node in self.nodes(): + if (isinstance(graph_node, element.ComputeNode) and + graph_node.uuid == uuid): + return graph_node + raise exception.ComputeNodeNotFound(name=uuid) + @lockutils.synchronized("model_root") def get_instance_by_uuid(self, uuid): - if str(uuid) not in self._instances: - raise exception.InstanceNotFound(name=uuid) - return self._instances[str(uuid)] + return self._get_instance_by_uuid(uuid) + def _get_instance_by_uuid(self, uuid): + for graph_node in self.nodes(): + if (isinstance(graph_node, element.Instance) and + graph_node.uuid == str(uuid)): + return graph_node + raise exception.InstanceNotFound(name=uuid) + + @lockutils.synchronized("model_root") def get_node_by_instance_uuid(self, instance_uuid): - """Getting host information from the guest instance - - :param instance_uuid: the uuid of the instance - :return: node - """ - if str(instance_uuid) not in self.mapping.instance_mapping: - raise exception.InstanceNotFound(name=instance_uuid) - return 
self.get_node_by_uuid( - self.mapping.instance_mapping[str(instance_uuid)]) + instance = self._get_instance_by_uuid(instance_uuid) + for node in self.neighbors(instance): + if isinstance(node, element.ComputeNode): + return node + raise exception.ComputeNodeNotFound(name=instance_uuid) + @lockutils.synchronized("model_root") def get_all_instances(self): - return self._instances - - def get_mapping(self): - return self.mapping - - def create_resource(self, r): - self.resource[str(r.name)] = r + return {inst.uuid: inst for inst in self.nodes() + if isinstance(inst, element.Instance)} + @lockutils.synchronized("model_root") def get_resource_by_uuid(self, resource_id): - return self.resource[str(resource_id)] + # TODO(v-francoise): deprecate this method + # This is a trick to keep the compatibility with the old model root + class Resource(object): + def __init__(self, resource_id): + if isinstance(resource_id, element.ResourceType): + resource_id = resource_id.value + self.resource_id = resource_id + def get_capacity(self, element): + # We ignore element because value already contains the value + return getattr(element, self.resource_id) + + return Resource(resource_id) + + @lockutils.synchronized("model_root") def get_node_instances(self, node): - return self.mapping.get_node_instances(node) + self.assert_node(node) + node_instances = [] + for neighbor in self.predecessors(node): + if isinstance(neighbor, element.Instance): + node_instances.append(neighbor) - def _build_compute_node_element(self, compute_node): - attrib = collections.OrderedDict( - id=six.text_type(compute_node.id), uuid=compute_node.uuid, - human_id=compute_node.human_id, hostname=compute_node.hostname, - state=compute_node.state, status=compute_node.status) - - for resource_name, resource in sorted( - self.resource.items(), key=lambda x: x[0]): - res_value = resource.get_capacity(compute_node) - if res_value is not None: - attrib[resource_name] = six.text_type(res_value) - - compute_node_el = 
etree.Element("ComputeNode", attrib=attrib) - - return compute_node_el - - def _build_instance_element(self, instance): - attrib = collections.OrderedDict( - uuid=instance.uuid, human_id=instance.human_id, - hostname=instance.hostname, state=instance.state) - - for resource_name, resource in sorted( - self.resource.items(), key=lambda x: x[0]): - res_value = resource.get_capacity(instance) - if res_value is not None: - attrib[resource_name] = six.text_type(res_value) - - instance_el = etree.Element("Instance", attrib=attrib) - - return instance_el + return node_instances def to_string(self): + return self.to_xml() + + def to_xml(self): root = etree.Element("ModelRoot") # Build compute node tree for cn in sorted(self.get_all_compute_nodes().values(), key=lambda cn: cn.uuid): - compute_node_el = self._build_compute_node_element(cn) + compute_node_el = cn.as_xml_element() # Build mapped instance tree - node_instance_uuids = self.get_node_instances(cn) - for instance_uuid in sorted(node_instance_uuids): - instance = self.get_instance_by_uuid(instance_uuid) - instance_el = self._build_instance_element(instance) + node_instances = self.get_node_instances(cn) + for instance in sorted(node_instances, key=lambda x: x.uuid): + instance_el = instance.as_xml_element() compute_node_el.append(instance_el) root.append(compute_node_el) @@ -221,51 +221,23 @@ class ModelRoot(base.Model): key=lambda inst: inst.uuid): try: self.get_node_by_instance_uuid(instance.uuid) - except exception.InstanceNotFound: - root.append(self._build_instance_element(instance)) + except (exception.InstanceNotFound, exception.ComputeNodeNotFound): + root.append(instance.as_xml_element()) return etree.tostring(root, pretty_print=True).decode('utf-8') @classmethod def from_xml(cls, data): model = cls() + root = etree.fromstring(data) - - mem = element.Resource(element.ResourceType.memory) - num_cores = element.Resource(element.ResourceType.cpu_cores) - disk = element.Resource(element.ResourceType.disk) - 
disk_capacity = element.Resource(element.ResourceType.disk_capacity) - model.create_resource(mem) - model.create_resource(num_cores) - model.create_resource(disk) - model.create_resource(disk_capacity) - for cn in root.findall('.//ComputeNode'): - node = element.ComputeNode(cn.get('id')) - node.uuid = cn.get('uuid') - node.hostname = cn.get('hostname') - # set capacity - mem.set_capacity(node, int(cn.get(str(mem.name)))) - disk.set_capacity(node, int(cn.get(str(disk.name)))) - disk_capacity.set_capacity( - node, int(cn.get(str(disk_capacity.name)))) - num_cores.set_capacity(node, int(cn.get(str(num_cores.name)))) - node.state = cn.get('state') - node.status = cn.get('status') - + node = element.ComputeNode(**cn.attrib) model.add_node(node) for inst in root.findall('.//Instance'): - instance = element.Instance() - instance.uuid = inst.get('uuid') - instance.state = inst.get('state') - - mem.set_capacity(instance, int(inst.get(str(mem.name)))) - disk.set_capacity(instance, int(inst.get(str(disk.name)))) - disk_capacity.set_capacity( - instance, int(inst.get(str(disk_capacity.name)))) - num_cores.set_capacity( - instance, int(inst.get(str(num_cores.name)))) + instance = element.Instance(**inst.attrib) + model.add_instance(instance) parent = inst.getparent() if parent.tag == 'ComputeNode': diff --git a/watcher/decision_engine/model/notification/nova.py b/watcher/decision_engine/model/notification/nova.py index 3ec755e81..976e3fa4d 100644 --- a/watcher/decision_engine/model/notification/nova.py +++ b/watcher/decision_engine/model/notification/nova.py @@ -18,7 +18,7 @@ from oslo_log import log -from watcher._i18n import _LI +from watcher._i18n import _LI, _LW from watcher.common import exception from watcher.common import nova_helper from watcher.decision_engine.model import element @@ -40,14 +40,21 @@ class NovaNotification(base.NotificationEndpoint): self._nova = nova_helper.NovaHelper() return self._nova - def get_or_create_instance(self, uuid): + def 
get_or_create_instance(self, instance_uuid, node_uuid=None): try: - instance = self.cluster_data_model.get_instance_by_uuid(uuid) + if node_uuid: + self.get_or_create_node(node_uuid) + except exception.ComputeNodeNotFound: + LOG.warning(_LW("Could not find compute node %(node)s for " + "instance %(instance)s"), + dict(node=node_uuid, instance=instance_uuid)) + try: + instance = self.cluster_data_model.get_instance_by_uuid( + instance_uuid) except exception.InstanceNotFound: # The instance didn't exist yet so we create a new instance object - LOG.debug("New instance created: %s", uuid) - instance = element.Instance() - instance.uuid = uuid + LOG.debug("New instance created: %s", instance_uuid) + instance = element.Instance(uuid=instance_uuid) self.cluster_data_model.add_instance(instance) @@ -57,9 +64,11 @@ class NovaNotification(base.NotificationEndpoint): instance_data = data['nova_object.data'] instance_flavor_data = instance_data['flavor']['nova_object.data'] - instance.state = instance_data['state'] - instance.hostname = instance_data['host_name'] - instance.human_id = instance_data['display_name'] + instance.update({ + 'state': instance_data['state'], + 'hostname': instance_data['host_name'], + 'human_id': instance_data['display_name'], + }) memory_mb = instance_flavor_data['memory_mb'] num_cores = instance_flavor_data['vcpus'] @@ -67,7 +76,7 @@ class NovaNotification(base.NotificationEndpoint): self.update_capacity(element.ResourceType.memory, instance, memory_mb) self.update_capacity( - element.ResourceType.cpu_cores, instance, num_cores) + element.ResourceType.vcpus, instance, num_cores) self.update_capacity( element.ResourceType.disk, instance, disk_gb) self.update_capacity( @@ -83,13 +92,14 @@ class NovaNotification(base.NotificationEndpoint): self.update_instance_mapping(instance, node) def update_capacity(self, resource_id, obj, value): - resource = self.cluster_data_model.get_resource_by_uuid(resource_id) - resource.set_capacity(obj, value) + 
setattr(obj, resource_id.value, value) def legacy_update_instance(self, instance, data): - instance.state = data['state'] - instance.hostname = data['hostname'] - instance.human_id = data['display_name'] + instance.update({ + 'state': data['state'], + 'hostname': data['hostname'], + 'human_id': data['display_name'], + }) memory_mb = data['memory_mb'] num_cores = data['vcpus'] @@ -97,7 +107,7 @@ class NovaNotification(base.NotificationEndpoint): self.update_capacity(element.ResourceType.memory, instance, memory_mb) self.update_capacity( - element.ResourceType.cpu_cores, instance, num_cores) + element.ResourceType.vcpus, instance, num_cores) self.update_capacity( element.ResourceType.disk, instance, disk_gb) self.update_capacity( @@ -115,28 +125,34 @@ class NovaNotification(base.NotificationEndpoint): def update_compute_node(self, node, data): """Update the compute node using the notification data.""" node_data = data['nova_object.data'] - node.hostname = node_data['host'] - node.state = ( + node_state = ( element.ServiceState.OFFLINE.value if node_data['forced_down'] else element.ServiceState.ONLINE.value) - node.status = ( + node_status = ( element.ServiceState.DISABLED.value if node_data['disabled'] else element.ServiceState.ENABLED.value) + node.update({ + 'hostname': node_data['host'], + 'state': node_state, + 'status': node_status, + }) + def create_compute_node(self, node_hostname): """Update the compute node by querying the Nova API.""" try: _node = self.nova.get_compute_node_by_hostname(node_hostname) - node = element.ComputeNode(_node.id) - node.uuid = node_hostname - node.hostname = _node.hypervisor_hostname - node.state = _node.state - node.status = _node.status + node = element.ComputeNode( + id=_node.id, + uuid=node_hostname, + hostname=_node.hypervisor_hostname, + state=_node.state, + status=_node.status) self.update_capacity( element.ResourceType.memory, node, _node.memory_mb) self.update_capacity( - element.ResourceType.cpu_cores, node, _node.vcpus) 
+ element.ResourceType.vcpus, node, _node.vcpus) self.update_capacity( element.ResourceType.disk, node, _node.free_disk_gb) self.update_capacity( @@ -170,18 +186,20 @@ class NovaNotification(base.NotificationEndpoint): return try: try: - old_node = self.get_or_create_node(node.uuid) + current_node = ( + self.cluster_data_model.get_node_by_instance_uuid( + instance.uuid) or self.get_or_create_node(node.uuid)) except exception.ComputeNodeNotFound as exc: LOG.exception(exc) # If we can't create the node, # we consider the instance as unmapped - old_node = None + current_node = None LOG.debug("Mapped node %s found", node.uuid) - if node and node != old_node: + if current_node and node != current_node: LOG.debug("Unmapping instance %s from %s", instance.uuid, node.uuid) - self.cluster_data_model.unmap_instance(instance, old_node) + self.cluster_data_model.unmap_instance(instance, current_node) except exception.InstanceNotFound: # The instance didn't exist yet so we map it for the first time LOG.debug("New instance: mapping it to %s", node.uuid) @@ -221,6 +239,7 @@ class ServiceUpdated(VersionnedNotificationEndpoint): dict(event=event_type, publisher=publisher_id, metadata=metadata)) + LOG.debug(payload) node_data = payload['nova_object.data'] node_uuid = node_data['host'] try: @@ -262,10 +281,12 @@ class InstanceCreated(VersionnedNotificationEndpoint): dict(event=event_type, publisher=publisher_id, metadata=metadata)) + LOG.debug(payload) instance_data = payload['nova_object.data'] instance_uuid = instance_data['uuid'] - instance = self.get_or_create_instance(instance_uuid) + node_uuid = instance_data.get('host') + instance = self.get_or_create_instance(instance_uuid, node_uuid) self.update_instance(instance, payload) @@ -294,9 +315,11 @@ class InstanceUpdated(VersionnedNotificationEndpoint): dict(event=event_type, publisher=publisher_id, metadata=metadata)) + LOG.debug(payload) instance_data = payload['nova_object.data'] instance_uuid = instance_data['uuid'] - instance 
= self.get_or_create_instance(instance_uuid) + node_uuid = instance_data.get('host') + instance = self.get_or_create_instance(instance_uuid, node_uuid) self.update_instance(instance, payload) @@ -317,10 +340,12 @@ class InstanceDeletedEnd(VersionnedNotificationEndpoint): dict(event=event_type, publisher=publisher_id, metadata=metadata)) + LOG.debug(payload) instance_data = payload['nova_object.data'] instance_uuid = instance_data['uuid'] - instance = self.get_or_create_instance(instance_uuid) + node_uuid = instance_data.get('host') + instance = self.get_or_create_instance(instance_uuid, node_uuid) try: node = self.get_or_create_node(instance_data['host']) @@ -348,9 +373,11 @@ class LegacyInstanceUpdated(UnversionnedNotificationEndpoint): dict(event=event_type, publisher=publisher_id, metadata=metadata)) + LOG.debug(payload) instance_uuid = payload['instance_id'] - instance = self.get_or_create_instance(instance_uuid) + node_uuid = payload.get('node') + instance = self.get_or_create_instance(instance_uuid, node_uuid) self.legacy_update_instance(instance, payload) @@ -371,9 +398,11 @@ class LegacyInstanceCreatedEnd(UnversionnedNotificationEndpoint): dict(event=event_type, publisher=publisher_id, metadata=metadata)) + LOG.debug(payload) instance_uuid = payload['instance_id'] - instance = self.get_or_create_instance(instance_uuid) + node_uuid = payload.get('node') + instance = self.get_or_create_instance(instance_uuid, node_uuid) self.legacy_update_instance(instance, payload) @@ -394,8 +423,10 @@ class LegacyInstanceDeletedEnd(UnversionnedNotificationEndpoint): dict(event=event_type, publisher=publisher_id, metadata=metadata)) + LOG.debug(payload) instance_uuid = payload['instance_id'] - instance = self.get_or_create_instance(instance_uuid) + node_uuid = payload.get('node') + instance = self.get_or_create_instance(instance_uuid, node_uuid) try: node = self.get_or_create_node(payload['host']) @@ -423,8 +454,10 @@ class 
LegacyLiveMigratedEnd(UnversionnedNotificationEndpoint): dict(event=event_type, publisher=publisher_id, metadata=metadata)) + LOG.debug(payload) instance_uuid = payload['instance_id'] - instance = self.get_or_create_instance(instance_uuid) + node_uuid = payload.get('node') + instance = self.get_or_create_instance(instance_uuid, node_uuid) self.legacy_update_instance(instance, payload) diff --git a/watcher/decision_engine/scope/default.py b/watcher/decision_engine/scope/default.py index f9c64fd56..4c0a5af30 100644 --- a/watcher/decision_engine/scope/default.py +++ b/watcher/decision_engine/scope/default.py @@ -13,9 +13,6 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -# - -import copy from oslo_log import log @@ -101,9 +98,8 @@ class DefaultScope(base.BaseScope): self._osc = osc self.wrapper = nova_helper.NovaHelper(osc=self._osc) - def _remove_instance(self, cluster_model, instance_uuid, node_name): + def remove_instance(self, cluster_model, instance, node_name): node = cluster_model.get_node_by_uuid(node_name) - instance = cluster_model.get_instance_by_uuid(instance_uuid) cluster_model.delete_instance(instance, node) def _check_wildcard(self, aggregate_list): @@ -147,7 +143,7 @@ class DefaultScope(base.BaseScope): if zone.zoneName in zone_names or include_all_nodes: allowed_nodes.extend(zone.hosts.keys()) - def _exclude_resources(self, resources, **kwargs): + def exclude_resources(self, resources, **kwargs): instances_to_exclude = kwargs.get('instances') nodes_to_exclude = kwargs.get('nodes') for resource in resources: @@ -160,32 +156,32 @@ class DefaultScope(base.BaseScope): [host['name'] for host in resource['compute_nodes']]) - def _remove_node_from_model(self, nodes_to_remove, cluster_model): - for node_name in nodes_to_remove: - instances = copy.copy( - cluster_model.get_mapping().get_node_instances_by_uuid( - node_name)) - for instance_uuid in instances: - 
self._remove_instance(cluster_model, instance_uuid, node_name) - node = cluster_model.get_node_by_uuid(node_name) + def remove_nodes_from_model(self, nodes_to_remove, cluster_model): + for node_uuid in nodes_to_remove: + node = cluster_model.get_node_by_uuid(node_uuid) + instances = cluster_model.get_node_instances(node) + for instance in instances: + self.remove_instance(cluster_model, instance, node_uuid) cluster_model.remove_node(node) - def _remove_instances_from_model(self, instances_to_remove, cluster_model): + def remove_instances_from_model(self, instances_to_remove, cluster_model): for instance_uuid in instances_to_remove: try: - node_name = (cluster_model.get_mapping() - .get_node_by_instance_uuid(instance_uuid).uuid) - except KeyError: + node_name = cluster_model.get_node_by_instance_uuid( + instance_uuid).uuid + except exception.InstanceNotFound: LOG.warning(_LW("The following instance %s cannot be found. " "It might be deleted from CDM along with node" " instance was hosted on."), instance_uuid) continue - self._remove_instance(cluster_model, instance_uuid, node_name) + self.remove_instance( + cluster_model, + cluster_model.get_instance_by_uuid(instance_uuid), + node_name) def get_scoped_model(self, cluster_model): """Leave only nodes and instances proposed in the audit scope""" - if not cluster_model: return None @@ -205,7 +201,7 @@ class DefaultScope(base.BaseScope): self._collect_zones(rule['availability_zones'], allowed_nodes) elif 'exclude' in rule: - self._exclude_resources( + self.exclude_resources( rule['exclude'], instances=instances_to_exclude, nodes=nodes_to_exclude) @@ -213,7 +209,7 @@ class DefaultScope(base.BaseScope): nodes_to_remove = set(model_hosts) - set(allowed_nodes) nodes_to_remove.update(nodes_to_exclude) - self._remove_node_from_model(nodes_to_remove, cluster_model) - self._remove_instances_from_model(instances_to_remove, cluster_model) + self.remove_nodes_from_model(nodes_to_remove, cluster_model) + 
self.remove_instances_from_model(instances_to_remove, cluster_model) return cluster_model diff --git a/watcher/decision_engine/strategy/strategies/basic_consolidation.py b/watcher/decision_engine/strategy/strategies/basic_consolidation.py index d0a70771c..63287ae8f 100644 --- a/watcher/decision_engine/strategy/strategies/basic_consolidation.py +++ b/watcher/decision_engine/strategy/strategies/basic_consolidation.py @@ -174,15 +174,14 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): total_disk = 0 total_mem = 0 cpu_capacity = self.compute_model.get_resource_by_uuid( - element.ResourceType.cpu_cores) + element.ResourceType.vcpus) disk_capacity = self.compute_model.get_resource_by_uuid( element.ResourceType.disk) memory_capacity = self.compute_model.get_resource_by_uuid( element.ResourceType.memory) - for instance_id in self.compute_model.mapping.get_node_instances( + for instance in self.compute_model.get_node_instances( destination_node): - instance = self.compute_model.get_instance_by_uuid(instance_id) total_cores += cpu_capacity.get_capacity(instance) total_disk += disk_capacity.get_capacity(instance) total_mem += memory_capacity.get_capacity(instance) @@ -210,7 +209,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): :return: True if the threshold is not exceed """ cpu_capacity = self.compute_model.get_resource_by_uuid( - element.ResourceType.cpu_cores).get_capacity(destination_node) + element.ResourceType.vcpus).get_capacity(destination_node) disk_capacity = self.compute_model.get_resource_by_uuid( element.ResourceType.disk).get_capacity(destination_node) memory_capacity = self.compute_model.get_resource_by_uuid( @@ -231,7 +230,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): :return: """ cpu_capacity = self.compute_model.get_resource_by_uuid( - element.ResourceType.cpu_cores).get_capacity(compute_resource) + element.ResourceType.vcpus).get_capacity(compute_resource) disk_capacity = 
self.compute_model.get_resource_by_uuid( element.ResourceType.disk).get_capacity(compute_resource) @@ -333,7 +332,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): host_avg_cpu_util = 100 cpu_capacity = self.compute_model.get_resource_by_uuid( - element.ResourceType.cpu_cores).get_capacity(node) + element.ResourceType.vcpus).get_capacity(node) total_cores_used = cpu_capacity * (host_avg_cpu_util / 100.0) @@ -356,7 +355,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): instance_cpu_utilization = 100 cpu_capacity = self.compute_model.get_resource_by_uuid( - element.ResourceType.cpu_cores).get_capacity(instance) + element.ResourceType.vcpus).get_capacity(instance) total_cores_used = cpu_capacity * (instance_cpu_utilization / 100.0) @@ -387,28 +386,24 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): if node.status == element.ServiceState.ENABLED.value: self.number_of_enabled_nodes += 1 - count = self.compute_model.mapping.get_node_instances(node) - if len(count) > 0: + instances = self.compute_model.get_node_instances(node) + if len(instances) > 0: result = self.calculate_score_node(node) - else: - # The node has not VMs - result = 0 - if len(count) > 0: score.append((node.uuid, result)) + return score def node_and_instance_score(self, sorted_scores): """Get List of VMs from node""" node_to_release = sorted_scores[len(sorted_scores) - 1][0] - instances_to_migrate = self.compute_model.mapping.get_node_instances( + instances_to_migrate = self.compute_model.get_node_instances( self.compute_model.get_node_by_uuid(node_to_release)) instance_score = [] - for instance_id in instances_to_migrate: - instance = self.compute_model.get_instance_by_uuid(instance_id) + for instance in instances_to_migrate: if instance.state == element.InstanceState.ACTIVE.value: instance_score.append( - (instance_id, self.calculate_score_instance(instance))) + (instance, self.calculate_score_instance(instance))) return node_to_release, 
instance_score @@ -421,8 +416,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): mig_source_node.uuid, mig_destination_node.uuid) - if len(self.compute_model.mapping.get_node_instances( - mig_source_node)) == 0: + if len(self.compute_model.get_node_instances(mig_source_node)) == 0: self.add_change_service_state(mig_source_node. uuid, element.ServiceState.DISABLED.value) @@ -431,10 +425,8 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): def calculate_num_migrations(self, sorted_instances, node_to_release, sorted_score): number_migrations = 0 - for instance in sorted_instances: + for mig_instance, __ in sorted_instances: for j in range(0, len(sorted_score)): - mig_instance = self.compute_model.get_instance_by_uuid( - instance[0]) mig_source_node = self.compute_model.get_node_by_uuid( node_to_release) mig_destination_node = self.compute_model.get_node_by_uuid( diff --git a/watcher/decision_engine/strategy/strategies/outlet_temp_control.py b/watcher/decision_engine/strategy/strategies/outlet_temp_control.py index e31aa51da..ed1c17318 100644 --- a/watcher/decision_engine/strategy/strategies/outlet_temp_control.py +++ b/watcher/decision_engine/strategy/strategies/outlet_temp_control.py @@ -124,16 +124,14 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): def calc_used_res(self, node, cpu_capacity, memory_capacity, disk_capacity): """Calculate the used vcpus, memory and disk based on VM flavors""" - instances = self.compute_model.mapping.get_node_instances(node) + instances = self.compute_model.get_node_instances(node) vcpus_used = 0 memory_mb_used = 0 disk_gb_used = 0 - if len(instances) > 0: - for instance_id in instances: - instance = self.compute_model.get_instance_by_uuid(instance_id) - vcpus_used += cpu_capacity.get_capacity(instance) - memory_mb_used += memory_capacity.get_capacity(instance) - disk_gb_used += disk_capacity.get_capacity(instance) + for instance in instances: + vcpus_used += 
cpu_capacity.get_capacity(instance) + memory_mb_used += memory_capacity.get_capacity(instance) + disk_gb_used += disk_capacity.get_capacity(instance) return vcpus_used, memory_mb_used, disk_gb_used @@ -146,9 +144,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): hosts_need_release = [] hosts_target = [] - for node_id in nodes: - node = self.compute_model.get_node_by_uuid( - node_id) + for node in nodes.values(): resource_id = node.uuid outlet_temp = self.ceilometer.statistic_aggregation( @@ -174,30 +170,27 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): """Pick up an active instance to migrate from provided hosts""" for instance_data in hosts: mig_source_node = instance_data['node'] - instances_of_src = self.compute_model.mapping.get_node_instances( + instances_of_src = self.compute_model.get_node_instances( mig_source_node) - if len(instances_of_src) > 0: - for instance_id in instances_of_src: - try: - # select the first active instance to migrate - instance = self.compute_model.get_instance_by_uuid( - instance_id) - if (instance.state != - element.InstanceState.ACTIVE.value): - LOG.info(_LI("Instance not active, skipped: %s"), - instance.uuid) - continue - return mig_source_node, instance - except wexc.InstanceNotFound as e: - LOG.exception(e) - LOG.info(_LI("Instance not found")) + for instance in instances_of_src: + try: + # select the first active instance to migrate + if (instance.state != + element.InstanceState.ACTIVE.value): + LOG.info(_LI("Instance not active, skipped: %s"), + instance.uuid) + continue + return mig_source_node, instance + except wexc.InstanceNotFound as e: + LOG.exception(e) + LOG.info(_LI("Instance not found")) return None def filter_dest_servers(self, hosts, instance_to_migrate): """Only return hosts with sufficient available resources""" cpu_capacity = self.compute_model.get_resource_by_uuid( - element.ResourceType.cpu_cores) + element.ResourceType.vcpus) disk_capacity = 
self.compute_model.get_resource_by_uuid( element.ResourceType.disk) memory_capacity = self.compute_model.get_resource_by_uuid( diff --git a/watcher/decision_engine/strategy/strategies/uniform_airflow.py b/watcher/decision_engine/strategy/strategies/uniform_airflow.py index c501b15d1..ca98c09e7 100644 --- a/watcher/decision_engine/strategy/strategies/uniform_airflow.py +++ b/watcher/decision_engine/strategy/strategies/uniform_airflow.py @@ -45,7 +45,7 @@ airflow is higher than the specified threshold. from oslo_log import log -from watcher._i18n import _, _LE, _LI, _LW +from watcher._i18n import _, _LI, _LW from watcher.common import exception as wexc from watcher.datasource import ceilometer as ceil from watcher.decision_engine.model import element @@ -166,13 +166,11 @@ class UniformAirflow(base.BaseStrategy): def calculate_used_resource(self, node, cap_cores, cap_mem, cap_disk): """Compute the used vcpus, memory and disk based on instance flavors""" - instances = self.compute_model.mapping.get_node_instances(node) + instances = self.compute_model.get_node_instances(node) vcpus_used = 0 memory_mb_used = 0 disk_gb_used = 0 - for instance_id in instances: - instance = self.compute_model.get_instance_by_uuid( - instance_id) + for instance in instances: vcpus_used += cap_cores.get_capacity(instance) memory_mb_used += cap_mem.get_capacity(instance) disk_gb_used += cap_disk.get_capacity(instance) @@ -187,7 +185,7 @@ class UniformAirflow(base.BaseStrategy): instances_tobe_migrate = [] for nodemap in hosts: source_node = nodemap['node'] - source_instances = self.compute_model.mapping.get_node_instances( + source_instances = self.compute_model.get_node_instances( source_node) if source_instances: inlet_t = self.ceilometer.statistic_aggregation( @@ -203,32 +201,20 @@ class UniformAirflow(base.BaseStrategy): if (power < self.threshold_power and inlet_t < self.threshold_inlet_t): # hardware issue, migrate all instances from this node - for instance_id in source_instances: - 
try: - instance = (self.compute_model. - get_instance_by_uuid(instance_id)) - instances_tobe_migrate.append(instance) - except wexc.InstanceNotFound: - LOG.error(_LE("Instance not found; error: %s"), - instance_id) + for instance in source_instances: + instances_tobe_migrate.append(instance) return source_node, instances_tobe_migrate else: # migrate the first active instance - for instance_id in source_instances: - try: - instance = (self.compute_model. - get_instance_by_uuid(instance_id)) - if (instance.state != - element.InstanceState.ACTIVE.value): - LOG.info( - _LI("Instance not active, skipped: %s"), - instance.uuid) - continue - instances_tobe_migrate.append(instance) - return source_node, instances_tobe_migrate - except wexc.InstanceNotFound: - LOG.error(_LE("Instance not found; error: %s"), - instance_id) + for instance in source_instances: + if (instance.state != + element.InstanceState.ACTIVE.value): + LOG.info( + _LI("Instance not active, skipped: %s"), + instance.uuid) + continue + instances_tobe_migrate.append(instance) + return source_node, instances_tobe_migrate else: LOG.info(_LI("Instance not found on node: %s"), source_node.uuid) diff --git a/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py b/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py index 637d1e586..1f80478cc 100644 --- a/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py +++ b/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py @@ -139,18 +139,16 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): input_parameters=params) self.number_of_released_nodes += 1 - def add_migration(self, instance_uuid, source_node, + def add_migration(self, instance, source_node, destination_node, model): """Add an action for VM migration into the solution. 
- :param instance_uuid: instance uuid + :param instance: instance object :param source_node: node object :param destination_node: node object :param model: model_root object :return: None """ - instance = model.get_instance_by_uuid(instance_uuid) - instance_state_str = self.get_state_str(instance.state) if instance_state_str != element.InstanceState.ACTIVE.value: # Watcher curently only supports live VM migration and block live @@ -160,7 +158,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): LOG.error( _LE('Cannot live migrate: instance_uuid=%(instance_uuid)s, ' 'state=%(instance_state)s.') % dict( - instance_uuid=instance_uuid, + instance_uuid=instance.uuid, instance_state=instance_state_str)) return @@ -169,6 +167,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): destination_node_state_str = self.get_state_str(destination_node.state) if destination_node_state_str == element.ServiceState.DISABLED.value: self.add_action_enable_compute_node(destination_node) + if model.migrate_instance(instance, source_node, destination_node): params = {'migration_type': migration_type, 'source_node': source_node.uuid, @@ -185,7 +184,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): :return: None """ for node in model.get_all_compute_nodes().values(): - if (len(model.mapping.get_node_instances(node)) == 0 and + if (len(model.get_node_instances(node)) == 0 and node.status != element.ServiceState.DISABLED.value): self.add_action_disable_node(node) @@ -254,14 +253,13 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): :param aggr: string :return: dict(cpu(number of cores used), ram(MB used), disk(B used)) """ - node_instances = model.mapping.get_node_instances_by_uuid( - node.uuid) + node_instances = model.get_node_instances(node) node_ram_util = 0 node_disk_util = 0 node_cpu_util = 0 - for instance_uuid in node_instances: + for instance in node_instances: instance_util = 
self.get_instance_utilization( - instance_uuid, model, period, aggr) + instance.uuid, model, period, aggr) node_cpu_util += instance_util['cpu'] node_ram_util += instance_util['ram'] node_disk_util += instance_util['disk'] @@ -402,9 +400,9 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): self.number_of_migrations -= 1 src_node = model.get_node_by_uuid(src_uuid) dst_node = model.get_node_by_uuid(dst_uuid) - if model.migrate_instance(instance_uuid, dst_node, src_node): - self.add_migration( - instance_uuid, src_node, dst_node, model) + instance = model.get_instance_by_uuid(instance_uuid) + if model.migrate_instance(instance, dst_node, src_node): + self.add_migration(instance, src_node, dst_node, model) def offload_phase(self, model, cc): """Perform offloading phase. @@ -431,13 +429,13 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): for node in reversed(sorted_nodes): if self.is_overloaded(node, model, cc): for instance in sorted( - model.mapping.get_node_instances(node), + model.get_node_instances(node), key=lambda x: self.get_instance_utilization( - x, model)['cpu'] + x.uuid, model)['cpu'] ): for destination_node in reversed(sorted_nodes): if self.instance_fits( - instance, destination_node, model, cc): + instance.uuid, destination_node, model, cc): self.add_migration(instance, node, destination_node, model) break @@ -465,15 +463,16 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): asc = 0 for node in sorted_nodes: instances = sorted( - model.mapping.get_node_instances(node), - key=lambda x: self.get_instance_utilization(x, model)['cpu']) + model.get_node_instances(node), + key=lambda x: self.get_instance_utilization( + x.uuid, model)['cpu']) for instance in reversed(instances): dsc = len(sorted_nodes) - 1 for destination_node in reversed(sorted_nodes): if asc >= dsc: break if self.instance_fits( - instance, destination_node, model, cc): + instance.uuid, destination_node, model, cc): 
self.add_migration(instance, node, destination_node, model) break diff --git a/watcher/decision_engine/strategy/strategies/workload_balance.py b/watcher/decision_engine/strategy/strategies/workload_balance.py index bba6e7f29..d78ecbe89 100644 --- a/watcher/decision_engine/strategy/strategies/workload_balance.py +++ b/watcher/decision_engine/strategy/strategies/workload_balance.py @@ -148,12 +148,11 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): def calculate_used_resource(self, node, cap_cores, cap_mem, cap_disk): """Calculate the used vcpus, memory and disk based on VM flavors""" - instances = self.compute_model.mapping.get_node_instances(node) + instances = self.compute_model.get_node_instances(node) vcpus_used = 0 memory_mb_used = 0 disk_gb_used = 0 - for instance_id in instances: - instance = self.compute_model.get_instance_by_uuid(instance_id) + for instance in instances: vcpus_used += cap_cores.get_capacity(instance) memory_mb_used += cap_mem.get_capacity(instance) disk_gb_used += cap_disk.get_capacity(instance) @@ -169,27 +168,25 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): """ for instance_data in hosts: source_node = instance_data['node'] - source_instances = self.compute_model.mapping.get_node_instances( + source_instances = self.compute_model.get_node_instances( source_node) if source_instances: delta_workload = instance_data['workload'] - avg_workload min_delta = 1000000 instance_id = None - for inst_id in source_instances: + for instance in source_instances: try: # select the first active VM to migrate - instance = self.compute_model.get_instance_by_uuid( - inst_id) if (instance.state != element.InstanceState.ACTIVE.value): LOG.debug("Instance not active, skipped: %s", instance.uuid) continue current_delta = ( - delta_workload - workload_cache[inst_id]) + delta_workload - workload_cache[instance.uuid]) if 0 <= current_delta < min_delta: min_delta = current_delta - instance_id = inst_id + instance_id = 
instance.uuid except wexc.InstanceNotFound: LOG.error(_LE("Instance not found; error: %s"), instance_id) @@ -254,7 +251,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): raise wexc.ClusterEmpty() # get cpu cores capacity of nodes and instances cap_cores = self.compute_model.get_resource_by_uuid( - element.ResourceType.cpu_cores) + element.ResourceType.vcpus) overload_hosts = [] nonoverload_hosts = [] # total workload of cluster @@ -264,13 +261,12 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): for node_id in nodes: node = self.compute_model.get_node_by_uuid( node_id) - instances = self.compute_model.mapping.get_node_instances(node) + instances = self.compute_model.get_node_instances(node) node_workload = 0.0 - for instance_id in instances: - instance = self.compute_model.get_instance_by_uuid(instance_id) + for instance in instances: try: cpu_util = self.ceilometer.statistic_aggregation( - resource_id=instance_id, + resource_id=instance.uuid, meter_name=self._meter, period=self._period, aggregate='avg') @@ -279,12 +275,12 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): LOG.error(_LE("Can not get cpu_util from Ceilometer")) continue if cpu_util is None: - LOG.debug("Instance (%s): cpu_util is None", instance_id) + LOG.debug("Instance (%s): cpu_util is None", instance.uuid) continue instance_cores = cap_cores.get_capacity(instance) - workload_cache[instance_id] = cpu_util * instance_cores / 100 - node_workload += workload_cache[instance_id] - LOG.debug("VM (%s): cpu_util %f", instance_id, cpu_util) + workload_cache[instance.uuid] = cpu_util * instance_cores / 100 + node_workload += workload_cache[instance.uuid] + LOG.debug("VM (%s): cpu_util %f", instance.uuid, cpu_util) node_cores = cap_cores.get_capacity(node) hy_cpu_util = node_workload / node_cores * 100 diff --git a/watcher/decision_engine/strategy/strategies/workload_stabilization.py b/watcher/decision_engine/strategy/strategies/workload_stabilization.py 
index ed20e2def..318cfd8b1 100644 --- a/watcher/decision_engine/strategy/strategies/workload_stabilization.py +++ b/watcher/decision_engine/strategy/strategies/workload_stabilization.py @@ -343,29 +343,26 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): instance_host_map = [] nodes = list(self.get_available_nodes()) - for source_hp_id in nodes: + for src_node_id in nodes: + src_node = self.compute_model.get_node_by_uuid(src_node_id) c_nodes = copy.copy(nodes) - c_nodes.remove(source_hp_id) + c_nodes.remove(src_node_id) node_list = yield_nodes(c_nodes) - instances_id = self.compute_model.get_mapping(). \ - get_node_instances_by_uuid(source_hp_id) - for instance_id in instances_id: + for instance in self.compute_model.get_node_instances(src_node): min_sd_case = {'value': len(self.metrics)} - instance = self.compute_model.get_instance_by_uuid(instance_id) if instance.state not in [element.InstanceState.ACTIVE.value, element.InstanceState.PAUSED.value]: continue for dst_node_id in next(node_list): - sd_case = self.calculate_migration_case(hosts, instance_id, - source_hp_id, - dst_node_id) + sd_case = self.calculate_migration_case( + hosts, instance.uuid, src_node_id, dst_node_id) weighted_sd = self.calculate_weighted_sd(sd_case[:-1]) if weighted_sd < min_sd_case['value']: min_sd_case = { 'host': dst_node_id, 'value': weighted_sd, - 's_host': source_hp_id, 'instance': instance_id} + 's_host': src_node_id, 'instance': instance.uuid} instance_host_map.append(min_sd_case) return sorted(instance_host_map, key=lambda x: x['value']) diff --git a/watcher/objects/fields.py b/watcher/objects/fields.py index 8868b23f5..cdcfb07bf 100644 --- a/watcher/objects/fields.py +++ b/watcher/objects/fields.py @@ -24,10 +24,14 @@ BaseEnumField = fields.BaseEnumField BooleanField = fields.BooleanField DateTimeField = fields.DateTimeField Enum = fields.Enum +FloatField = fields.FloatField IntegerField = fields.IntegerField ListOfStringsField = fields.ListOfStringsField 
+NonNegativeFloatField = fields.NonNegativeFloatField +NonNegativeIntegerField = fields.NonNegativeIntegerField ObjectField = fields.ObjectField StringField = fields.StringField +UnspecifiedDefault = fields.UnspecifiedDefault UUIDField = fields.UUIDField diff --git a/watcher/tests/decision_engine/cluster/test_nova_cdmc.py b/watcher/tests/decision_engine/cluster/test_nova_cdmc.py index f236b5e29..c679d2403 100644 --- a/watcher/tests/decision_engine/cluster/test_nova_cdmc.py +++ b/watcher/tests/decision_engine/cluster/test_nova_cdmc.py @@ -19,6 +19,7 @@ import mock from watcher.common import nova_helper +from watcher.common import utils from watcher.decision_engine.model.collector import nova from watcher.tests import base from watcher.tests import conf_fixture @@ -33,9 +34,13 @@ class TestNovaClusterDataModelCollector(base.TestCase): @mock.patch('keystoneclient.v3.client.Client', mock.Mock()) @mock.patch.object(nova_helper, 'NovaHelper') def test_nova_cdmc_execute(self, m_nova_helper_cls): - m_nova_helper = mock.Mock() + m_nova_helper = mock.Mock(name="nova_helper") m_nova_helper_cls.return_value = m_nova_helper + m_nova_helper.get_service.return_value = mock.Mock( + host="test_hostname") + fake_compute_node = mock.Mock( + id=1337, service={'id': 123}, hypervisor_hostname='test_hostname', memory_mb=333, @@ -47,19 +52,16 @@ class TestNovaClusterDataModelCollector(base.TestCase): ) fake_instance = mock.Mock( id='ef500f7e-dac8-470f-960c-169486fce71b', - state=mock.Mock(**{'OS-EXT-STS:vm_state': 'VM_STATE'}), - flavor={'ram': 333, 'disk': 222, 'vcpus': 4}, + human_id='fake_instance', + flavor={'ram': 333, 'disk': 222, 'vcpus': 4, 'id': 1}, ) + setattr(fake_instance, 'OS-EXT-STS:vm_state', 'VM_STATE') m_nova_helper.get_compute_node_list.return_value = [fake_compute_node] - m_nova_helper.get_instances_by_node.return_value = [fake_instance] - m_nova_helper.nova.services.find.return_value = mock.Mock( - host='test_hostname') + # 
m_nova_helper.get_instances_by_node.return_value = [fake_instance] + m_nova_helper.get_instance_list.return_value = [fake_instance] - def m_get_flavor_instance(instance, cache): - instance.flavor = {'ram': 333, 'disk': 222, 'vcpus': 4} - return instance - - m_nova_helper.get_flavor_instance.side_effect = m_get_flavor_instance + m_nova_helper.get_flavor.return_value = utils.Struct(**{ + 'ram': 333, 'disk': 222, 'vcpus': 4}) m_config = mock.Mock() m_osc = mock.Mock() diff --git a/watcher/tests/decision_engine/model/data/scenario_1.xml b/watcher/tests/decision_engine/model/data/scenario_1.xml index e9039cc13..dcf08626d 100644 --- a/watcher/tests/decision_engine/model/data/scenario_1.xml +++ b/watcher/tests/decision_engine/model/data/scenario_1.xml @@ -1,47 +1,47 @@ - - - + + + - - + + - - - - + + + + - - + + - - + + - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/watcher/tests/decision_engine/model/data/scenario_1_with_metrics.xml b/watcher/tests/decision_engine/model/data/scenario_1_with_metrics.xml index 12bd6efeb..3edf6d17e 100644 --- a/watcher/tests/decision_engine/model/data/scenario_1_with_metrics.xml +++ b/watcher/tests/decision_engine/model/data/scenario_1_with_metrics.xml @@ -1,8 +1,8 @@ - - + + - - + + diff --git a/watcher/tests/decision_engine/model/data/scenario_2_with_metrics.xml b/watcher/tests/decision_engine/model/data/scenario_2_with_metrics.xml index b401f5aaa..6edba3e7d 100644 --- a/watcher/tests/decision_engine/model/data/scenario_2_with_metrics.xml +++ b/watcher/tests/decision_engine/model/data/scenario_2_with_metrics.xml @@ -1,13 +1,13 @@ - - - - - - - + + + + + + + - - - + + + diff --git a/watcher/tests/decision_engine/model/data/scenario_3_with_2_nodes.xml b/watcher/tests/decision_engine/model/data/scenario_3_with_2_nodes.xml index a9ad7d896..fee3a27eb 100644 --- a/watcher/tests/decision_engine/model/data/scenario_3_with_2_nodes.xml +++ 
b/watcher/tests/decision_engine/model/data/scenario_3_with_2_nodes.xml @@ -1,8 +1,8 @@ - - + + - - + + diff --git a/watcher/tests/decision_engine/model/data/scenario_3_with_metrics.xml b/watcher/tests/decision_engine/model/data/scenario_3_with_metrics.xml index 988940868..b8987ffd5 100644 --- a/watcher/tests/decision_engine/model/data/scenario_3_with_metrics.xml +++ b/watcher/tests/decision_engine/model/data/scenario_3_with_metrics.xml @@ -1,9 +1,9 @@ - - - - - + + + + + - + diff --git a/watcher/tests/decision_engine/model/data/scenario_4_with_1_node_no_instance.xml b/watcher/tests/decision_engine/model/data/scenario_4_with_1_node_no_instance.xml index 2bed67c9c..963beca6a 100644 --- a/watcher/tests/decision_engine/model/data/scenario_4_with_1_node_no_instance.xml +++ b/watcher/tests/decision_engine/model/data/scenario_4_with_1_node_no_instance.xml @@ -1,3 +1,3 @@ - + diff --git a/watcher/tests/decision_engine/model/data/scenario_5_with_instance_disk_0.xml b/watcher/tests/decision_engine/model/data/scenario_5_with_instance_disk_0.xml index d8027aa8f..72caedbbb 100644 --- a/watcher/tests/decision_engine/model/data/scenario_5_with_instance_disk_0.xml +++ b/watcher/tests/decision_engine/model/data/scenario_5_with_instance_disk_0.xml @@ -1,5 +1,5 @@ - - + + diff --git a/watcher/tests/decision_engine/model/data/scenario_6_with_2_nodes.xml b/watcher/tests/decision_engine/model/data/scenario_6_with_2_nodes.xml index d8b0d38a7..1ddcf7cab 100644 --- a/watcher/tests/decision_engine/model/data/scenario_6_with_2_nodes.xml +++ b/watcher/tests/decision_engine/model/data/scenario_6_with_2_nodes.xml @@ -1,10 +1,10 @@ - - - + + + - - - + + + diff --git a/watcher/tests/decision_engine/model/data/scenario_7_with_2_nodes.xml b/watcher/tests/decision_engine/model/data/scenario_7_with_2_nodes.xml index 630d61858..1da834336 100644 --- a/watcher/tests/decision_engine/model/data/scenario_7_with_2_nodes.xml +++ b/watcher/tests/decision_engine/model/data/scenario_7_with_2_nodes.xml @@ -1,10 
+1,10 @@ - - - + + + - - - + + + diff --git a/watcher/tests/decision_engine/model/data/scenario_8_with_4_nodes.xml b/watcher/tests/decision_engine/model/data/scenario_8_with_4_nodes.xml index 9abaf9d4b..6e619c4a7 100644 --- a/watcher/tests/decision_engine/model/data/scenario_8_with_4_nodes.xml +++ b/watcher/tests/decision_engine/model/data/scenario_8_with_4_nodes.xml @@ -1,16 +1,16 @@ - - - - + + + + - - + + - - + + - - + + diff --git a/watcher/tests/decision_engine/model/data/scenario_9_with_3_active_plus_1_disabled_nodes.xml b/watcher/tests/decision_engine/model/data/scenario_9_with_3_active_plus_1_disabled_nodes.xml index 1c23eb07d..33396d824 100644 --- a/watcher/tests/decision_engine/model/data/scenario_9_with_3_active_plus_1_disabled_nodes.xml +++ b/watcher/tests/decision_engine/model/data/scenario_9_with_3_active_plus_1_disabled_nodes.xml @@ -1,16 +1,16 @@ - - - - + + + + - - + + - - + + - - + + diff --git a/watcher/tests/decision_engine/model/faker_cluster_and_metrics.py b/watcher/tests/decision_engine/model/faker_cluster_and_metrics.py index e16cba719..93cfd1272 100644 --- a/watcher/tests/decision_engine/model/faker_cluster_and_metrics.py +++ b/watcher/tests/decision_engine/model/faker_cluster_and_metrics.py @@ -102,11 +102,12 @@ class FakeCeilometerMetrics(object): Returns relative node CPU utilization <0, 100>. 
:param r_id: resource id """ - uuid = '%s_%s' % (r_id.split('_')[0], r_id.split('_')[1]) - instances = self.model.get_mapping().get_node_instances_by_uuid(uuid) + node_uuid = '%s_%s' % (r_id.split('_')[0], r_id.split('_')[1]) + node = self.model.get_node_by_uuid(node_uuid) + instances = self.model.get_node_instances(node) util_sum = 0.0 node_cpu_cores = self.model.get_resource_by_uuid( - element.ResourceType.cpu_cores).get_capacity_by_uuid(uuid) + element.ResourceType.cpu_cores).get_capacity_by_uuid(node.uuid) for instance_uuid in instances: instance_cpu_cores = self.model.get_resource_by_uuid( element.ResourceType.cpu_cores).\ diff --git a/watcher/tests/decision_engine/model/faker_cluster_state.py b/watcher/tests/decision_engine/model/faker_cluster_state.py index fb654c594..66dd5e1cd 100644 --- a/watcher/tests/decision_engine/model/faker_cluster_state.py +++ b/watcher/tests/decision_engine/model/faker_cluster_state.py @@ -54,7 +54,7 @@ class FakerModelCollector(base.BaseClusterDataModelCollector): def build_scenario_1(self): instances = [] - current_state_cluster = modelroot.ModelRoot() + model = modelroot.ModelRoot() # number of nodes node_count = 5 # number max of instance per node @@ -62,74 +62,52 @@ class FakerModelCollector(base.BaseClusterDataModelCollector): # total number of virtual machine instance_count = (node_count * node_instance_count) - # define ressouce ( CPU, MEM disk, ... 
) - mem = element.Resource(element.ResourceType.memory) - # 2199.954 Mhz - num_cores = element.Resource(element.ResourceType.cpu_cores) - disk = element.Resource(element.ResourceType.disk) - disk_capacity = element.Resource(element.ResourceType.disk_capacity) - - current_state_cluster.create_resource(mem) - current_state_cluster.create_resource(num_cores) - current_state_cluster.create_resource(disk) - current_state_cluster.create_resource(disk_capacity) - for id_ in range(0, node_count): node_uuid = "Node_{0}".format(id_) - node = element.ComputeNode(id_) - node.uuid = node_uuid - node.hostname = "hostname_{0}".format(id_) - - mem.set_capacity(node, 132) - disk.set_capacity(node, 250) - disk_capacity.set_capacity(node, 250) - num_cores.set_capacity(node, 40) - current_state_cluster.add_node(node) + hostname = "hostname_{0}".format(id_) + node_attributes = { + "id": id_, + "uuid": node_uuid, + "hostname": hostname, + "memory": 132, + "disk": 250, + "disk_capacity": 250, + "vcpus": 40, + } + node = element.ComputeNode(**node_attributes) + model.add_node(node) for i in range(0, instance_count): instance_uuid = "INSTANCE_{0}".format(i) - instance = element.Instance() - instance.uuid = instance_uuid - mem.set_capacity(instance, 2) - disk.set_capacity(instance, 20) - disk_capacity.set_capacity(instance, 20) - num_cores.set_capacity(instance, 10) + instance_attributes = { + "uuid": instance_uuid, + "memory": 2, + "disk": 20, + "disk_capacity": 20, + "vcpus": 10, + } + + instance = element.Instance(**instance_attributes) instances.append(instance) - current_state_cluster.add_instance(instance) + model.add_instance(instance) - current_state_cluster.mapping.map( - current_state_cluster.get_node_by_uuid("Node_0"), - current_state_cluster.get_instance_by_uuid("INSTANCE_0")) + mappings = [ + ("INSTANCE_0", "Node_0"), + ("INSTANCE_1", "Node_0"), + ("INSTANCE_2", "Node_1"), + ("INSTANCE_3", "Node_2"), + ("INSTANCE_4", "Node_2"), + ("INSTANCE_5", "Node_2"), + ("INSTANCE_6", 
"Node_3"), + ("INSTANCE_7", "Node_4"), + ] + for instance_uuid, node_uuid in mappings: + model.map_instance( + model.get_instance_by_uuid(instance_uuid), + model.get_node_by_uuid(node_uuid), + ) - current_state_cluster.mapping.map( - current_state_cluster.get_node_by_uuid("Node_0"), - current_state_cluster.get_instance_by_uuid("INSTANCE_1")) - - current_state_cluster.mapping.map( - current_state_cluster.get_node_by_uuid("Node_1"), - current_state_cluster.get_instance_by_uuid("INSTANCE_2")) - - current_state_cluster.mapping.map( - current_state_cluster.get_node_by_uuid("Node_2"), - current_state_cluster.get_instance_by_uuid("INSTANCE_3")) - - current_state_cluster.mapping.map( - current_state_cluster.get_node_by_uuid("Node_2"), - current_state_cluster.get_instance_by_uuid("INSTANCE_4")) - - current_state_cluster.mapping.map( - current_state_cluster.get_node_by_uuid("Node_2"), - current_state_cluster.get_instance_by_uuid("INSTANCE_5")) - - current_state_cluster.mapping.map( - current_state_cluster.get_node_by_uuid("Node_3"), - current_state_cluster.get_instance_by_uuid("INSTANCE_6")) - - current_state_cluster.mapping.map( - current_state_cluster.get_node_by_uuid("Node_4"), - current_state_cluster.get_instance_by_uuid("INSTANCE_7")) - - return current_state_cluster + return model def generate_scenario_1(self): return self.load_model('scenario_1.xml') diff --git a/watcher/tests/decision_engine/model/notification/test_nova_notifications.py b/watcher/tests/decision_engine/model/notification/test_nova_notifications.py index f99c5dca7..b24edea2d 100644 --- a/watcher/tests/decision_engine/model/notification/test_nova_notifications.py +++ b/watcher/tests/decision_engine/model/notification/test_nova_notifications.py @@ -26,7 +26,6 @@ from watcher.common import exception from watcher.common import nova_helper from watcher.common import service as watcher_service from watcher.decision_engine.model import element -from watcher.decision_engine.model import model_root from 
watcher.decision_engine.model.notification import nova as novanotification from watcher.tests import base as base_test from watcher.tests.decision_engine.model import faker_cluster_state @@ -35,7 +34,8 @@ from watcher.tests.decision_engine.model.notification import fake_managers class NotificationTestCase(base_test.TestCase): - def load_message(self, filename): + @staticmethod + def load_message(filename): cwd = os.path.abspath(os.path.dirname(__file__)) data_folder = os.path.join(cwd, "data") @@ -188,6 +188,9 @@ class TestNovaNotifications(NotificationTestCase): side_effect=lambda uuid: mock.Mock( name='m_get_compute_node_by_hostname', id=3, + hypervisor_hostname="Node_2", + state='up', + status='enabled', uuid=uuid, memory_mb=7777, vcpus=42, @@ -215,7 +218,7 @@ class TestNovaNotifications(NotificationTestCase): instance0 = compute_model.get_instance_by_uuid(instance0_uuid) cpu_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.cpu_cores) + element.ResourceType.vcpus) disk = compute_model.get_resource_by_uuid( element.ResourceType.disk) disk_capacity = compute_model.get_resource_by_uuid( @@ -239,7 +242,7 @@ class TestNovaNotifications(NotificationTestCase): def test_instance_update_node_notfound_set_unmapped( self, m_nova_helper_cls): m_get_compute_node_by_hostname = mock.Mock( - side_effect=exception.ComputeNodeNotFound) + side_effect=exception.ComputeNodeNotFound(name="TEST")) m_nova_helper_cls.return_value = mock.Mock( get_compute_node_by_hostname=m_get_compute_node_by_hostname, name='m_nova_helper') @@ -263,7 +266,7 @@ class TestNovaNotifications(NotificationTestCase): instance0 = compute_model.get_instance_by_uuid(instance0_uuid) cpu_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.cpu_cores) + element.ResourceType.vcpus) disk = compute_model.get_resource_by_uuid( element.ResourceType.disk) disk_capacity = compute_model.get_resource_by_uuid( @@ -304,7 +307,7 @@ class TestNovaNotifications(NotificationTestCase): 
instance0 = compute_model.get_instance_by_uuid(instance0_uuid) cpu_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.cpu_cores) + element.ResourceType.vcpus) disk_capacity = compute_model.get_resource_by_uuid( element.ResourceType.disk) memory_capacity = compute_model.get_resource_by_uuid( @@ -324,8 +327,6 @@ class TestNovaNotifications(NotificationTestCase): # Before self.assertTrue(compute_model.get_instance_by_uuid(instance0_uuid)) - for resource in compute_model.resource.values(): - self.assertIn(instance0_uuid, resource.mapping) message = self.load_message('scenario3_instance-delete-end.json') handler.info( @@ -341,9 +342,6 @@ class TestNovaNotifications(NotificationTestCase): exception.InstanceNotFound, compute_model.get_instance_by_uuid, instance0_uuid) - for resource in compute_model.resource.values(): - self.assertNotIn(instance0_uuid, resource.mapping) - class TestLegacyNovaNotifications(NotificationTestCase): @@ -377,7 +375,7 @@ class TestLegacyNovaNotifications(NotificationTestCase): instance0 = compute_model.get_instance_by_uuid(instance0_uuid) cpu_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.cpu_cores) + element.ResourceType.vcpus) disk_capacity = compute_model.get_resource_by_uuid( element.ResourceType.disk) memory_capacity = compute_model.get_resource_by_uuid( @@ -410,31 +408,6 @@ class TestLegacyNovaNotifications(NotificationTestCase): self.assertEqual(element.InstanceState.PAUSED.value, instance0.state) - def test_legacy_instance_update_instance_notfound_creates(self): - compute_model = self.fake_cdmc.generate_scenario_3_with_2_nodes() - self.fake_cdmc.cluster_data_model = compute_model - handler = novanotification.LegacyInstanceUpdated(self.fake_cdmc) - - instance0_uuid = '73b09e16-35b7-4922-804e-e8f5d9b740fc' - - message = self.load_message('scenario3_legacy_instance-update.json') - - with mock.patch.object( - model_root.ModelRoot, 'get_instance_by_uuid' - ) as m_get_instance_by_uuid: - 
m_get_instance_by_uuid.side_effect = exception.InstanceNotFound( - name='TEST') - handler.info( - ctxt=self.context, - publisher_id=message['publisher_id'], - event_type=message['event_type'], - payload=message['payload'], - metadata=self.FAKE_METADATA, - ) - - instance0 = compute_model.get_instance_by_uuid(instance0_uuid) - self.assertEqual(element.InstanceState.PAUSED.value, instance0.state) - @mock.patch.object(nova_helper, "NovaHelper") def test_legacy_instance_update_node_notfound_still_creates( self, m_nova_helper_cls): @@ -443,6 +416,9 @@ class TestLegacyNovaNotifications(NotificationTestCase): name='m_get_compute_node_by_hostname', id=3, uuid=uuid, + hypervisor_hostname="Node_2", + state='up', + status='enabled', memory_mb=7777, vcpus=42, free_disk_gb=974, @@ -470,7 +446,7 @@ class TestLegacyNovaNotifications(NotificationTestCase): instance0 = compute_model.get_instance_by_uuid(instance0_uuid) cpu_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.cpu_cores) + element.ResourceType.vcpus) disk = compute_model.get_resource_by_uuid( element.ResourceType.disk) disk_capacity = compute_model.get_resource_by_uuid( @@ -519,7 +495,7 @@ class TestLegacyNovaNotifications(NotificationTestCase): instance0 = compute_model.get_instance_by_uuid(instance0_uuid) cpu_capacity = compute_model.get_resource_by_uuid( - element.ResourceType.cpu_cores) + element.ResourceType.vcpus) disk = compute_model.get_resource_by_uuid( element.ResourceType.disk) disk_capacity = compute_model.get_resource_by_uuid( @@ -571,8 +547,6 @@ class TestLegacyNovaNotifications(NotificationTestCase): # Before self.assertTrue(compute_model.get_instance_by_uuid(instance0_uuid)) - for resource in compute_model.resource.values(): - self.assertIn(instance0_uuid, resource.mapping) message = self.load_message( 'scenario3_legacy_instance-delete-end.json') @@ -588,6 +562,3 @@ class TestLegacyNovaNotifications(NotificationTestCase): self.assertRaises( exception.InstanceNotFound, 
compute_model.get_instance_by_uuid, instance0_uuid) - - for resource in compute_model.resource.values(): - self.assertNotIn(instance0_uuid, resource.mapping) diff --git a/watcher/tests/decision_engine/model/test_disk_info.py b/watcher/tests/decision_engine/model/test_disk_info.py deleted file mode 100644 index 9b4fc3998..000000000 --- a/watcher/tests/decision_engine/model/test_disk_info.py +++ /dev/null @@ -1,33 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Authors: Jean-Emile DARTOIS -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from watcher.decision_engine.model import element -from watcher.tests import base - - -class TestDiskInfo(base.TestCase): - def test_all(self): - disk_information = element.DiskInfo() - disk_information.set_size(1024) - self.assertEqual(1024, disk_information.get_size()) - - disk_information.set_scheduler = "scheduler_qcq" - - disk_information.set_device_name("nom_qcq") - self.assertEqual("nom_qcq", disk_information.get_device_name()) diff --git a/watcher/tests/decision_engine/model/test_element.py b/watcher/tests/decision_engine/model/test_element.py new file mode 100644 index 000000000..771c73064 --- /dev/null +++ b/watcher/tests/decision_engine/model/test_element.py @@ -0,0 +1,72 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2017 b<>com +# +# Authors: Vincent FRANCOISE +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from watcher.decision_engine.model import element +from watcher.tests import base + + +class TestElement(base.TestCase): + + scenarios = [ + ("ComputeNode_with_all_fields", dict( + cls=element.Instance, + data={ + 'uuid': 'FAKE_UUID', + 'state': 'state', + 'hostname': 'hostname', + 'human_id': 'human_id', + 'memory': 111, + 'vcpus': 222, + 'disk': 333, + 'disk_capacity': 444, + })), + ("ComputeNode_with_some_fields", dict( + cls=element.Instance, + data={ + 'uuid': 'FAKE_UUID', + 'state': 'state', + 'vcpus': 222, + 'disk': 333, + 'disk_capacity': 444, + })), + ("Instance_with_all_fields", dict( + cls=element.Instance, + data={ + 'uuid': 'FAKE_UUID', + 'state': 'state', + 'hostname': 'hostname', + 'human_id': 'human_id', + 'memory': 111, + 'vcpus': 222, + 'disk': 333, + 'disk_capacity': 444, + })), + ("Instance_with_some_fields", dict( + cls=element.Instance, + data={ + 'uuid': 'FAKE_UUID', + 'state': 'state', + 'vcpus': 222, + 'disk': 333, + 'disk_capacity': 444, + })), + ] + + def test_as_xml_element(self): + el = self.cls(**self.data) + el.as_xml_element() diff --git a/watcher/tests/decision_engine/model/test_instance.py b/watcher/tests/decision_engine/model/test_instance.py deleted file mode 100644 index 6e42322c4..000000000 --- a/watcher/tests/decision_engine/model/test_instance.py +++ /dev/null @@ -1,30 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Authors: Jean-Emile DARTOIS -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -from watcher.decision_engine.model import element -from watcher.tests import base - - -class TestInstance(base.TestCase): - - def test_namedelement(self): - instance = element.Instance() - instance.state = element.InstanceState.ACTIVE - self.assertEqual(element.InstanceState.ACTIVE, instance.state) - instance.human_id = "human_05" - self.assertEqual("human_05", instance.human_id) diff --git a/watcher/tests/decision_engine/model/test_mapping.py b/watcher/tests/decision_engine/model/test_mapping.py deleted file mode 100644 index e1e4cdbde..000000000 --- a/watcher/tests/decision_engine/model/test_mapping.py +++ /dev/null @@ -1,113 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Authors: Jean-Emile DARTOIS -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# -from oslo_utils import uuidutils - -from watcher.decision_engine.model import element -from watcher.tests import base -from watcher.tests.decision_engine.model import faker_cluster_state - - -class TestMapping(base.TestCase): - - INST1_UUID = "73b09e16-35b7-4922-804e-e8f5d9b740fc" - INST2_UUID = "a4cab39b-9828-413a-bf88-f76921bf1517" - - def setUp(self): - super(TestMapping, self).setUp() - self.fake_cluster = faker_cluster_state.FakerModelCollector() - - def test_get_node_from_instance(self): - model = self.fake_cluster.generate_scenario_3_with_2_nodes() - - instances = model.get_all_instances() - keys = list(instances.keys()) - instance = instances[keys[0]] - if instance.uuid != self.INST1_UUID: - instance = instances[keys[1]] - node = model.mapping.get_node_from_instance(instance) - self.assertEqual('Node_0', node.uuid) - - def test_get_node_by_instance_uuid(self): - model = self.fake_cluster.generate_scenario_3_with_2_nodes() - - nodes = model.mapping.get_node_instances_by_uuid("BLABLABLA") - self.assertEqual(0, len(nodes)) - - def test_get_all_instances(self): - model = self.fake_cluster.generate_scenario_3_with_2_nodes() - - instances = model.get_all_instances() - self.assertEqual(2, len(instances)) - self.assertEqual(element.InstanceState.ACTIVE.value, - instances[self.INST1_UUID].state) - self.assertEqual(self.INST1_UUID, instances[self.INST1_UUID].uuid) - self.assertEqual(element.InstanceState.ACTIVE.value, - instances[self.INST2_UUID].state) - self.assertEqual(self.INST2_UUID, instances[self.INST2_UUID].uuid) - - def test_get_mapping(self): - model = self.fake_cluster.generate_scenario_3_with_2_nodes() - instance_mapping = model.mapping.instance_mapping - self.assertEqual(2, len(instance_mapping)) - self.assertEqual('Node_0', instance_mapping[self.INST1_UUID]) - self.assertEqual('Node_1', instance_mapping[self.INST2_UUID]) - - def test_migrate_instance(self): - model = self.fake_cluster.generate_scenario_3_with_2_nodes() - instances = 
model.get_all_instances() - keys = list(instances.keys()) - instance0 = instances[keys[0]] - node0 = model.mapping.get_node_by_instance_uuid(instance0.uuid) - instance1 = instances[keys[1]] - node1 = model.mapping.get_node_by_instance_uuid(instance1.uuid) - - self.assertEqual( - False, - model.migrate_instance(instance1, node1, node1)) - self.assertEqual( - False, - model.migrate_instance(instance1, node0, node0)) - self.assertEqual( - True, - model.migrate_instance(instance1, node1, node0)) - self.assertEqual( - True, - model.migrate_instance(instance1, node0, node1)) - - def test_unmap_by_uuid_log_warning(self): - model = self.fake_cluster.generate_scenario_3_with_2_nodes() - instances = model.get_all_instances() - keys = list(instances.keys()) - instance0 = instances[keys[0]] - uuid_ = uuidutils.generate_uuid() - node = element.ComputeNode(id=1) - node.uuid = uuid_ - - model.mapping.unmap_by_uuid(node.uuid, instance0.uuid) - - def test_unmap_by_uuid(self): - model = self.fake_cluster.generate_scenario_3_with_2_nodes() - instances = model.get_all_instances() - keys = list(instances.keys()) - instance0 = instances[keys[0]] - node0 = model.mapping.get_node_by_instance_uuid(instance0.uuid) - - model.mapping.unmap_by_uuid(node0.uuid, instance0.uuid) - self.assertEqual(0, len(model.mapping.get_node_instances_by_uuid( - node0.uuid))) diff --git a/watcher/tests/decision_engine/model/test_model.py b/watcher/tests/decision_engine/model/test_model.py index b995812ab..65c364afe 100644 --- a/watcher/tests/decision_engine/model/test_model.py +++ b/watcher/tests/decision_engine/model/test_model.py @@ -47,9 +47,9 @@ class TestModel(base.TestCase): fake_cluster = faker_cluster_state.FakerModelCollector() model = fake_cluster.build_scenario_1() - self.assertEqual(5, len(model._nodes)) - self.assertEqual(35, len(model._instances)) - self.assertEqual(5, len(model.mapping.get_mapping())) + self.assertEqual(5, len(model.get_all_compute_nodes())) + self.assertEqual(35, 
len(model.get_all_instances())) + self.assertEqual(8, len(model.edges())) expected_struct_str = self.load_data('scenario_1.xml') parser = etree.XMLParser(remove_blank_text=True) diff --git a/watcher/tests/decision_engine/scope/test_default.py b/watcher/tests/decision_engine/scope/test_default.py index 2c31d187f..f66e84ea4 100644 --- a/watcher/tests/decision_engine/scope/test_default.py +++ b/watcher/tests/decision_engine/scope/test_default.py @@ -42,34 +42,37 @@ class TestDefaultScope(base.TestCase): for i in range(2)] model = default.DefaultScope(audit_scope, osc=mock.Mock()).get_scoped_model(cluster) - nodes = {'Node_4': set([]), 'Node_0': set([]), 'Node_3': set([]), - 'Node_1': set(['INSTANCE_2']), 'Node_2': set([])} - self.assertEqual(nodes, model.get_mapping().get_mapping()) + + expected_edges = [('INSTANCE_2', 'Node_1')] + edges = [(src.uuid, dst.uuid) for src, dst in model.edges()] + self.assertEqual(sorted(expected_edges), sorted(edges)) @mock.patch.object(nova_helper.NovaHelper, 'get_availability_zone_list') def test_get_scoped_model_without_scope(self, mock_zone_list): - cluster = self.fake_cluster.generate_scenario_1() + model = self.fake_cluster.generate_scenario_1() default.DefaultScope([], - osc=mock.Mock()).get_scoped_model(cluster) + osc=mock.Mock()).get_scoped_model(model) assert not mock_zone_list.called - def test__remove_instance(self): - cluster = self.fake_cluster.generate_scenario_1() - default.DefaultScope([], - osc=mock.Mock())._remove_instance(cluster, - 'INSTANCE_2', - 'Node_1') - expected_map = {'Node_4': set(['INSTANCE_7']), 'Node_1': set([]), - 'Node_0': set(['INSTANCE_0', 'INSTANCE_1']), - 'Node_3': set(['INSTANCE_6']), - 'Node_2': set(['INSTANCE_4', 'INSTANCE_5', - 'INSTANCE_3'])} - self.assertEqual(expected_map, cluster.get_mapping().get_mapping()) + def test_remove_instance(self): + model = self.fake_cluster.generate_scenario_1() + default.DefaultScope([], osc=mock.Mock()).remove_instance( + model, 
model.get_instance_by_uuid('INSTANCE_2'), 'Node_1') + expected_edges = [ + ('INSTANCE_0', 'Node_0'), + ('INSTANCE_1', 'Node_0'), + ('INSTANCE_3', 'Node_2'), + ('INSTANCE_4', 'Node_2'), + ('INSTANCE_5', 'Node_2'), + ('INSTANCE_6', 'Node_3'), + ('INSTANCE_7', 'Node_4'), + ] + edges = [(src.uuid, dst.uuid) for src, dst in model.edges()] + self.assertEqual(sorted(expected_edges), sorted(edges)) @mock.patch.object(nova_helper.NovaHelper, 'get_aggregate_detail') @mock.patch.object(nova_helper.NovaHelper, 'get_aggregate_list') - def test__collect_aggregates(self, mock_aggregate, - mock_detailed_aggregate): + def test_collect_aggregates(self, mock_aggregate, mock_detailed_aggregate): allowed_nodes = [] mock_aggregate.return_value = [mock.Mock(id=i) for i in range(2)] mock_detailed_aggregate.side_effect = [ @@ -126,12 +129,12 @@ class TestDefaultScope(base.TestCase): self.assertEqual(['Node_0', 'Node_1'], allowed_nodes) @mock.patch.object(nova_helper.NovaHelper, 'get_availability_zone_list') - def test__collect_zones(self, mock_zone_list): + def test_collect_zones(self, mock_zone_list): allowed_nodes = [] mock_zone_list.return_value = [ - mock.Mock(zoneName="AZ{0}".format(i+1), - hosts={'Node_{0}'.format(2*i): 1, - 'Node_{0}'.format(2*i+1): 2}) + mock.Mock(zoneName="AZ{0}".format(i + 1), + hosts={'Node_{0}'.format(2 * i): 1, + 'Node_{0}'.format(2 * i + 1): 2}) for i in range(2)] default.DefaultScope([{'availability_zones': [{'name': "AZ1"}]}], osc=mock.Mock())._collect_zones( @@ -142,9 +145,9 @@ class TestDefaultScope(base.TestCase): def test_zones_wildcard_is_used(self, mock_zone_list): allowed_nodes = [] mock_zone_list.return_value = [ - mock.Mock(zoneName="AZ{0}".format(i+1), - hosts={'Node_{0}'.format(2*i): 1, - 'Node_{0}'.format(2*i+1): 2}) + mock.Mock(zoneName="AZ{0}".format(i + 1), + hosts={'Node_{0}'.format(2 * i): 1, + 'Node_{0}'.format(2 * i + 1): 2}) for i in range(2)] default.DefaultScope([{'availability_zones': [{'name': "*"}]}], 
osc=mock.Mock())._collect_zones( @@ -156,9 +159,9 @@ class TestDefaultScope(base.TestCase): def test_zones_wildcard_with_other_ids(self, mock_zone_list): allowed_nodes = [] mock_zone_list.return_value = [ - mock.Mock(zoneName="AZ{0}".format(i+1), - hosts={'Node_{0}'.format(2*i): 1, - 'Node_{0}'.format(2*i+1): 2}) + mock.Mock(zoneName="AZ{0}".format(i + 1), + hosts={'Node_{0}'.format(2 * i): 1, + 'Node_{0}'.format(2 * i + 1): 2}) for i in range(2)] scope_handler = default.DefaultScope( [{'availability_zones': [{'name': "*"}, {'name': 'AZ1'}]}], @@ -173,38 +176,42 @@ class TestDefaultScope(base.TestCase): validators.Draft4Validator( default.DefaultScope.DEFAULT_SCHEMA).validate(test_scope) - def test__exclude_resources(self): + def test_exclude_resources(self): resources_to_exclude = [{'instances': [{'uuid': 'INSTANCE_1'}, {'uuid': 'INSTANCE_2'}]}, {'compute_nodes': [{'name': 'Node_1'}, - {'name': 'Node_2'}]} - ] + {'name': 'Node_2'}]}] instances_to_exclude = [] nodes_to_exclude = [] - default.DefaultScope([], osc=mock.Mock())._exclude_resources( + default.DefaultScope([], osc=mock.Mock()).exclude_resources( resources_to_exclude, instances=instances_to_exclude, nodes=nodes_to_exclude) self.assertEqual(['Node_1', 'Node_2'], sorted(nodes_to_exclude)) self.assertEqual(['INSTANCE_1', 'INSTANCE_2'], sorted(instances_to_exclude)) - def test__remove_node_from_model(self): - cluster = self.fake_cluster.generate_scenario_1() - default.DefaultScope([], osc=mock.Mock())._remove_node_from_model( - ['Node_1', 'Node_2'], cluster) - expected_cluster = {'Node_0': set(['INSTANCE_0', 'INSTANCE_1']), - 'Node_1': set([]), 'Node_2': set([]), - 'Node_3': set(['INSTANCE_6']), - 'Node_4': set(['INSTANCE_7'])} - self.assertEqual(expected_cluster, cluster.get_mapping().get_mapping()) + def test_remove_nodes_from_model(self): + model = self.fake_cluster.generate_scenario_1() + default.DefaultScope([], osc=mock.Mock()).remove_nodes_from_model( + ['Node_1', 'Node_2'], model) + expected_edges = [ 
+ ('INSTANCE_0', 'Node_0'), + ('INSTANCE_1', 'Node_0'), + ('INSTANCE_6', 'Node_3'), + ('INSTANCE_7', 'Node_4')] + edges = [(src.uuid, dst.uuid) for src, dst in model.edges()] + self.assertEqual(sorted(expected_edges), sorted(edges)) - def test__remove_instances_from_model(self): - cluster = self.fake_cluster.generate_scenario_1() - default.DefaultScope([], osc=mock.Mock())._remove_instances_from_model( - ['INSTANCE_1', 'INSTANCE_2'], cluster) - expected_cluster = {'Node_0': set(['INSTANCE_0']), 'Node_1': set([]), - 'Node_2': set(['INSTANCE_3', 'INSTANCE_4', - 'INSTANCE_5']), - 'Node_3': set(['INSTANCE_6']), - 'Node_4': set(['INSTANCE_7'])} - self.assertEqual(expected_cluster, cluster.get_mapping().get_mapping()) + def test_remove_instances_from_model(self): + model = self.fake_cluster.generate_scenario_1() + default.DefaultScope([], osc=mock.Mock()).remove_instances_from_model( + ['INSTANCE_1', 'INSTANCE_2'], model) + expected_edges = [ + ('INSTANCE_0', 'Node_0'), + ('INSTANCE_3', 'Node_2'), + ('INSTANCE_4', 'Node_2'), + ('INSTANCE_5', 'Node_2'), + ('INSTANCE_6', 'Node_3'), + ('INSTANCE_7', 'Node_4')] + edges = [(src.uuid, dst.uuid) for src, dst in model.edges()] + self.assertEqual(sorted(expected_edges), sorted(edges)) diff --git a/watcher/tests/decision_engine/strategy/strategies/test_uniform_airflow.py b/watcher/tests/decision_engine/strategy/strategies/test_uniform_airflow.py index 67e1e9627..66651cad9 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_uniform_airflow.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_uniform_airflow.py @@ -130,7 +130,7 @@ class TestUniformAirflow(base.TestCase): self.strategy.threshold_inlet_t = 22 n1, n2 = self.strategy.group_hosts_by_airflow() instances = model.get_all_instances() - instances.clear() + [model.remove_instance(inst) for inst in instances.values()] instance_to_mig = self.strategy.choose_instance_to_migrate(n1) self.assertIsNone(instance_to_mig) diff --git 
a/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py b/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py index 9cf329abf..96ec0f9e2 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py @@ -127,7 +127,8 @@ class TestVMWorkloadConsolidation(base.TestCase): n1 = model.get_node_by_uuid('Node_0') n2 = model.get_node_by_uuid('Node_1') instance_uuid = 'INSTANCE_0' - self.strategy.add_migration(instance_uuid, n1, n2, model) + instance = model.get_instance_by_uuid(instance_uuid) + self.strategy.add_migration(instance, n1, n2, model) self.assertEqual(1, len(self.strategy.solution.actions)) expected = {'action_type': 'migrate', 'input_parameters': {'destination_node': n2.uuid, @@ -196,11 +197,12 @@ class TestVMWorkloadConsolidation(base.TestCase): n1 = model.get_node_by_uuid('Node_0') n2 = model.get_node_by_uuid('Node_1') instance_uuid = 'INSTANCE_0' + instance = model.get_instance_by_uuid(instance_uuid) self.strategy.disable_unused_nodes(model) self.assertEqual(0, len(self.strategy.solution.actions)) # Migrate VM to free the node - self.strategy.add_migration(instance_uuid, n1, n2, model) + self.strategy.add_migration(instance, n1, n2, model) self.strategy.disable_unused_nodes(model) expected = {'action_type': 'change_nova_service_state', diff --git a/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py b/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py index bc2d5aa11..529ece291 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py @@ -74,7 +74,7 @@ class TestWorkloadBalance(base.TestCase): model = self.fake_cluster.generate_scenario_6_with_2_nodes() self.m_model.return_value = model node = 
model.get_node_by_uuid('Node_0') - cap_cores = model.get_resource_by_uuid(element.ResourceType.cpu_cores) + cap_cores = model.get_resource_by_uuid(element.ResourceType.vcpus) cap_mem = model.get_resource_by_uuid(element.ResourceType.memory) cap_disk = model.get_resource_by_uuid(element.ResourceType.disk) cores_used, mem_used, disk_used = ( @@ -107,7 +107,7 @@ class TestWorkloadBalance(base.TestCase): self.m_model.return_value = model n1, n2, avg, w_map = self.strategy.group_hosts_by_cpu_util() instances = model.get_all_instances() - instances.clear() + [model.remove_instance(inst) for inst in instances.values()] instance_to_mig = self.strategy.choose_instance_to_migrate( n1, avg, w_map) self.assertIsNone(instance_to_mig)