Add Scoring Module implementation

This change adds the main logic for the scoring module, defines
entry points for the scoring engine plugins and provides a
watcher-sync tool to enable Watcher database synchronization
without needing to restart any Watcher service.

Partially-Implements: blueprint scoring-module
Change-Id: If10daae969ec27a7008af5173359992e957dcd5e
Tomasz Kaczynski 2016-08-03 08:11:00 +00:00
parent ab10201c72
commit a1cb142009
18 changed files with 1073 additions and 2 deletions


@@ -39,6 +39,7 @@ console_scripts =
watcher-db-manage = watcher.cmd.dbmanage:main
watcher-decision-engine = watcher.cmd.decisionengine:main
watcher-applier = watcher.cmd.applier:main
watcher-sync = watcher.cmd.sync:main
tempest.test_plugins =
watcher_tests = watcher_tempest_plugin.plugin:WatcherTempestPlugin
@@ -54,8 +55,15 @@ watcher_goals =
workload_balancing = watcher.decision_engine.goal.goals:WorkloadBalancing
airflow_optimization = watcher.decision_engine.goal.goals:AirflowOptimization
watcher_scoring_engines =
dummy_scorer = watcher.decision_engine.scoring.dummy_scorer:DummyScorer
watcher_scoring_engine_containers =
dummy_scoring_container = watcher.decision_engine.scoring.dummy_scoring_container:DummyScoringContainer
watcher_strategies =
dummy = watcher.decision_engine.strategy.strategies.dummy_strategy:DummyStrategy
dummy_with_scorer = watcher.decision_engine.strategy.strategies.dummy_with_scorer:DummyWithScorer
basic = watcher.decision_engine.strategy.strategies.basic_consolidation:BasicConsolidation
outlet_temperature = watcher.decision_engine.strategy.strategies.outlet_temp_control:OutletTempControl
vm_workload_consolidation = watcher.decision_engine.strategy.strategies.vm_workload_consolidation:VMWorkloadConsolidation
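
A hedged sketch of how a plugin registered under the new watcher_scoring_engines namespace can be resolved at runtime through the DefaultScoringLoader introduced in this change; the printed values are illustrative assumptions, not output of this patch.

# Illustrative only: resolve the entry points declared above.
from watcher.decision_engine.loading import default

loader = default.DefaultScoringLoader()
for name in loader.list_available():
    scorer = loader.load(name)
    # e.g. 'dummy_scorer' - 'Dummy workload classifier'
    print(name, '-', scorer.get_description())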

watcher/cmd/sync.py (new file)

@@ -0,0 +1,39 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2016 Intel
#
# Authors: Tomasz Kaczynski <tomasz.kaczynski@intel.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Script for the sync tool."""
import sys
from oslo_log import log as logging
from watcher._i18n import _LI
from watcher.common import service as service
from watcher.decision_engine import sync
LOG = logging.getLogger(__name__)
def main():
LOG.info(_LI('Watcher sync started.'))
service.prepare_service(sys.argv)
syncer = sync.Syncer()
syncer.sync()
LOG.info(_LI('Watcher sync finished.'))


@@ -3,6 +3,7 @@
#
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
# Vincent FRANCOISE <vincent.francoise@b-com.com>
# Tomasz Kaczynski <tomasz.kaczynski@intel.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -46,3 +47,15 @@ class ClusterDataModelCollectorLoader(default.DefaultLoader):
def __init__(self):
super(ClusterDataModelCollectorLoader, self).__init__(
namespace='watcher_cluster_data_model_collectors')
class DefaultScoringLoader(default.DefaultLoader):
def __init__(self):
super(DefaultScoringLoader, self).__init__(
namespace='watcher_scoring_engines')
class DefaultScoringContainerLoader(default.DefaultLoader):
def __init__(self):
super(DefaultScoringContainerLoader, self).__init__(
namespace='watcher_scoring_engine_containers')


@@ -0,0 +1,169 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 Intel
#
# Authors: Tomasz Kaczynski <tomasz.kaczynski@intel.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from oslo_log import log
from oslo_serialization import jsonutils
from oslo_utils import units
from watcher._i18n import _
from watcher.decision_engine.scoring import scoring_engine
LOG = log.getLogger(__name__)
class DummyScorer(scoring_engine.ScoringEngine):
"""Sample Scoring Engine implementing simplified workload classification.
Typically a scoring engine would be implemented using machine learning
techniques. For example, for a workload classification problem the solution
could consist of the following steps:
1. Define a problem to solve: we want to detect the workload on the
machine based on the collected metrics like power consumption,
temperature, CPU load, memory usage, disk usage, network usage, etc.
2. The workloads could be predefined, e.g. IDLE, CPU-INTENSIVE,
MEMORY-INTENSIVE, IO-BOUND, ...
Or we could let the ML algorithm find the workloads based on the
learning data provided. This decision determines the learning algorithm
used (supervised vs. unsupervised learning).
3. Collect metrics from sample servers (learning data).
4. Define the analytical model, pick ML framework and algorithm.
5. Apply learning data to the data model. Once trained, the data model
becomes a scoring engine and can start doing predictions or
classifications.
6. Wrap the scoring engine in a class like this one, so it has a
standard interface and can be used inside Watcher.
This class is a greatly simplified version of the above model. The
goal is to provide an example of how such a class could be implemented
and used in Watcher, without adding additional dependencies like machine
learning frameworks (which can be quite heavy) or over-complicating its
internal implementation, which could distract from the overall picture.
That said, this class implements the workload classification "manually"
(in plain Python code) and is not intended to be used in production.
"""
# Constants defining column indices for the input data
PROCESSOR_TIME_PERC = 0
MEM_TOTAL_BYTES = 1
MEM_AVAIL_BYTES = 2
MEM_PAGE_READS_PER_SEC = 3
MEM_PAGE_WRITES_PER_SEC = 4
DISK_READ_BYTES_PER_SEC = 5
DISK_WRITE_BYTES_PER_SEC = 6
NET_BYTES_RECEIVED_PER_SEC = 7
NET_BYTES_SENT_PER_SEC = 8
# Types of workload
WORKLOAD_IDLE = 0
WORKLOAD_CPU = 1
WORKLOAD_MEM = 2
WORKLOAD_DISK = 3
def get_name(self):
return 'dummy_scorer'
def get_description(self):
return 'Dummy workload classifier'
def get_metainfo(self):
"""Metadata about input/output format of this scoring engine.
This information is used by a strategy using this scoring engine to
prepare the input data and to interpret the results.
"""
return """{
"feature_columns": [
"proc-processor-time-%",
"mem-total-bytes",
"mem-avail-bytes",
"mem-page-reads/sec",
"mem-page-writes/sec",
"disk-read-bytes/sec",
"disk-write-bytes/sec",
"net-bytes-received/sec",
"net-bytes-sent/sec"],
"result_columns": [
"workload",
"idle-probability",
"cpu-probability",
"memory-probability",
"disk-probability"],
"workloads": [
"idle",
"cpu-intensive",
"memory-intensive",
"disk-intensive"]
}"""
def calculate_score(self, features):
"""Arbitrary algorithm calculating the score.
It demonstrates how to parse the input data (features) and serialize
the results. It detects the workload type based on the metrics and
also returns the probabilities of each workload detection (again,
arbitrary values are returned, just to demonstrate how a "real"
machine learning algorithm could work). For example, the Gradient
Boosting Machine from the H2O framework uses exactly the same
format:
http://www.h2o.ai/verticals/algos/gbm/
"""
LOG.debug('Calculating score, features: %s', features)
# By default IDLE workload will be returned
workload = self.WORKLOAD_IDLE
idle_prob = 0.0
cpu_prob = 0.0
mem_prob = 0.0
disk_prob = 0.0
# Basic input validation
try:
flist = jsonutils.loads(features)
except Exception as e:
raise ValueError(_('Unable to parse features: %s') % e)
if type(flist) is not list:
raise ValueError(_('JSON list expected in feature argument'))
if len(flist) != 9:
raise ValueError(_('Invalid number of features, expected 9'))
# Simple logic for workload classification
if flist[self.PROCESSOR_TIME_PERC] >= 80:
workload = self.WORKLOAD_CPU
cpu_prob = 100.0
elif flist[self.MEM_PAGE_READS_PER_SEC] >= 1000 \
and flist[self.MEM_PAGE_WRITES_PER_SEC] >= 1000:
workload = self.WORKLOAD_MEM
mem_prob = 100.0
elif flist[self.DISK_READ_BYTES_PER_SEC] >= 50*units.Mi \
and flist[self.DISK_WRITE_BYTES_PER_SEC] >= 50*units.Mi:
workload = self.WORKLOAD_DISK
disk_prob = 100.0
else:
idle_prob = 100.0
if flist[self.PROCESSOR_TIME_PERC] >= 40:
cpu_prob = 50.0
if flist[self.MEM_PAGE_READS_PER_SEC] >= 500 \
or flist[self.MEM_PAGE_WRITES_PER_SEC] >= 500:
mem_prob = 50.0
return jsonutils.dumps(
[workload, idle_prob, cpu_prob, mem_prob, disk_prob])
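
A hedged usage sketch for the scorer above (not part of this change; the feature values are made up and follow the ordering documented in get_metainfo()).

# Illustrative only: classify a single observation with the DummyScorer.
from oslo_serialization import jsonutils

from watcher.decision_engine.scoring import dummy_scorer

scorer = dummy_scorer.DummyScorer(config=None)
# Features in metainfo order: processor time %, memory total/available,
# page reads/writes per sec, disk and network throughput in bytes/sec.
features = jsonutils.dumps([85, 4 * 2 ** 30, 1 * 2 ** 30, 0, 0, 0, 0, 0, 0])
result = jsonutils.loads(scorer.calculate_score(features))
workloads = jsonutils.loads(scorer.get_metainfo())['workloads']
print(workloads[result[0]])  # 'cpu-intensive', since processor time >= 80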


@@ -0,0 +1,99 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 Intel
#
# Authors: Tomasz Kaczynski <tomasz.kaczynski@intel.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from oslo_log import log
from oslo_serialization import jsonutils
from watcher._i18n import _
from watcher.decision_engine.scoring import scoring_container
from watcher.decision_engine.scoring import scoring_engine
LOG = log.getLogger(__name__)
class DummyScoringContainer(scoring_container.ScoringEngineContainer):
"""Sample Scoring Engine container returning a list of scoring engines.
Please note that it can be used in dynamic scenarios: the returned list
might contain instances built from some external configuration (e.g.
stored in a database). In order for these scoring engines to become
discoverable in the Watcher API and Watcher CLI, a database re-sync is
required. It can be executed using the watcher-sync tool, for example.
"""
@classmethod
def get_scoring_engine_list(cls):
return [
SimpleFunctionScorer(
'dummy_min_scorer',
'Dummy Scorer calculating the minimum value',
min),
SimpleFunctionScorer(
'dummy_max_scorer',
'Dummy Scorer calculating the maximum value',
max),
SimpleFunctionScorer(
'dummy_avg_scorer',
'Dummy Scorer calculating the average value',
lambda x: float(sum(x)) / len(x)),
]
class SimpleFunctionScorer(scoring_engine.ScoringEngine):
"""A simple generic scoring engine for demonstration purposes only.
A generic scoring engine implementation, which expects a JSON-formatted
array of numbers to be passed as the input for score calculation.
It then executes the aggregate function on this array and returns an
array with a single aggregated number (also JSON formatted).
"""
def __init__(self, name, description, aggregate_function):
super(SimpleFunctionScorer, self).__init__(config=None)
self._name = name
self._description = description
self._aggregate_function = aggregate_function
def get_name(self):
return self._name
def get_description(self):
return self._description
def get_metainfo(self):
return ''
def calculate_score(self, features):
LOG.debug('Calculating score, features: %s', features)
# Basic input validation
try:
flist = jsonutils.loads(features)
except Exception as e:
raise ValueError(_('Unable to parse features: %s') % e)
if type(flist) is not list:
raise ValueError(_('JSON list expected in feature argument'))
if len(flist) < 1:
raise ValueError(_('At least one feature is required'))
# Calculate the result
result = self._aggregate_function(flist)
# Return the aggregated result
return jsonutils.dumps([result])
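
A hedged usage sketch of the container above (not part of this change; the printed values follow from the aggregate functions defined earlier).

# Illustrative only: exercise the scoring engines returned by the container.
from oslo_serialization import jsonutils

from watcher.decision_engine.scoring import dummy_scoring_container

engines = (dummy_scoring_container.DummyScoringContainer
           .get_scoring_engine_list())
for engine in engines:
    score = engine.calculate_score('[1, 2, 4, 8]')
    # min -> 1, max -> 8, avg -> 3.75
    print(engine.get_name(), jsonutils.loads(score)[0])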


@@ -0,0 +1,51 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 Intel
#
# Authors: Tomasz Kaczynski <tomasz.kaczynski@intel.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
from watcher.common.loader import loadable
@six.add_metaclass(abc.ABCMeta)
class ScoringEngineContainer(loadable.Loadable):
"""A base class for all the Scoring Engines Containers.
A Scoring Engine Container is an abstraction which allows plugging in
multiple Scoring Engines as a single Stevedore plugin. This enables some
more advanced scenarios like dynamic reloading of Scoring Engine
implementations without having to restart any Watcher services.
"""
@classmethod
@abc.abstractmethod
def get_scoring_engine_list(cls):
"""Returns a list of Scoring Engine instances.
:return: A list of Scoring Engine instances
:rtype: list of :class:`~.scoring_engine.ScoringEngine`
"""
@classmethod
def get_config_opts(cls):
"""Defines the configuration options to be associated to this loadable
:return: A list of configuration options relative to this Loadable
:rtype: list of :class:`oslo_config.cfg.Opt` instances
"""
return []
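
For comparison, a minimal sketch of a third-party container is shown below; the class and engine names are hypothetical and are not part of this change.

# Hypothetical example of a container plugin. A real implementation could
# build this list from external configuration (e.g. a database), and a
# watcher-sync run would then pick up the changes without a service restart.
from watcher.decision_engine.scoring import dummy_scoring_container
from watcher.decision_engine.scoring import scoring_container


class ExternalScoringContainer(scoring_container.ScoringEngineContainer):

    @classmethod
    def get_scoring_engine_list(cls):
        # Reuses the generic SimpleFunctionScorer introduced in this change.
        return [
            dummy_scoring_container.SimpleFunctionScorer(
                'external_sum_scorer',
                'Scorer calculating the sum of the features',
                sum),
        ]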


@@ -0,0 +1,97 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 Intel
#
# Authors: Tomasz Kaczynski <tomasz.kaczynski@intel.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
from watcher.common.loader import loadable
@six.add_metaclass(abc.ABCMeta)
class ScoringEngine(loadable.Loadable):
"""A base class for all the Scoring Engines.
A Scoring Engine is an instance of a data model, to which the learning
data was applied.
Please note that this class contains non-static and non-class methods by
design, so that it's easy to create multiple Scoring Engine instances
using a single class (possibly configured differently).
"""
@abc.abstractmethod
def get_name(self):
"""Returns the name of the Scoring Engine.
The name should be unique across all Scoring Engines.
:return: A Scoring Engine name
:rtype: str
"""
@abc.abstractmethod
def get_description(self):
"""Returns the description of the Scoring Engine.
The description might contain any human-readable information, which
might be useful for Strategy developers planning to use this Scoring
Engine. It will also be visible in the Watcher API and CLI.
:return: A Scoring Engine description
:rtype: str
"""
@abc.abstractmethod
def get_metainfo(self):
"""Returns the metadata information about Scoring Engine.
The metadata might contain a machine-friendly (e.g. in JSON format)
information needed to use this Scoring Engine. For example, some
Scoring Engines require to pass the array of features in particular
order to be able to calculate the score value. This order can be
defined in metadata and used in Strategy.
:return: A Scoring Engine metadata
:rtype: str
"""
@abc.abstractmethod
def calculate_score(self, features):
"""Calculates a score value based on arguments passed.
Scoring Engines might be very different to each other. They might
solve different problems or use different algorithms or frameworks
internally. To enable this kind of flexibility, the method takes only
one argument (string) and produces the results in the same format
(string). The consumer of the Scoring Engine is ultimately responsible
for providing the right arguments and parsing the result.
:param features: Input data for Scoring Engine
:type features: str
:return: A score result
:rtype: str
"""
@classmethod
def get_config_opts(cls):
"""Defines the configuration options to be associated to this loadable
:return: A list of configuration options relative to this Loadable
:rtype: list of :class:`oslo_config.cfg.Opt` instances
"""
return []
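
A minimal sketch of a concrete implementation; the class is hypothetical and for illustration only, a real engine would wrap a trained model.

# Hypothetical Scoring Engine that simply reports how many features it
# received.
from oslo_serialization import jsonutils

from watcher.decision_engine.scoring import scoring_engine


class FeatureCountScorer(scoring_engine.ScoringEngine):

    def get_name(self):
        return 'feature_count_scorer'

    def get_description(self):
        return 'Returns the number of features passed in'

    def get_metainfo(self):
        # Free-form string; JSON is used here only by convention.
        return '{"result_columns": ["feature-count"]}'

    def calculate_score(self, features):
        return jsonutils.dumps([len(jsonutils.loads(features))])

# e.g. FeatureCountScorer(config=None).calculate_score('[1, 2, 3]') -> '[3]'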


@@ -0,0 +1,106 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 Intel
#
# Authors: Tomasz Kaczynski <tomasz.kaczynski@intel.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
A module providing helper methods to work with Scoring Engines.
"""
from oslo_log import log
from watcher._i18n import _
from watcher.decision_engine.loading import default
LOG = log.getLogger(__name__)
_scoring_engine_map = None
def get_scoring_engine(scoring_engine_name):
"""Returns a Scoring Engine by its name.
This method retrieves a Scoring Engine instance by its name. Scoring
Engine instances are cached in memory to avoid enumerating the
Stevedore plugins on each call.
The cache is populated on the first call.
:return: A Scoring Engine instance with a given name
:rtype: :class:
`watcher.decision_engine.scoring.scoring_engine.ScoringEngine`
"""
global _scoring_engine_map
_reload_scoring_engines()
scoring_engine = _scoring_engine_map.get(scoring_engine_name)
if scoring_engine is None:
raise KeyError(_('Scoring Engine with name=%s not found')
% scoring_engine_name)
return scoring_engine
def get_scoring_engine_list():
"""Returns a list of Scoring Engine instances.
The main use case for this method is discoverability, so the Scoring
Engine list is always reloaded before returning any results.
Calling this method frequently might have a negative performance impact.
:return: A list of all available Scoring Engine instances
:rtype: List of :class:
`watcher.decision_engine.scoring.scoring_engine.ScoringEngine`
"""
global _scoring_engine_map
_reload_scoring_engines(True)
return _scoring_engine_map.values()
def _reload_scoring_engines(refresh=False):
"""Reloads Scoring Engines from Stevedore plugins to memory.
Please note that two Stevedore entry points are used:
- watcher_scoring_engines: for simple plugin implementations
- watcher_scoring_engine_containers: for container plugins, which enable
dynamic scenarios (their get_scoring_engine_list method might return
different values on each call)
"""
global _scoring_engine_map
if _scoring_engine_map is None or refresh:
LOG.debug("Reloading Scoring Engine plugins")
engines = default.DefaultScoringLoader().list_available()
_scoring_engine_map = dict()
for name in engines.keys():
se_impl = default.DefaultScoringLoader().load(name)
LOG.debug("Found Scoring Engine plugin: %s" % se_impl.get_name())
_scoring_engine_map[se_impl.get_name()] = se_impl
engine_containers = \
default.DefaultScoringContainerLoader().list_available()
for container_id, container_cls in engine_containers.items():
LOG.debug("Found Scoring Engine container plugin: %s" %
container_id)
for se in container_cls.get_scoring_engine_list():
LOG.debug("Found Scoring Engine plugin: %s" %
se.get_name())
_scoring_engine_map[se.get_name()] = se
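
A hedged usage sketch of the factory (not part of this change; the engine names come from the dummy plugins registered in setup.cfg above).

# Illustrative only: resolve scoring engines through the factory instead of
# instantiating plugin classes directly.
from oslo_serialization import jsonutils

from watcher.decision_engine.scoring import scoring_factory

# Cached lookup by name; raises KeyError for unknown names.
avg_scorer = scoring_factory.get_scoring_engine('dummy_avg_scorer')
print(jsonutils.loads(avg_scorer.calculate_score('[1, 2, 4, 8]'))[0])  # 3.75

# Freshly reloaded full list, mainly useful for discoverability.
for engine in scoring_factory.get_scoring_engine_list():
    print(engine.get_name(), '-', engine.get_description())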


@@ -16,6 +16,7 @@
from watcher.decision_engine.strategy.strategies import basic_consolidation
from watcher.decision_engine.strategy.strategies import dummy_strategy
from watcher.decision_engine.strategy.strategies import dummy_with_scorer
from watcher.decision_engine.strategy.strategies import outlet_temp_control
from watcher.decision_engine.strategy.strategies import uniform_airflow
from watcher.decision_engine.strategy.strategies import \
@@ -26,11 +27,12 @@ from watcher.decision_engine.strategy.strategies import workload_stabilization
BasicConsolidation = basic_consolidation.BasicConsolidation
OutletTempControl = outlet_temp_control.OutletTempControl
DummyStrategy = dummy_strategy.DummyStrategy
DummyWithScorer = dummy_with_scorer.DummyWithScorer
VMWorkloadConsolidation = vm_workload_consolidation.VMWorkloadConsolidation
WorkloadBalance = workload_balance.WorkloadBalance
WorkloadStabilization = workload_stabilization.WorkloadStabilization
UniformAirflow = uniform_airflow.UniformAirflow
__all__ = ("BasicConsolidation", "OutletTempControl", "DummyStrategy",
"VMWorkloadConsolidation", "WorkloadBalance",
"DummyWithScorer", "VMWorkloadConsolidation", "WorkloadBalance",
"WorkloadStabilization", "UniformAirflow")


@@ -0,0 +1,166 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 Intel
#
# Authors: Tomasz Kaczynski <tomasz.kaczynski@intel.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import random
from oslo_log import log
from oslo_serialization import jsonutils
from oslo_utils import units
from watcher._i18n import _
from watcher.decision_engine.scoring import scoring_factory
from watcher.decision_engine.strategy.strategies import base
LOG = log.getLogger(__name__)
class DummyWithScorer(base.DummyBaseStrategy):
"""A dummy strategy using dummy scoring engines.
This is a dummy strategy demonstrating how to work with scoring
engines. One scoring engine predicts the workload type of a machine
based on the telemetry data, the other one simply calculates the
average value of the given elements in a list. Results are then passed
to the NOP action.
The strategy presents the whole workflow:
- Get a reference to a scoring engine
- Prepare input data (features) for score calculation
- Perform score calculation
- Use scorer's metadata for results interpretation
"""
DEFAULT_NAME = "dummy_with_scorer"
DEFAULT_DESCRIPTION = "Dummy Strategy with Scorer"
NOP = "nop"
SLEEP = "sleep"
def __init__(self, config, osc=None):
"""Constructor: the signature should be identical within the subclasses
:param config: Configuration related to this plugin
:type config: :py:class:`~.Struct`
:param osc: An OpenStackClients instance
:type osc: :py:class:`~.OpenStackClients` instance
"""
super(DummyWithScorer, self).__init__(config, osc)
# Setup Scoring Engines
self._workload_scorer = (scoring_factory
.get_scoring_engine('dummy_scorer'))
self._avg_scorer = (scoring_factory
.get_scoring_engine('dummy_avg_scorer'))
# Get metainfo from the Workload Scorer for result interpretation
metainfo = jsonutils.loads(self._workload_scorer.get_metainfo())
self._workloads = {index: workload
for index, workload in enumerate(
metainfo['workloads'])}
def pre_execute(self):
pass
def do_execute(self):
# Simple "hello world" from strategy
param1 = self.input_parameters.param1
param2 = self.input_parameters.param2
LOG.debug('DummyWithScorer params: param1=%(p1)f, param2=%(p2)s',
{'p1': param1, 'p2': param2})
parameters = {'message': 'Hello from Dummy Strategy with Scorer!'}
self.solution.add_action(action_type=self.NOP,
input_parameters=parameters)
# Demonstrate workload scorer
features = self._generate_random_telemetry()
result_str = self._workload_scorer.calculate_score(features)
LOG.debug('Workload Scorer result: %s', result_str)
# Parse the result using workloads from scorer's metainfo
result = self._workloads[jsonutils.loads(result_str)[0]]
LOG.debug('Detected Workload: %s', result)
parameters = {'message': 'Detected Workload: %s' % result}
self.solution.add_action(action_type=self.NOP,
input_parameters=parameters)
# Demonstrate AVG scorer
features = jsonutils.dumps(random.sample(range(1000), 20))
result_str = self._avg_scorer.calculate_score(features)
LOG.debug('AVG Scorer result: %s', result_str)
result = jsonutils.loads(result_str)[0]
LOG.debug('AVG Scorer result (parsed): %d', result)
parameters = {'message': 'AVG Scorer result: %s' % result}
self.solution.add_action(action_type=self.NOP,
input_parameters=parameters)
# Sleep action
self.solution.add_action(action_type=self.SLEEP,
input_parameters={'duration': 5.0})
def post_execute(self):
pass
@classmethod
def get_name(cls):
return 'dummy_with_scorer'
@classmethod
def get_display_name(cls):
return _('Dummy Strategy using sample Scoring Engines')
@classmethod
def get_translatable_display_name(cls):
return 'Dummy Strategy using sample Scoring Engines'
@classmethod
def get_schema(cls):
# Mandatory default setting for each element
return {
'properties': {
'param1': {
'description': 'number parameter example',
'type': 'number',
'default': 3.2,
'minimum': 1.0,
'maximum': 10.2,
},
'param2': {
'description': 'string parameter example',
'type': "string",
'default': "hello"
},
},
}
def _generate_random_telemetry(self):
processor_time = random.randint(0, 100)
mem_total_bytes = 4*units.Gi
mem_avail_bytes = random.randint(1*units.Gi, 4*units.Gi)
mem_page_reads = random.randint(0, 2000)
mem_page_writes = random.randint(0, 2000)
disk_read_bytes = random.randint(0*units.Mi, 200*units.Mi)
disk_write_bytes = random.randint(0*units.Mi, 200*units.Mi)
net_bytes_received = random.randint(0*units.Mi, 20*units.Mi)
net_bytes_sent = random.randint(0*units.Mi, 10*units.Mi)
return jsonutils.dumps([
processor_time, mem_total_bytes, mem_avail_bytes,
mem_page_reads, mem_page_writes, disk_read_bytes,
disk_write_bytes, net_bytes_received, net_bytes_sent])


@@ -21,6 +21,7 @@ from oslo_log import log
from watcher._i18n import _LI, _LW
from watcher.common import context
from watcher.decision_engine.loading import default
from watcher.decision_engine.scoring import scoring_factory
from watcher import objects
from watcher.objects import action_plan as apobjects
from watcher.objects import audit as auditobjects
@@ -32,6 +33,9 @@ GoalMapping = collections.namedtuple(
StrategyMapping = collections.namedtuple(
'StrategyMapping',
['name', 'goal_name', 'display_name', 'parameters_spec'])
ScoringEngineMapping = collections.namedtuple(
'ScoringEngineMapping',
['name', 'description', 'metainfo'])
IndicatorSpec = collections.namedtuple(
'IndicatorSpec', ['name', 'description', 'unit', 'schema'])
@@ -50,10 +54,15 @@ class Syncer(object):
self._available_strategies = None
self._available_strategies_map = None
self._available_scoringengines = None
self._available_scoringengines_map = None
# This goal mapping maps stale goal IDs to the synced goal
self.goal_mapping = dict()
# This strategy mapping maps stale strategy IDs to the synced strategy
self.strategy_mapping = dict()
# Maps stale scoring engine IDs to the synced scoring engines
self.se_mapping = dict()
self.stale_audit_templates_map = {}
self.stale_audits_map = {}
@@ -73,6 +82,14 @@ class Syncer(object):
self._available_strategies = objects.Strategy.list(self.ctx)
return self._available_strategies
@property
def available_scoringengines(self):
"""Scoring Engines loaded from DB"""
if self._available_scoringengines is None:
self._available_scoringengines = (objects.ScoringEngine
.list(self.ctx))
return self._available_scoringengines
@property
def available_goals_map(self):
"""Mapping of goals loaded from DB"""
@@ -101,10 +118,22 @@
}
return self._available_strategies_map
@property
def available_scoringengines_map(self):
if self._available_scoringengines_map is None:
self._available_scoringengines_map = {
ScoringEngineMapping(
name=s.id, description=s.description,
metainfo=s.metainfo): s
for s in self.available_scoringengines
}
return self._available_scoringengines_map
def sync(self):
self.discovered_map = self._discover()
goals_map = self.discovered_map["goals"]
strategies_map = self.discovered_map["strategies"]
scoringengines_map = self.discovered_map["scoringengines"]
for goal_name, goal_map in goals_map.items():
if goal_map in self.available_goals_map:
@@ -122,7 +151,16 @@
self.strategy_mapping.update(self._sync_strategy(strategy_map))
for se_name, se_map in scoringengines_map.items():
if se_map in self.available_scoringengines_map:
LOG.info(_LI("Scoring Engine %s already exists"),
se_name)
continue
self.se_mapping.update(self._sync_scoringengine(se_map))
self._sync_objects()
self._soft_delete_removed_scoringengines()
def _sync_goal(self, goal_map):
goal_name = goal_map.name
@@ -181,6 +219,32 @@
return strategy_mapping
def _sync_scoringengine(self, scoringengine_map):
scoringengine_name = scoringengine_map.name
se_mapping = dict()
# Scoring Engines matching the discovered Scoring Engine by name
matching_scoringengines = [se for se in self.available_scoringengines
if se.name == scoringengine_name]
stale_scoringengines = self._soft_delete_stale_scoringengines(
scoringengine_map, matching_scoringengines)
if stale_scoringengines or not matching_scoringengines:
scoringengine = objects.ScoringEngine(self.ctx)
scoringengine.name = scoringengine_name
scoringengine.description = scoringengine_map.description
scoringengine.metainfo = scoringengine_map.metainfo
scoringengine.create()
LOG.info(_LI("Scoring Engine %s created"), scoringengine_name)
# Updating the internal states
self.available_scoringengines_map[scoringengine] = \
scoringengine_map
# Map the old scoring engine names to the new (equivalent) SE
for matching_scoringengine in matching_scoringengines:
se_mapping[matching_scoringengine.name] = scoringengine
return se_mapping
def _sync_objects(self):
# First we find audit templates, audits and action plans that are stale
# because their associated goal or strategy has been modified and we
@@ -393,10 +457,22 @@
self.stale_action_plans_map[
action_plan.id].state = apobjects.State.CANCELLED
def _soft_delete_removed_scoringengines(self):
removed_se = [
se for se in self.available_scoringengines
if se.name not in self.discovered_map['scoringengines']]
for se in removed_se:
LOG.info(_LI("Scoring Engine %s removed"), se.name)
se.soft_delete()
def _discover(self):
strategies_map = {}
goals_map = {}
discovered_map = {"goals": goals_map, "strategies": strategies_map}
scoringengines_map = {}
discovered_map = {
"goals": goals_map,
"strategies": strategies_map,
"scoringengines": scoringengines_map}
goal_loader = default.DefaultGoalLoader()
implemented_goals = goal_loader.list_available()
@@ -419,6 +495,12 @@
display_name=strategy_cls.get_translatable_display_name(),
parameters_spec=str(strategy_cls.get_schema()))
for se in scoring_factory.get_scoring_engine_list():
scoringengines_map[se.get_name()] = ScoringEngineMapping(
name=se.get_name(),
description=se.get_description(),
metainfo=se.get_metainfo())
return discovered_map
def _soft_delete_stale_goals(self, goal_map, matching_goals):
@@ -462,3 +544,21 @@
stale_strategies.append(matching_strategy)
return stale_strategies
def _soft_delete_stale_scoringengines(
self, scoringengine_map, matching_scoringengines):
se_name = scoringengine_map.name
se_description = scoringengine_map.description
se_metainfo = scoringengine_map.metainfo
stale_scoringengines = []
for matching_scoringengine in matching_scoringengines:
if (matching_scoringengine.description == se_description and
matching_scoringengine.metainfo == se_metainfo):
LOG.info(_LI("Scoring Engine %s unchanged"), se_name)
else:
LOG.info(_LI("Scoring Engine %s modified"), se_name)
matching_scoringengine.soft_delete()
stale_scoringengines.append(matching_scoringengine)
return stale_scoringengines


@@ -31,6 +31,8 @@ from watcher.decision_engine.planner import manager as planner_manager
PLUGIN_LOADERS = (
applier_loader.DefaultActionLoader,
decision_engine_loader.DefaultPlannerLoader,
decision_engine_loader.DefaultScoringLoader,
decision_engine_loader.DefaultScoringContainerLoader,
decision_engine_loader.DefaultStrategyLoader,
decision_engine_loader.ClusterDataModelCollectorLoader,
applier_loader.DefaultWorkFlowEngineLoader,


@@ -0,0 +1,54 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 Intel
#
# Authors: Tomasz Kaczynski <tomasz.kaczynski@intel.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_serialization import jsonutils
from watcher.decision_engine.scoring import dummy_scorer
from watcher.tests import base
class TestDummyScorer(base.TestCase):
def setUp(self):
super(TestDummyScorer, self).setUp()
def test_metadata(self):
scorer = dummy_scorer.DummyScorer(config=None)
self.assertEqual('dummy_scorer', scorer.get_name())
self.assertTrue('Dummy' in scorer.get_description())
metainfo = scorer.get_metainfo()
self.assertTrue('feature_columns' in metainfo)
self.assertTrue('result_columns' in metainfo)
self.assertTrue('workloads' in metainfo)
def test_calculate_score(self):
scorer = dummy_scorer.DummyScorer(config=None)
self._assert_result(scorer, 0, '[0, 0, 0, 0, 0, 0, 0, 0, 0]')
self._assert_result(scorer, 0, '[50, 0, 0, 600, 0, 0, 0, 0, 0]')
self._assert_result(scorer, 0, '[0, 0, 0, 0, 600, 0, 0, 0, 0]')
self._assert_result(scorer, 1, '[85, 0, 0, 0, 0, 0, 0, 0, 0]')
self._assert_result(scorer, 2, '[0, 0, 0, 1100, 1100, 0, 0, 0, 0]')
self._assert_result(scorer, 3,
'[0, 0, 0, 0, 0, 70000000, 70000000, 0, 0]')
def _assert_result(self, scorer, expected, features):
result_str = scorer.calculate_score(features)
actual_result = jsonutils.loads(result_str)[0]
self.assertEqual(expected, actual_result)


@@ -0,0 +1,51 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 Intel
#
# Authors: Tomasz Kaczynski <tomasz.kaczynski@intel.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_serialization import jsonutils
from watcher.decision_engine.scoring import dummy_scoring_container
from watcher.tests import base
class TestDummyScoringContainer(base.TestCase):
def setUp(self):
super(TestDummyScoringContainer, self).setUp()
def test_get_scoring_engine_list(self):
scorers = (dummy_scoring_container.DummyScoringContainer
.get_scoring_engine_list())
self.assertEqual(3, len(scorers))
self.assertEqual('dummy_min_scorer', scorers[0].get_name())
self.assertEqual('dummy_max_scorer', scorers[1].get_name())
self.assertEqual('dummy_avg_scorer', scorers[2].get_name())
def test_scorers(self):
scorers = (dummy_scoring_container.DummyScoringContainer
.get_scoring_engine_list())
self._assert_result(scorers[0], 1.1, '[1.1, 2.2, 4, 8]')
self._assert_result(scorers[1], 8, '[1.1, 2.2, 4, 8]')
# float(1 + 2 + 4 + 8) / 4 = 15.0 / 4 = 3.75
self._assert_result(scorers[2], 3.75, '[1, 2, 4, 8]')
def _assert_result(self, scorer, expected, features):
result_str = scorer.calculate_score(features)
actual_result = jsonutils.loads(result_str)[0]
self.assertEqual(expected, actual_result)


@@ -0,0 +1,53 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 Intel
#
# Authors: Tomasz Kaczynski <tomasz.kaczynski@intel.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from watcher.decision_engine.scoring import scoring_factory
from watcher.tests import base
class TestScoringFactory(base.TestCase):
def setUp(self):
super(TestScoringFactory, self).setUp()
def test_get_scoring_engine(self):
scorer = scoring_factory.get_scoring_engine('dummy_scorer')
self.assertEqual('dummy_scorer', scorer.get_name())
scorer = scoring_factory.get_scoring_engine('dummy_min_scorer')
self.assertEqual('dummy_min_scorer', scorer.get_name())
scorer = scoring_factory.get_scoring_engine('dummy_max_scorer')
self.assertEqual('dummy_max_scorer', scorer.get_name())
scorer = scoring_factory.get_scoring_engine('dummy_avg_scorer')
self.assertEqual('dummy_avg_scorer', scorer.get_name())
self.assertRaises(
KeyError,
scoring_factory.get_scoring_engine,
'non_existing_scorer')
def test_get_scoring_engine_list(self):
scoring_engines = scoring_factory.get_scoring_engine_list()
engine_names = {'dummy_scorer', 'dummy_min_scorer',
'dummy_max_scorer', 'dummy_avg_scorer'}
for scorer in scoring_engines:
self.assertIn(scorer.get_name(), engine_names)


@@ -0,0 +1,61 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from watcher.applier.loading import default
from watcher.common import utils
from watcher.decision_engine.model import model_root
from watcher.decision_engine.strategy import strategies
from watcher.tests import base
from watcher.tests.decision_engine.strategy.strategies import \
faker_cluster_state
class TestDummyWithScorer(base.TestCase):
def setUp(self):
super(TestDummyWithScorer, self).setUp()
# fake cluster
self.fake_cluster = faker_cluster_state.FakerModelCollector()
p_model = mock.patch.object(
strategies.DummyWithScorer, "compute_model",
new_callable=mock.PropertyMock)
self.m_model = p_model.start()
self.addCleanup(p_model.stop)
self.m_model.return_value = model_root.ModelRoot()
self.strategy = strategies.DummyWithScorer(config=mock.Mock())
def test_dummy_with_scorer(self):
dummy = strategies.DummyWithScorer(config=mock.Mock())
dummy.input_parameters = utils.Struct()
dummy.input_parameters.update({'param1': 4.0, 'param2': 'Hi'})
solution = dummy.execute()
self.assertEqual(4, len(solution.actions))
def test_check_parameters(self):
model = self.fake_cluster.generate_scenario_3_with_2_nodes()
self.m_model.return_value = model
self.strategy.input_parameters = utils.Struct()
self.strategy.input_parameters.update({'param1': 4.0, 'param2': 'Hi'})
solution = self.strategy.execute()
loader = default.DefaultActionLoader()
for action in solution.actions:
loaded_action = loader.load(action['action_type'])
loaded_action.input_parameters = action['input_parameters']
loaded_action.validate_parameters()