Merge "Watcher Planner Selector"
This commit is contained in:
commit
62020cac30
@ -26,7 +26,7 @@ from oslo_log import log
|
||||
from watcher.applier import rpcapi
|
||||
from watcher.common import exception
|
||||
from watcher.common import service
|
||||
from watcher.decision_engine.planner import manager as planner_manager
|
||||
from watcher.decision_engine.loading import default as loader
|
||||
from watcher.decision_engine.strategy.context import default as default_context
|
||||
from watcher import notifications
|
||||
from watcher import objects
|
||||
@ -41,11 +41,11 @@ LOG = log.getLogger(__name__)
|
||||
class BaseAuditHandler(object):
|
||||
|
||||
@abc.abstractmethod
|
||||
def execute(self, audit_uuid, request_context):
|
||||
def execute(self, audit, request_context):
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def pre_execute(self, audit_uuid, request_context):
|
||||
def pre_execute(self, audit, request_context):
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
@ -63,15 +63,18 @@ class AuditHandler(BaseAuditHandler):
|
||||
def __init__(self):
|
||||
super(AuditHandler, self).__init__()
|
||||
self._strategy_context = default_context.DefaultStrategyContext()
|
||||
self._planner_manager = planner_manager.PlannerManager()
|
||||
self._planner = None
|
||||
self._planner_loader = loader.DefaultPlannerLoader()
|
||||
self.applier_client = rpcapi.ApplierAPI()
|
||||
|
||||
@property
|
||||
def planner(self):
|
||||
if self._planner is None:
|
||||
self._planner = self._planner_manager.load()
|
||||
return self._planner
|
||||
def get_planner(self, audit, request_context):
|
||||
# because AuditHandler is a singletone we need to avoid race condition.
|
||||
# thus we need to load planner every time
|
||||
strategy = self.strategy_context.select_strategy(
|
||||
audit, request_context)
|
||||
planner_name = strategy.planner
|
||||
LOG.debug("Loading %s", planner_name)
|
||||
planner = self._planner_loader.load(name=planner_name)
|
||||
return planner
|
||||
|
||||
@property
|
||||
def strategy_context(self):
|
||||
@ -90,8 +93,8 @@ class AuditHandler(BaseAuditHandler):
|
||||
request_context, audit,
|
||||
action=fields.NotificationAction.PLANNER,
|
||||
phase=fields.NotificationPhase.START)
|
||||
action_plan = self.planner.schedule(request_context, audit.id,
|
||||
solution)
|
||||
planner = self.get_planner(audit, request_context)
|
||||
action_plan = planner.schedule(request_context, audit.id, solution)
|
||||
notifications.audit.send_action_notification(
|
||||
request_context, audit,
|
||||
action=fields.NotificationAction.PLANNER,
|
||||
@ -105,13 +108,15 @@ class AuditHandler(BaseAuditHandler):
|
||||
phase=fields.NotificationPhase.ERROR)
|
||||
raise
|
||||
|
||||
def update_audit_state(self, audit, state):
|
||||
@staticmethod
|
||||
def update_audit_state(audit, state):
|
||||
if audit.state != state:
|
||||
LOG.debug("Update audit state: %s", state)
|
||||
audit.state = state
|
||||
audit.save()
|
||||
|
||||
def check_ongoing_action_plans(self, request_context):
|
||||
@staticmethod
|
||||
def check_ongoing_action_plans(request_context):
|
||||
a_plan_filters = {'state': objects.action_plan.State.ONGOING}
|
||||
ongoing_action_plans = objects.ActionPlan.list(
|
||||
request_context, filters=a_plan_filters)
|
||||
|
@ -94,7 +94,8 @@ class ContinuousAuditHandler(base.AuditHandler):
|
||||
plan.save()
|
||||
return solution
|
||||
|
||||
def _next_cron_time(self, audit):
|
||||
@staticmethod
|
||||
def _next_cron_time(audit):
|
||||
if utils.is_cron_like(audit.interval):
|
||||
return croniter(audit.interval, datetime.datetime.utcnow()
|
||||
).get_next(datetime.datetime)
|
||||
|
@ -19,10 +19,7 @@ from oslo_log import log
|
||||
|
||||
from watcher.decision_engine.loading import default as loader
|
||||
|
||||
from watcher import conf
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
CONF = conf.CONF
|
||||
|
||||
|
||||
class PlannerManager(object):
|
||||
@ -33,7 +30,6 @@ class PlannerManager(object):
|
||||
def loader(self):
|
||||
return self._loader
|
||||
|
||||
def load(self):
|
||||
selected_planner = CONF.watcher_planner.planner
|
||||
LOG.debug("Loading %s", selected_planner)
|
||||
return self.loader.load(name=selected_planner)
|
||||
def load(self, planner_name):
|
||||
LOG.debug("Loading %s", planner_name)
|
||||
return self.loader.load(name=planner_name)
|
||||
|
@ -30,7 +30,8 @@ class DefaultStrategyContext(base.StrategyContext):
|
||||
super(DefaultStrategyContext, self).__init__()
|
||||
LOG.debug("Initializing Strategy Context")
|
||||
|
||||
def do_execute_strategy(self, audit, request_context):
|
||||
@staticmethod
|
||||
def select_strategy(audit, request_context):
|
||||
osc = clients.OpenStackClients()
|
||||
# todo(jed) retrieve in audit parameters (threshold,...)
|
||||
# todo(jed) create ActionPlan
|
||||
@ -50,9 +51,10 @@ class DefaultStrategyContext(base.StrategyContext):
|
||||
goal_name=goal.name,
|
||||
strategy_name=strategy_name,
|
||||
osc=osc)
|
||||
return strategy_selector.select()
|
||||
|
||||
selected_strategy = strategy_selector.select()
|
||||
|
||||
def do_execute_strategy(self, audit, request_context):
|
||||
selected_strategy = self.select_strategy(audit, request_context)
|
||||
selected_strategy.audit_scope = audit.scope
|
||||
|
||||
schema = selected_strategy.get_schema()
|
||||
|
@ -161,6 +161,7 @@ class BaseStrategy(loadable.Loadable):
|
||||
self._input_parameters = utils.Struct()
|
||||
self._audit_scope = None
|
||||
self._datasource_backend = None
|
||||
self._planner = 'weight'
|
||||
|
||||
@classmethod
|
||||
@abc.abstractmethod
|
||||
@ -432,6 +433,14 @@ class BaseStrategy(loadable.Loadable):
|
||||
def state_collector(self, s):
|
||||
self._cluster_state_collector = s
|
||||
|
||||
@property
|
||||
def planner(self):
|
||||
return self._planner
|
||||
|
||||
@planner.setter
|
||||
def planner(self, s):
|
||||
self._planner = s
|
||||
|
||||
def filter_instances_by_audit_tag(self, instances):
|
||||
if not self.config.check_optimize_metadata:
|
||||
return instances
|
||||
@ -512,6 +521,11 @@ class ThermalOptimizationBaseStrategy(BaseStrategy):
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class WorkloadStabilizationBaseStrategy(BaseStrategy):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(WorkloadStabilizationBaseStrategy, self
|
||||
).__init__(*args, **kwargs)
|
||||
self._planner = 'workload_stabilization'
|
||||
|
||||
@classmethod
|
||||
def get_goal_name(cls):
|
||||
return "workload_balancing"
|
||||
|
@ -439,7 +439,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
|
||||
audit_handler.launch_audits_periodically()
|
||||
m_remove_job.assert_called()
|
||||
|
||||
@mock.patch.object(continuous.ContinuousAuditHandler, 'planner',
|
||||
@mock.patch.object(continuous.ContinuousAuditHandler, 'get_planner',
|
||||
mock.Mock())
|
||||
@mock.patch.object(base_strategy.BaseStrategy, "compute_model",
|
||||
mock.Mock(stale=False))
|
||||
|
@ -25,4 +25,6 @@ class TestPlannerManager(base.TestCase):
|
||||
def test_load(self):
|
||||
cfg.CONF.set_override('planner', "weight", group='watcher_planner')
|
||||
manager = planner.PlannerManager()
|
||||
self.assertIsInstance(manager.load(), weight.WeightPlanner)
|
||||
selected_planner = cfg.CONF.watcher_planner.planner
|
||||
self.assertIsInstance(manager.load(selected_planner),
|
||||
weight.WeightPlanner)
|
||||
|
Loading…
Reference in New Issue
Block a user