Switch to Futurist library for asynchronous execution and periodic tasks

This change switches the conductor to the Futurist library's executor and
periodic tasks worker instead of oslo.service periodic tasks. This allows
running periodic tasks in parallel and relying on more standard interfaces
(futures, executors) for asynchronous execution.

A green thread executor is used instead of an eventlet green pool directly.
The maximum number of workers is taken from the existing workers_pool_size
configuration option, and no tasks are allowed to be enqueued, mimicking the
previous behaviour (this restriction might be lifted later).

The periodic tasks worker uses the same executor, and its main loop thread
also runs on it. For this reason the minimum value for workers_pool_size is
now 3: one thread for the periodic task main loop, one for the keepalive
thread, and at least one for other tasks. The special decorator for driver
periodic tasks is now deprecated, as the generic decorator can be used there
as well.
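
For illustration, a minimal sketch of the pattern this change adopts (not the
conductor code itself; the pool size and spacing below are made up), assuming
futurist and eventlet are installed:

    import eventlet

    import futurist
    from futurist import periodics
    from futurist import rejection


    @periodics.periodic(spacing=1, run_immediately=True)
    def tick():
        print('tick')

    # Reject submissions once the backlog reaches the pool size, mimicking
    # the "no tasks are allowed to be enqueued" behaviour described above.
    executor = futurist.GreenThreadPoolExecutor(
        max_workers=4, check_and_reject=rejection.reject_when_reached(4))

    # The periodic worker reuses the same executor, so its main loop
    # occupies one of the worker slots, just like in the conductor.
    worker = periodics.PeriodicWorker(
        [(tick, (), {})],
        executor_factory=periodics.ExistingExecutor(executor))
    executor.submit(worker.start)

    eventlet.sleep(3)  # let the task fire a few times
    worker.stop()
    worker.wait()
    executor.shutdown(wait=True)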

Closes-Bug: #1526277
Change-Id: I57bf7cebfb6db805b6c521bacfef2993b16ce1ee
Author: Dmitry Tantsur, 2016-01-07 12:12:59 +01:00
parent a70b5365d3
commit 3429e3824c
15 changed files with 283 additions and 308 deletions


@@ -77,12 +77,14 @@ Driver-Specific Periodic Tasks
 Drivers may run their own periodic tasks, i.e. actions run repeatedly after
 a certain amount of time. Such task is created by decorating a method on the
-driver itself or on any interface with driver_periodic_task_ decorator, e.g.
+driver itself or on any interface with periodic_ decorator, e.g.
 ::
 
+    from futurist import periodics
+
     class FakePower(base.PowerInterface):
-        @base.driver_periodic_task(spacing=42)
+        @periodics.periodic(spacing=42)
         def task(self, manager, context):
             pass # do something
@@ -90,7 +92,7 @@ driver itself or on any interface with driver_periodic_task_ decorator, e.g.
         def __init__(self):
             self.power = FakePower()
 
-        @base.driver_periodic_task(spacing=42)
+        @periodics.periodic(spacing=42)
         def task2(self, manager, context):
             pass # do something
@@ -98,21 +100,6 @@ driver itself or on any interface with driver_periodic_task_ decorator, e.g.
 Here the ``spacing`` argument is a period in seconds for a given periodic task.
 For example 'spacing=5' means every 5 seconds.
 
-.. note::
-    The ``parallel`` argument may be passed to driver_periodic_task_.
-    If it's set to False, this task will be run in the periodic task loop,
-    rather than a separate greenthread.
-
-    This is deprecated as of Liberty release, and the parallel argument will be
-    ignored starting in the Mitaka cycle, as such task would prevent all other
-    periodic tasks from starting while it is running.
-
-.. note::
-    By default periodic task names are derived from method names,
-    so they should be unique within a Python module.
-
-    Use ``name`` argument to driver_periodic_task_ to override
-    automatically generated name.
-
 Message Routing
 ===============
@@ -137,4 +124,4 @@ driver actions such as take-over or clean-up.
 .. _DB API: ../api/ironic.db.api.html
 .. _diskimage-builder: https://github.com/openstack/diskimage-builder
 .. _consistent hashing algorithm: ../api/ironic.common.hash_ring.html
-.. _driver_periodic_task: ../api/ironic.drivers.base.html#ironic.drivers.base.driver_periodic_task
+.. _periodic: http://docs.openstack.org/developer/futurist/api.html#futurist.periodics.periodic
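
Under the new scheme the conductor discovers such tasks by inspecting driver
objects for methods decorated with futurist's periodic decorator. A minimal
sketch of that discovery step, assuming futurist is installed (the helper
name is illustrative; the real code is _collect_periodic_tasks in the base
manager diff below):

    import inspect

    from futurist import periodics


    def collect_periodic_tasks(obj, args):
        # Gather (callable, args, kwargs) tuples for every method that was
        # decorated with @periodics.periodic on the given object.
        tasks = []
        for _name, member in inspect.getmembers(obj):
            if periodics.is_periodic(member):
                tasks.append((member, args, {}))
        return tasks

Tuples in this form can be passed directly to periodics.PeriodicWorker.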


@@ -117,7 +117,8 @@
 # Options defined in ironic.common.service
 #
 
-# Seconds between running periodic tasks. (integer value)
+# Default interval for running driver periodic tasks. (integer
+# value)
 #periodic_interval=60
 
 # Name of this node. This can be an opaque identifier. It is
@@ -596,7 +597,9 @@
 # Options defined in ironic.conductor.base_manager
 #
 
-# The size of the workers greenthread pool. (integer value)
+# The size of the workers greenthread pool. Note that 2
+# threads will be reserved by the conductor itself for
+# handling heart beats and periodic tasks. (integer value)
 #workers_pool_size=100
 
 # Seconds between conductor heart beats. (integer value)


@@ -40,7 +40,8 @@ from ironic.objects import base as objects_base
 service_opts = [
     cfg.IntOpt('periodic_interval',
                default=60,
-               help=_('Seconds between running periodic tasks.')),
+               help=_('Default interval for running driver periodic tasks.'),
+               deprecated_for_removal=True),
     cfg.StrOpt('host',
                default=socket.getfqdn(),
                help=_('Name of this node. This can be an opaque identifier. '
@@ -79,11 +80,7 @@ class RPCService(service.Service):
         self.rpcserver.start()
         self.handle_signal()
 
-        self.manager.init_host()
-        self.tg.add_dynamic_timer(
-            self.manager.periodic_tasks,
-            periodic_interval_max=CONF.periodic_interval,
-            context=admin_context)
+        self.manager.init_host(admin_context)
 
         LOG.info(_LI('Created RPC server for service %(service)s on host '
                      '%(host)s.'),


@@ -15,13 +15,13 @@
 import inspect
 import threading
 
-from eventlet import greenpool
-from oslo_concurrency import lockutils
+import futurist
+from futurist import periodics
+from futurist import rejection
 from oslo_config import cfg
 from oslo_context import context as ironic_context
 from oslo_db import exception as db_exception
 from oslo_log import log
-from oslo_service import periodic_task
 from oslo_utils import excutils
 
 from ironic.common import driver_factory
@@ -40,8 +40,10 @@ from ironic.db import api as dbapi
 conductor_opts = [
     cfg.IntOpt('workers_pool_size',
-               default=100,
-               help=_('The size of the workers greenthread pool.')),
+               default=100, min=3,
+               help=_('The size of the workers greenthread pool. '
+                      'Note that 2 threads will be reserved by the conductor '
+                      'itself for handling heart beats and periodic tasks.')),
     cfg.IntOpt('heartbeat_interval',
                default=10,
                help=_('Seconds between conductor heart beats.')),
@@ -51,18 +53,18 @@ conductor_opts = [
 CONF = cfg.CONF
 CONF.register_opts(conductor_opts, 'conductor')
 LOG = log.getLogger(__name__)
 
-WORKER_SPAWN_lOCK = "conductor_worker_spawn"
-
 
-class BaseConductorManager(periodic_task.PeriodicTasks):
+class BaseConductorManager(object):
 
     def __init__(self, host, topic):
-        super(BaseConductorManager, self).__init__(CONF)
+        super(BaseConductorManager, self).__init__()
         if not host:
             host = CONF.host
         self.host = host
         self.topic = topic
         self.notifier = rpc.get_notifier()
+        self._started = False
 
     def _get_driver(self, driver_name):
         """Get the driver.
@@ -78,15 +80,29 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
         except KeyError:
             raise exception.DriverNotFound(driver_name=driver_name)
 
-    def init_host(self):
+    def init_host(self, admin_context=None):
+        """Initialize the conductor host.
+
+        :param admin_context: the admin context to pass to periodic tasks.
+        :raises: RuntimeError when conductor is already running
+        :raises: NoDriversLoaded when no drivers are enabled on the conductor
+        """
+        if self._started:
+            raise RuntimeError(_('Attempt to start an already running '
+                                 'conductor manager'))
+
         self.dbapi = dbapi.get_instance()
 
         self._keepalive_evt = threading.Event()
         """Event for the keepalive thread."""
 
-        self._worker_pool = greenpool.GreenPool(
-            size=CONF.conductor.workers_pool_size)
-        """GreenPool of background workers for performing tasks async."""
+        # TODO(dtantsur): make the threshold configurable?
+        rejection_func = rejection.reject_when_reached(
+            CONF.conductor.workers_pool_size)
+        self._executor = futurist.GreenThreadPoolExecutor(
+            max_workers=CONF.conductor.workers_pool_size,
+            check_and_reject=rejection_func)
+        """Executor for performing tasks async."""
 
         self.ring_manager = hash.HashRingManager()
         """Consistent hash ring which maps drivers to conductors."""
@@ -106,15 +122,36 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
             LOG.error(msg, self.host)
             raise exception.NoDriversLoaded(conductor=self.host)
 
-        # Collect driver-specific periodic tasks
+        # Collect driver-specific periodic tasks.
+        # Conductor periodic tasks accept context argument, driver periodic
+        # tasks accept this manager and context. We have to ensure that the
+        # same driver interface class is not traversed twice, otherwise
+        # we'll have several instances of the same task.
+        LOG.debug('Collecting periodic tasks')
+        self._periodic_task_callables = []
+        periodic_task_classes = set()
+        self._collect_periodic_tasks(self, (admin_context,))
         for driver_obj in driver_factory.drivers().values():
-            self._collect_periodic_tasks(driver_obj)
+            self._collect_periodic_tasks(driver_obj, (self, admin_context))
             for iface_name in (driver_obj.core_interfaces +
                                driver_obj.standard_interfaces +
                                ['vendor']):
                 iface = getattr(driver_obj, iface_name, None)
-                if iface:
-                    self._collect_periodic_tasks(iface)
+                if iface and iface.__class__ not in periodic_task_classes:
+                    self._collect_periodic_tasks(iface, (self, admin_context))
+                    periodic_task_classes.add(iface.__class__)
+
+        if (len(self._periodic_task_callables) >
+                CONF.conductor.workers_pool_size):
+            LOG.warning(_LW('This conductor has %(tasks)d periodic tasks '
+                            'enabled, but only %(workers)d task workers '
+                            'allowed by [conductor]workers_pool_size option'),
+                        {'tasks': len(self._periodic_task_callables),
+                         'workers': CONF.conductor.workers_pool_size})
+
+        self._periodic_tasks = periodics.PeriodicWorker(
+            self._periodic_task_callables,
+            executor_factory=periodics.ExistingExecutor(self._executor))
 
         # clear all locks held by this conductor before registering
         self.dbapi.clear_node_reservations_for_conductor(self.host)
@@ -134,6 +171,12 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
                               update_existing=True)
         self.conductor = cdr
 
+        # Start periodic tasks
+        self._periodic_tasks_worker = self._executor.submit(
+            self._periodic_tasks.start, allow_empty=True)
+        self._periodic_tasks_worker.add_done_callback(
+            self._on_periodic_tasks_stop)
+
         # NOTE(lucasagomes): If the conductor server dies abruptly
         # mid deployment (OMM Killer, power outage, etc...) we
         # can not resume the deployment even if the conductor
@@ -161,10 +204,7 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
             LOG.critical(_LC('Failed to start keepalive'))
             self.del_host()
 
-    def _collect_periodic_tasks(self, obj):
-        for n, method in inspect.getmembers(obj, inspect.ismethod):
-            if getattr(method, '_periodic_enabled', False):
-                self.add_periodic_task(method)
+        self._started = True
 
     def del_host(self, deregister=True):
         # Conductor deregistration fails if called on non-initialized
@@ -190,11 +230,34 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
         # Waiting here to give workers the chance to finish. This has the
         # benefit of releasing locks workers placed on nodes, as well as
         # having work complete normally.
-        self._worker_pool.waitall()
+        self._periodic_tasks.stop()
+        self._periodic_tasks.wait()
+        self._executor.shutdown(wait=True)
+        self._started = False
 
-    def periodic_tasks(self, context, raise_on_error=False):
-        """Periodic tasks are run at pre-specified interval."""
-        return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
+    def _collect_periodic_tasks(self, obj, args):
+        """Collect periodic tasks from a given object.
+
+        Populates self._periodic_task_callables with tuples
+        (callable, args, kwargs).
+
+        :param obj: object containing periodic tasks as methods
+        :param args: tuple with arguments to pass to every task
+        """
+        for name, member in inspect.getmembers(obj):
+            if periodics.is_periodic(member):
+                LOG.debug('Found periodic task %(owner)s.%(member)s',
+                          {'owner': obj.__class__.__name__,
+                           'member': name})
+                self._periodic_task_callables.append((member, args, {}))
+
+    def _on_periodic_tasks_stop(self, fut):
+        try:
+            fut.result()
+        except Exception as exc:
+            LOG.critical(_LC('Periodic tasks worker has failed: %s'), exc)
+        else:
+            LOG.info(_LI('Successfully shut down periodic tasks'))
 
     def iter_nodes(self, fields=None, **kwargs):
         """Iterate over nodes mapped to this conductor.
@@ -217,7 +280,6 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
             if self._mapped_to_this_conductor(*result[:2]):
                 yield result
 
-    @lockutils.synchronized(WORKER_SPAWN_lOCK, 'ironic-')
     def _spawn_worker(self, func, *args, **kwargs):
 
         """Create a greenthread to run func(*args, **kwargs).
@@ -225,13 +287,13 @@ class BaseConductorManager(periodic_task.PeriodicTasks):
         Spawns a greenthread if there are free slots in pool, otherwise raises
         exception. Execution control returns immediately to the caller.
 
-        :returns: GreenThread object.
+        :returns: Future object.
         :raises: NoFreeConductorWorker if worker pool is currently full.
 
         """
-        if self._worker_pool.free():
-            return self._worker_pool.spawn(func, *args, **kwargs)
-        else:
+        try:
+            return self._executor.submit(func, *args, **kwargs)
+        except futurist.RejectedSubmission:
             raise exception.NoFreeConductorWorker()
 
     def _conductor_service_record_keepalive(self):
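
A standalone sketch of the submit/reject behaviour that the new _spawn_worker
relies on (sizes here are made up; in the conductor the threshold equals
workers_pool_size), assuming futurist and eventlet are installed:

    import eventlet

    import futurist
    from futurist import rejection


    def long_task():
        eventlet.sleep(10)

    executor = futurist.GreenThreadPoolExecutor(
        max_workers=1, check_and_reject=rejection.reject_when_reached(1))

    try:
        while True:
            # Once the backlog of queued work reaches the threshold,
            # submit() raises instead of queueing more work.
            executor.submit(long_task)
    except futurist.RejectedSubmission:
        # _spawn_worker converts this into NoFreeConductorWorker.
        print('pool full, submission rejected')

    executor.shutdown(wait=False)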


@@ -46,10 +46,10 @@ import datetime
 import tempfile
 
 import eventlet
+from futurist import periodics
 from oslo_config import cfg
 from oslo_log import log
 import oslo_messaging as messaging
-from oslo_service import periodic_task
 from oslo_utils import excutils
 from oslo_utils import uuidutils
@@ -1200,8 +1200,7 @@ class ConductorManager(base_manager.BaseConductorManager):
                     action=action, node=node.uuid,
                     state=node.provision_state)
 
-    @periodic_task.periodic_task(
-        spacing=CONF.conductor.sync_power_state_interval)
+    @periodics.periodic(spacing=CONF.conductor.sync_power_state_interval)
     def _sync_power_states(self, context):
         """Periodic task to sync power states for the nodes.
@@ -1269,8 +1268,7 @@
             # Yield on every iteration
             eventlet.sleep(0)
 
-    @periodic_task.periodic_task(
-        spacing=CONF.conductor.check_provision_state_interval)
+    @periodics.periodic(spacing=CONF.conductor.check_provision_state_interval)
     def _check_deploy_timeouts(self, context):
         """Periodically checks whether a deploy RPC call has timed out.
@@ -1292,8 +1290,7 @@
         self._fail_if_in_state(context, filters, states.DEPLOYWAIT,
                                sort_key, callback_method, err_handler)
 
-    @periodic_task.periodic_task(
-        spacing=CONF.conductor.check_provision_state_interval)
+    @periodics.periodic(spacing=CONF.conductor.check_provision_state_interval)
     def _check_deploying_status(self, context):
         """Periodically checks the status of nodes in DEPLOYING state.
@@ -1376,8 +1373,7 @@
                 task.node.conductor_affinity = self.conductor.id
                 task.node.save()
 
-    @periodic_task.periodic_task(
-        spacing=CONF.conductor.check_provision_state_interval)
+    @periodics.periodic(spacing=CONF.conductor.check_provision_state_interval)
     def _check_cleanwait_timeouts(self, context):
         """Periodically checks for nodes being cleaned.
@@ -1402,8 +1398,7 @@
                                last_error=last_error,
                                keep_target_state=True)
 
-    @periodic_task.periodic_task(
-        spacing=CONF.conductor.sync_local_state_interval)
+    @periodics.periodic(spacing=CONF.conductor.sync_local_state_interval)
     def _sync_local_state(self, context):
         """Perform any actions necessary to sync local state.
@@ -1826,8 +1821,7 @@
         driver = self._get_driver(driver_name)
         return driver.get_properties()
 
-    @periodic_task.periodic_task(
-        spacing=CONF.conductor.send_sensor_data_interval)
+    @periodics.periodic(spacing=CONF.conductor.send_sensor_data_interval)
     def _send_sensor_data(self, context):
         """Periodically sends sensor data to Ceilometer."""
         # do nothing if send_sensor_data option is False
@@ -2061,8 +2055,7 @@
                     action='inspect', node=task.node.uuid,
                     state=task.node.provision_state)
 
-    @periodic_task.periodic_task(
-        spacing=CONF.conductor.check_provision_state_interval)
+    @periodics.periodic(spacing=CONF.conductor.check_provision_state_interval)
     def _check_inspect_timeouts(self, context):
         """Periodically checks inspect_timeout and fails upon reaching it.


@@ -383,15 +383,15 @@ class TaskManager(object):
             # for some reason, this is true.
             # All of the above are asserted in tests such that we'll
             # catch if eventlet ever changes this behavior.
-            thread = None
+            fut = None
             try:
-                thread = self._spawn_method(*self._spawn_args,
-                                            **self._spawn_kwargs)
+                fut = self._spawn_method(*self._spawn_args,
+                                         **self._spawn_kwargs)
 
                 # NOTE(comstud): Trying to use a lambda here causes
                 # the callback to not occur for some reason. This
                 # also makes it easier to test.
-                thread.link(self._thread_release_resources)
+                fut.add_done_callback(self._thread_release_resources)
                 # Don't unlock! The unlock will occur when the
                 # thread finshes.
                 return
@@ -408,9 +408,9 @@
                           {'method': self._on_error_method.__name__,
                            'node': self.node.uuid})
 
-            if thread is not None:
-                # This means the link() failed for some
+            if fut is not None:
+                # This means the add_done_callback() failed for some
                 # reason. Nuke the thread.
-                thread.cancel()
+                fut.cancel()
                 self.release_resources()
         self.release_resources()
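
For context, a small sketch of the futures API that replaces eventlet's
link()/cancel() here, assuming futurist is installed (GreenFuture follows the
standard concurrent.futures.Future interface):

    import futurist


    def work():
        return 42


    def release_resources(fut):
        # Runs once the future completes, successfully or not; the task
        # manager uses the same hook to release node locks.
        print('done:', fut.result())


    executor = futurist.GreenThreadPoolExecutor(max_workers=1)
    fut = executor.submit(work)
    # Unlike eventlet's link(), add_done_callback() invokes the callback
    # immediately if the future has already completed.
    fut.add_done_callback(release_resources)
    executor.shutdown(wait=True)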


@@ -24,9 +24,9 @@ import inspect
 import json
 import os
 
-import eventlet
+from futurist import periodics
+from oslo_config import cfg
 from oslo_log import log as logging
-from oslo_service import periodic_task
 from oslo_utils import excutils
 import six
@@ -40,6 +40,10 @@ RAID_CONFIG_SCHEMA = os.path.join(os.path.dirname(__file__),
                                   'raid_config_schema.json')
 
+CONF = cfg.CONF
+CONF.import_opt('periodic_interval', 'ironic.common.service')
+
+
 @six.add_metaclass(abc.ABCMeta)
 class BaseDriver(object):
     """Base class for all drivers.
@@ -1116,45 +1120,36 @@ def clean_step(priority, abortable=False, argsinfo=None):
     return decorator
 
 
-def driver_periodic_task(parallel=True, **other):
+def driver_periodic_task(**kwargs):
     """Decorator for a driver-specific periodic task.
 
+    Deprecated, please use futurist directly.
+
     Example::
 
+        from futurist import periodics
+
         class MyDriver(base.BaseDriver):
-            @base.driver_periodic_task(spacing=42)
+            @periodics.periodic(spacing=42)
             def task(self, manager, context):
                 pass # do some job
 
-    :param parallel: If True (default), this task is run in a separate thread.
-        If False, this task will be run in the conductor's periodic task
-        loop, rather than a separate greenthread. This parameter is
-        deprecated and will be ignored starting with Mitaka cycle.
-    :param other: arguments to pass to @periodic_task.periodic_task
+    :param kwargs: arguments to pass to @periodics.periodic
     """
-    # TODO(dtantsur): drop all this magic once
-    # https://review.openstack.org/#/c/134303/ lands
-    semaphore = eventlet.semaphore.BoundedSemaphore()
+    LOG.warning(_LW('driver_periodic_task decorator is deprecated, please '
+                    'use futurist.periodics.periodic directly'))
+    # Previously we accepted more arguments, make a backward compatibility
+    # layer for out-of-tree drivers.
+    new_kwargs = {}
+    for arg in ('spacing', 'enabled', 'run_immediately'):
+        try:
+            new_kwargs[arg] = kwargs.pop(arg)
+        except KeyError:
+            pass
 
-    def decorator2(func):
-        @six.wraps(func)
-        def wrapper(*args, **kwargs):
-            if parallel:
-                def _internal():
-                    with semaphore:
-                        func(*args, **kwargs)
-
-                eventlet.greenthread.spawn_n(_internal)
-            else:
-                LOG.warning(_LW(
-                    'Using periodic tasks with parallel=False is deprecated, '
-                    '"parallel" argument will be ignored starting with '
-                    'the Mitaka release'))
-                func(*args, **kwargs)
-
-        # NOTE(dtantsur): name should be unique
-        other.setdefault('name', '%s.%s' % (func.__module__, func.__name__))
-        decorator = periodic_task.periodic_task(**other)
-        return decorator(wrapper)
-
-    return decorator2
+    new_kwargs.setdefault('spacing', CONF.periodic_interval)
+
+    if kwargs:
+        LOG.warning(_LW('The following arguments are not supported by '
+                        'futurist.periodics.periodic and are ignored: %s'),
+                    ', '.join(kwargs))
+
+    return periodics.periodic(**new_kwargs)
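
For an out-of-tree driver the migration is then a one-line change per task; a
hedged sketch (the driver class is illustrative):

    from futurist import periodics

    from ironic.drivers import base


    class MyDriver(base.BaseDriver):
        # Before: @base.driver_periodic_task(spacing=42)
        @periodics.periodic(spacing=42)
        def task(self, manager, context):
            pass  # do some job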


@@ -16,6 +16,7 @@ Modules required to work with ironic_inspector:
 """
 
 import eventlet
+from futurist import periodics
 from oslo_config import cfg
 from oslo_log import log as logging
 from oslo_utils import importutils
@@ -121,7 +122,7 @@ class Inspector(base.InspectInterface):
         eventlet.spawn_n(_start_inspection, task.node.uuid, task.context)
         return states.INSPECTING
 
-    @base.driver_periodic_task(spacing=CONF.inspector.status_check_period,
-                               enabled=CONF.inspector.enabled)
+    @periodics.periodic(spacing=CONF.inspector.status_check_period,
+                        enabled=CONF.inspector.enabled)
     def _periodic_check_result(self, manager, context):
         """Periodic task checking results of inspection."""


@@ -17,6 +17,7 @@
 
 """Test utils for Ironic Managers."""
 
+from futurist import periodics
 import mock
 from oslo_utils import strutils
 from oslo_utils import uuidutils
@@ -175,7 +176,11 @@ class ServiceSetUpMixin(object):
             return
         self.service.del_host()
 
-    def _start_service(self):
-        self.service.init_host()
+    def _start_service(self, start_periodic_tasks=False):
+        if start_periodic_tasks:
+            self.service.init_host()
+        else:
+            with mock.patch.object(periodics, 'PeriodicWorker', autospec=True):
+                self.service.init_host()
         self.addCleanup(self._stop_service)


@@ -13,6 +13,8 @@
 """Test class for Ironic BaseConductorManager."""
 
 import eventlet
+import futurist
+from futurist import periodics
 import mock
 from oslo_config import cfg
 from oslo_db import exception as db_exception
@@ -23,6 +25,7 @@ from ironic.conductor import base_manager
 from ironic.conductor import manager
 from ironic.drivers import base as drivers_base
 from ironic import objects
+from ironic.tests import base as tests_base
 from ironic.tests.unit.conductor import mgr_utils
 from ironic.tests.unit.db import base as tests_db_base
 from ironic.tests.unit.objects import utils as obj_utils
@@ -86,6 +89,7 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
         res = objects.Conductor.get_by_hostname(self.context,
                                                 self.hostname)
         self.assertEqual(init_names, res['drivers'])
+        self._stop_service()
 
         # verify that restart registers new driver names
         self.config(enabled_drivers=restart_names)
@mock.patch.object(driver_factory.DriverFactory, '__getitem__') @mock.patch.object(driver_factory.DriverFactory, '__getitem__')
def test_start_registers_driver_specific_tasks(self, get_mock): def test_start_registers_driver_specific_tasks(self, get_mock):
init_names = ['fake1'] init_names = ['fake1']
expected_name = 'ironic.tests.unit.conductor.test_base_manager.task'
expected_name2 = 'ironic.tests.unit.conductor.test_base_manager.iface'
self.config(enabled_drivers=init_names) self.config(enabled_drivers=init_names)
class TestInterface(object): class TestInterface(object):
@drivers_base.driver_periodic_task(spacing=100500) @periodics.periodic(spacing=100500)
def iface(self): def iface(self):
pass pass
@@ -113,28 +115,27 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
                 iface = TestInterface()
 
-            @drivers_base.driver_periodic_task(spacing=42)
+            @periodics.periodic(spacing=42)
             def task(self, context):
                 pass
 
+            @drivers_base.driver_periodic_task()
+            def deprecated_task(self, context):
+                pass
+
         obj = Driver()
-        self.assertTrue(obj.task._periodic_enabled)
         get_mock.return_value = mock.Mock(obj=obj)
 
         with mock.patch.object(
                 driver_factory.DriverFactory()._extension_manager,
                 'names') as mock_names:
             mock_names.return_value = init_names
-            self._start_service()
+            self._start_service(start_periodic_tasks=True)
 
-        tasks = dict(self.service._periodic_tasks)
-        self.assertEqual(obj.task, tasks[expected_name])
-        self.assertEqual(obj.iface.iface, tasks[expected_name2])
-        self.assertEqual(42,
-                         self.service._periodic_spacing[expected_name])
-        self.assertEqual(100500,
-                         self.service._periodic_spacing[expected_name2])
-        self.assertIn(expected_name, self.service._periodic_last_run)
-        self.assertIn(expected_name2, self.service._periodic_last_run)
+        tasks = {c[0] for c in self.service._periodic_task_callables}
+        for t in (obj.task, obj.iface.iface, obj.deprecated_task):
+            self.assertTrue(periodics.is_periodic(t))
+            self.assertIn(t, tasks)
 
     @mock.patch.object(driver_factory.DriverFactory, '__init__')
     def test_start_fails_on_missing_driver(self, mock_df):
@@ -154,6 +155,17 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
                               self.service.init_host)
             self.assertTrue(log_mock.error.called)
 
+    def test_prevent_double_start(self):
+        self._start_service()
+        self.assertRaisesRegexp(RuntimeError, 'already running',
+                                self.service.init_host)
+
+    @mock.patch.object(base_manager, 'LOG')
+    def test_warning_on_low_workers_pool(self, log_mock):
+        CONF.set_override('workers_pool_size', 3, 'conductor')
+        self._start_service()
+        self.assertTrue(log_mock.warning.called)
+
     @mock.patch.object(eventlet.greenpool.GreenPool, 'waitall')
     def test_del_host_waits_on_workerpool(self, wait_mock):
         self._start_service()
@@ -185,3 +197,23 @@ class KeepAliveTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
             mock_is_set.side_effect = [False, False, False, True]
             self.service._conductor_service_record_keepalive()
         self.assertEqual(3, mock_touch.call_count)
+
+
+class ManagerSpawnWorkerTestCase(tests_base.TestCase):
+    def setUp(self):
+        super(ManagerSpawnWorkerTestCase, self).setUp()
+        self.service = manager.ConductorManager('hostname', 'test-topic')
+        self.executor = mock.Mock(spec=futurist.GreenThreadPoolExecutor)
+        self.service._executor = self.executor
+
+    def test__spawn_worker(self):
+        self.service._spawn_worker('fake', 1, 2, foo='bar', cat='meow')
+
+        self.executor.submit.assert_called_once_with(
+            'fake', 1, 2, foo='bar', cat='meow')
+
+    def test__spawn_worker_none_free(self):
+        self.executor.submit.side_effect = futurist.RejectedSubmission()
+
+        self.assertRaises(exception.NoFreeConductorWorker,
+                          self.service._spawn_worker, 'fake')


@@ -70,7 +70,7 @@ class ChangeNodePowerStateTestCase(mgr_utils.ServiceSetUpMixin,
         self.service.change_node_power_state(self.context,
                                              node.uuid,
                                              states.POWER_ON)
-        self.service._worker_pool.waitall()
+        self._stop_service()
 
         get_power_mock.assert_called_once_with(mock.ANY)
         node.refresh()
@@ -103,7 +103,7 @@
         # In this test worker should not be spawned, but waiting to make sure
        # the below perform_mock assertion is valid.
-        self.service._worker_pool.waitall()
+        self._stop_service()
         self.assertFalse(pwr_act_mock.called, 'node_power_action has been '
                                               'unexpectedly called.')
         # Verify existing reservation wasn't broken.
@@ -162,7 +162,7 @@
             self.service.change_node_power_state(self.context,
                                                  node.uuid,
                                                  new_state)
-            self.service._worker_pool.waitall()
+            self._stop_service()
 
             get_power_mock.assert_called_once_with(mock.ANY)
             set_power_mock.assert_called_once_with(mock.ANY, new_state)
@@ -298,7 +298,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin,
                                                  'first_method', 'POST',
                                                  info)
         # Waiting to make sure the below assertions are valid.
-        self.service._worker_pool.waitall()
+        self._stop_service()
 
         # Assert spawn_after was called
         self.assertTrue(mock_spawn.called)
@@ -320,7 +320,7 @@
                                                      'third_method_sync',
                                                      'POST', info)
         # Waiting to make sure the below assertions are valid.
-        self.service._worker_pool.waitall()
+        self._stop_service()
 
         # Assert no workers were used
         self.assertFalse(mock_spawn.called)
@@ -438,7 +438,7 @@
         self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
         # Waiting to make sure the below assertions are valid.
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
         self.assertIsNone(node.last_error)
@@ -715,7 +715,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
                                           provision_state=states.AVAILABLE)
         self.service.do_node_deploy(self.context, node.uuid)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
         self.assertEqual(states.DEPLOYING, node.provision_state)
         self.assertEqual(states.ACTIVE, node.target_provision_state)
@@ -745,7 +745,7 @@
             driver_internal_info={'is_whole_disk_image': False})
         self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
         self.assertEqual(states.DEPLOYING, node.provision_state)
         self.assertEqual(states.ACTIVE, node.target_provision_state)
@@ -774,7 +774,7 @@
             instance_info={'image_source': uuidutils.generate_uuid()})
         self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
         self.assertEqual(states.DEPLOYWAIT, node.provision_state)
         self.assertEqual(states.ACTIVE, node.target_provision_state)
@@ -798,7 +798,7 @@
             target_provision_state=states.NOSTATE)
         self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
         self.assertEqual(states.ACTIVE, node.provision_state)
         self.assertEqual(states.NOSTATE, node.target_provision_state)
@@ -822,7 +822,7 @@
             target_provision_state=states.NOSTATE)
         self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
         self.assertEqual(states.ACTIVE, node.provision_state)
         self.assertEqual(states.NOSTATE, node.target_provision_state)
@@ -845,7 +845,7 @@
             target_provision_state=states.NOSTATE)
         self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
         self.assertEqual(states.ACTIVE, node.provision_state)
         self.assertEqual(states.NOSTATE, node.target_provision_state)
@@ -893,7 +893,7 @@
                                     self.context, node.uuid)
         # Compare true exception hidden by @messaging.expected_exceptions
         self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
         # Make sure things were rolled back
         self.assertEqual(prv_state, node.provision_state)
@@ -1049,7 +1049,7 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin,
             provision_updated_at=datetime.datetime(2000, 1, 1, 0, 0))
 
         self.service._check_deploy_timeouts(self.context)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
         self.assertEqual(states.DEPLOYFAIL, node.provision_state)
         self.assertEqual(states.ACTIVE, node.target_provision_state)
@@ -1067,7 +1067,7 @@
             provision_updated_at=datetime.datetime(2000, 1, 1, 0, 0))
 
         self.service._check_cleanwait_timeouts(self.context)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
         self.assertEqual(states.CLEANFAIL, node.provision_state)
         self.assertEqual(tgt_prov_state, node.target_provision_state)
@@ -1162,8 +1162,9 @@
             target_provision_state=states.AVAILABLE,
             driver_internal_info={'is_whole_disk_image': False})
 
+        self._start_service()
         self.service.do_node_tear_down(self.context, node.uuid)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
         # Node will be moved to AVAILABLE after cleaning, not tested here
         self.assertEqual(states.CLEANING, node.provision_state)
@@ -1176,7 +1177,6 @@
     def test__do_node_tear_down_from_valid_states(self):
         valid_states = [states.ACTIVE, states.DEPLOYWAIT, states.DEPLOYFAIL,
                         states.ERROR]
-        self._start_service()
         for state in valid_states:
             self._test_do_node_tear_down_from_state(state)
@@ -1207,7 +1207,7 @@
                                 self.context, node.uuid)
         # Compare true exception hidden by @messaging.expected_exceptions
         self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
         # Assert instance_info/driver_internal_info was not touched
         self.assertEqual(fake_instance_info, node.instance_info)
@@ -1236,7 +1236,7 @@
                                 self.context, node.uuid, 'provide')
         # Compare true exception hidden by @messaging.expected_exceptions
         self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
         # Make sure things were rolled back
         self.assertEqual(prv_state, node.provision_state)
@@ -1463,7 +1463,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin,
             self.context, node.uuid, clean_steps)
         # Compare true exception hidden by @messaging.expected_exceptions
         self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
-        self.service._worker_pool.waitall()
+        self._stop_service()
         mock_validate.assert_called_once_with(mock.ANY)
         mock_spawn.assert_called_with(self.service._do_node_clean, mock.ANY,
                                       clean_steps)
@@ -1492,9 +1492,6 @@
                           self.service.continue_node_clean,
                           self.context, node.uuid)
 
-        self.service._worker_pool.waitall()
-        node.refresh()
-
     @mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker')
     def test_continue_node_clean_wrong_state(self, mock_spawn):
         # Test the appropriate exception is raised if node isn't already
@@ -1511,7 +1508,7 @@
                           self.service.continue_node_clean,
                           self.context, node.uuid)
 
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
         # Make sure things were rolled back
         self.assertEqual(prv_state, node.provision_state)
@@ -1533,7 +1530,7 @@
             clean_step=self.clean_steps[0])
         self._start_service()
         self.service.continue_node_clean(self.context, node.uuid)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
         self.assertEqual(states.CLEANING, node.provision_state)
         self.assertEqual(tgt_prv_state, node.target_provision_state)
@@ -1561,7 +1558,7 @@
             driver_internal_info=driver_info, clean_step=self.clean_steps[0])
         self._start_service()
         self.service.continue_node_clean(self.context, node.uuid)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
         if skip:
             expected_step_index = 1
@@ -1591,7 +1588,7 @@
         self._start_service()
         self.service.continue_node_clean(self.context, node.uuid)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
         self.assertEqual(states.CLEANFAIL, node.provision_state)
         self.assertEqual(tgt_prov_state, node.target_provision_state)
@@ -1619,7 +1616,7 @@
         self._start_service()
         self.service.continue_node_clean(self.context, node.uuid)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
         self.assertEqual(tgt_prov_state, node.provision_state)
         self.assertIsNone(node.target_provision_state)
@@ -1667,7 +1664,7 @@
         with task_manager.acquire(
                 self.context, node.uuid, shared=False) as task:
             self.service._do_node_clean(task)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
 
         # Assert that the node was moved to available without cleaning
@@ -1779,7 +1776,7 @@
                 self.context, node.uuid, shared=False) as task:
             self.service._do_node_clean(task, clean_steps=clean_steps)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
 
         mock_validate.assert_called_once_with(task)
@@ -1827,7 +1824,7 @@
                 self.context, node.uuid, shared=False) as task:
             self.service._do_next_clean_step(task, 0)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
 
         self.assertEqual(states.CLEANWAIT, node.provision_state)
@@ -1868,7 +1865,7 @@
                 self.context, node.uuid, shared=False) as task:
             self.service._do_next_clean_step(task, self.next_clean_step_index)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
 
         self.assertEqual(states.CLEANWAIT, node.provision_state)
@@ -1907,7 +1904,7 @@
                 self.context, node.uuid, shared=False) as task:
             self.service._do_next_clean_step(task, None)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
 
         # Cleaning should be complete without calling additional steps
@@ -1947,7 +1944,7 @@
                 self.context, node.uuid, shared=False) as task:
             self.service._do_next_clean_step(task, 0)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
 
         # Cleaning should be complete
@@ -1992,7 +1989,7 @@
             self.service._do_next_clean_step(task, 0)
             tear_mock.assert_called_once_with(task.driver.deploy, task)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
 
         # Make sure we go to CLEANFAIL, clear clean_steps
@@ -2034,7 +2031,7 @@
                 self.context, node.uuid, shared=False) as task:
             self.service._do_next_clean_step(task, 0)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
 
         # Make sure we go to CLEANFAIL, clear clean_steps
@@ -2075,7 +2072,7 @@
                 self.context, node.uuid, shared=False) as task:
             self.service._do_next_clean_step(task, None)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
 
         # Cleaning should be complete without calling additional steps
@@ -2114,7 +2111,7 @@
                 self.context, node.uuid, shared=False) as task:
             self.service._do_next_clean_step(task, 0)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
 
         # Make sure we go to CLEANFAIL, clear clean_steps
@@ -2232,7 +2229,7 @@ class DoNodeVerifyTestCase(mgr_utils.ServiceSetUpMixin,
                 self.context, node['id'], shared=False) as task:
             self.service._do_node_verify(task)
 
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
 
         mock_validate.assert_called_once_with(task)
@@ -2261,7 +2258,7 @@
                 self.context, node['id'], shared=False) as task:
             self.service._do_node_verify(task)
 
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
 
         mock_validate.assert_called_once_with(task)
@@ -2289,7 +2286,7 @@
                 self.context, node['id'], shared=False) as task:
             self.service._do_node_verify(task)
 
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
 
         mock_get_power_state.assert_called_once_with(task)
@@ -2394,14 +2391,14 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
                                 self.context, node.uuid, True)
         # Compare true exception hidden by @messaging.expected_exceptions
         self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
-        self.service._worker_pool.waitall()
+        self._stop_service()
         spawn_mock.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY)
 
     def test_set_console_mode_enabled(self):
         node = obj_utils.create_test_node(self.context, driver='fake')
         self._start_service()
         self.service.set_console_mode(self.context, node.uuid, True)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
         self.assertTrue(node.console_enabled)
@@ -2409,7 +2406,7 @@
         node = obj_utils.create_test_node(self.context, driver='fake')
         self._start_service()
         self.service.set_console_mode(self.context, node.uuid, False)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
         self.assertFalse(node.console_enabled)
@@ -2425,7 +2422,7 @@
         # Compare true exception hidden by @messaging.expected_exceptions
         self.assertEqual(exception.UnsupportedDriverExtension,
                          exc.exc_info[0])
-        self.service._worker_pool.waitall()
+        self._stop_service()
         node.refresh()
 
     def test_set_console_mode_validation_fail(self):
@@ -2449,7 +2446,7 @@
                                'start_console') as mock_sc:
             mock_sc.side_effect = exception.IronicException('test-error')
             self.service.set_console_mode(self.context, node.uuid, True)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         mock_sc.assert_called_once_with(mock.ANY)
         node.refresh()
         self.assertIsNotNone(node.last_error)
@@ -2463,7 +2460,7 @@
                                'stop_console') as mock_sc:
             mock_sc.side_effect = exception.IronicException('test-error')
             self.service.set_console_mode(self.context, node.uuid, False)
-        self.service._worker_pool.waitall()
+        self._stop_service()
         mock_sc.assert_called_once_with(mock.ANY)
         node.refresh()
         self.assertIsNotNone(node.last_error)
@@ -2475,7 +2472,7 @@
         with mock.patch.object(self.driver.console,
                                'start_console') as mock_sc:
             self.service.set_console_mode(self.context, node.uuid, True)
-            self.service._worker_pool.waitall()
+            self._stop_service()
             self.assertFalse(mock_sc.called)
 
     def test_disable_console_already_disabled(self):
@@ -2485,7 +2482,7 @@
         with mock.patch.object(self.driver.console,
                                'stop_console') as mock_sc:
             self.service.set_console_mode(self.context, node.uuid, False)
-            self.service._worker_pool.waitall()
+            self._stop_service()
             self.assertFalse(mock_sc.called)
 
     def test_get_console(self):
@ -3065,32 +3062,6 @@ class RaidTestCases(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase):
self.assertEqual(exception.InvalidParameterValue, exc.exc_info[0]) self.assertEqual(exception.InvalidParameterValue, exc.exc_info[0])
class ManagerSpawnWorkerTestCase(tests_base.TestCase):
def setUp(self):
super(ManagerSpawnWorkerTestCase, self).setUp()
self.service = manager.ConductorManager('hostname', 'test-topic')
def test__spawn_worker(self):
worker_pool = mock.Mock(spec_set=['free', 'spawn'])
worker_pool.free.return_value = True
self.service._worker_pool = worker_pool
self.service._spawn_worker('fake', 1, 2, foo='bar', cat='meow')
worker_pool.spawn.assert_called_once_with(
'fake', 1, 2, foo='bar', cat='meow')
def test__spawn_worker_none_free(self):
worker_pool = mock.Mock(spec_set=['free', 'spawn'])
worker_pool.free.return_value = False
self.service._worker_pool = worker_pool
self.assertRaises(exception.NoFreeConductorWorker,
self.service._spawn_worker, 'fake')
self.assertFalse(worker_pool.spawn.called)
@mock.patch.object(conductor_utils, 'node_power_action') @mock.patch.object(conductor_utils, 'node_power_action')
class ManagerDoSyncPowerStateTestCase(tests_db_base.DbTestCase): class ManagerDoSyncPowerStateTestCase(tests_db_base.DbTestCase):
def setUp(self): def setUp(self):
@ -4184,7 +4155,7 @@ class NodeInspectHardware(mgr_utils.ServiceSetUpMixin,
inspection_started_at=datetime.datetime(2000, 1, 1, 0, 0)) inspection_started_at=datetime.datetime(2000, 1, 1, 0, 0))
self.service._check_inspect_timeouts(self.context) self.service._check_inspect_timeouts(self.context)
self.service._worker_pool.waitall() self._stop_service()
node.refresh() node.refresh()
self.assertEqual(states.INSPECTFAIL, node.provision_state) self.assertEqual(states.INSPECTFAIL, node.provision_state)
self.assertEqual(states.MANAGEABLE, node.target_provision_state) self.assertEqual(states.MANAGEABLE, node.target_provision_state)
@ -4207,7 +4178,7 @@ class NodeInspectHardware(mgr_utils.ServiceSetUpMixin,
self.context, node.uuid) self.context, node.uuid)
# Compare true exception hidden by @messaging.expected_exceptions # Compare true exception hidden by @messaging.expected_exceptions
self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0]) self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
self.service._worker_pool.waitall() self._stop_service()
node.refresh() node.refresh()
# Make sure things were rolled back # Make sure things were rolled back
self.assertEqual(prv_state, node.provision_state) self.assertEqual(prv_state, node.provision_state)
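The removed ``ManagerSpawnWorkerTestCase`` exercised the old pool protocol, where ``free()`` was checked before ``spawn()``. With Futurist the equivalent guard is submission rejection: a rejected submission maps to ``NoFreeConductorWorker``. A sketch of how the conductor side might be wired (the conductor code itself is not part of this excerpt, so treat the exact rejection policy as an assumption)::

    import futurist
    from futurist import rejection

    # Green thread executor sized by workers_pool_size; new tasks are
    # rejected once the backlog reaches the pool size (assumed policy).
    executor = futurist.GreenThreadPoolExecutor(
        max_workers=CONF.conductor.workers_pool_size,
        check_and_reject=rejection.reject_when_reached(
            CONF.conductor.workers_pool_size))

    def spawn_worker(func, *args, **kwargs):
        try:
            return executor.submit(func, *args, **kwargs)
        except futurist.RejectedSubmission:
            # Mimics the old "no free worker" check.
            raise exception.NoFreeConductorWorker()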

View File

@@ -17,8 +17,6 @@
 """Tests for :class:`ironic.conductor.task_manager`."""

-import eventlet
-from eventlet import greenpool
 import mock
 from oslo_utils import uuidutils

@@ -47,6 +45,7 @@ class TaskManagerTestCase(tests_db_base.DbTestCase):
         self.config(node_locked_retry_attempts=1, group='conductor')
         self.config(node_locked_retry_interval=0, group='conductor')
         self.node = obj_utils.create_test_node(self.context)
+        self.future_mock = mock.Mock(spec=['cancel', 'add_done_callback'])

     def test_excl_lock(self, get_portgroups_mock, get_ports_mock,
                        get_driver_mock, reserve_mock, release_mock,
@@ -389,8 +388,7 @@ class TaskManagerTestCase(tests_db_base.DbTestCase):
     def test_spawn_after(
             self, get_portgroups_mock, get_ports_mock, get_driver_mock,
             reserve_mock, release_mock, node_get_mock):
-        thread_mock = mock.Mock(spec_set=['link', 'cancel'])
-        spawn_mock = mock.Mock(return_value=thread_mock)
+        spawn_mock = mock.Mock(return_value=self.future_mock)
         task_release_mock = mock.Mock()
         reserve_mock.return_value = self.node
@@ -399,9 +397,9 @@ class TaskManagerTestCase(tests_db_base.DbTestCase):
             task.release_resources = task_release_mock

         spawn_mock.assert_called_once_with(1, 2, foo='bar', cat='meow')
-        thread_mock.link.assert_called_once_with(
-            task._thread_release_resources)
-        self.assertFalse(thread_mock.cancel.called)
+        self.future_mock.add_done_callback.assert_called_once_with(
+            task._thread_release_resources)
+        self.assertFalse(self.future_mock.cancel.called)
         # Since we mocked link(), we're testing that __exit__ didn't
         # release resources pending the finishing of the background
         # thread
@@ -444,9 +442,9 @@ class TaskManagerTestCase(tests_db_base.DbTestCase):
     def test_spawn_after_link_fails(
             self, get_portgroups_mock, get_ports_mock, get_driver_mock,
             reserve_mock, release_mock, node_get_mock):
-        thread_mock = mock.Mock(spec_set=['link', 'cancel'])
-        thread_mock.link.side_effect = exception.IronicException('foo')
-        spawn_mock = mock.Mock(return_value=thread_mock)
+        self.future_mock.add_done_callback.side_effect = (
+            exception.IronicException('foo'))
+        spawn_mock = mock.Mock(return_value=self.future_mock)
         task_release_mock = mock.Mock()
         thr_release_mock = mock.Mock(spec_set=[])
         reserve_mock.return_value = self.node
@@ -459,8 +457,9 @@ class TaskManagerTestCase(tests_db_base.DbTestCase):
         self.assertRaises(exception.IronicException, _test_it)

         spawn_mock.assert_called_once_with(1, 2, foo='bar', cat='meow')
-        thread_mock.link.assert_called_once_with(thr_release_mock)
-        thread_mock.cancel.assert_called_once_with()
+        self.future_mock.add_done_callback.assert_called_once_with(
+            thr_release_mock)
+        self.future_mock.cancel.assert_called_once_with()
         task_release_mock.assert_called_once_with()

     def test_spawn_after_on_error_hook(
@@ -659,75 +658,3 @@ class ExclusiveLockDecoratorTestCase(tests_base.TestCase):
                           _req_excl_lock_method,
                           *self.args_task_second,
                           **self.kwargs)
-
-
-class TaskManagerGreenThreadTestCase(tests_base.TestCase):
-    """Class to assert our assumptions about greenthread behavior."""
-    def test_gt_link_callback_added_during_execution(self):
-        pool = greenpool.GreenPool()
-        q1 = eventlet.Queue()
-        q2 = eventlet.Queue()
-
-        def func():
-            q1.put(None)
-            q2.get()
-
-        link_callback = mock.Mock()
-
-        thread = pool.spawn(func)
-        q1.get()
-        thread.link(link_callback)
-        q2.put(None)
-        pool.waitall()
-
-        link_callback.assert_called_once_with(thread)
-
-    def test_gt_link_callback_added_after_execution(self):
-        pool = greenpool.GreenPool()
-        link_callback = mock.Mock()
-
-        thread = pool.spawn(lambda: None)
-        pool.waitall()
-        thread.link(link_callback)
-
-        link_callback.assert_called_once_with(thread)
-
-    def test_gt_link_callback_exception_inside_thread(self):
-        pool = greenpool.GreenPool()
-        q1 = eventlet.Queue()
-        q2 = eventlet.Queue()
-
-        def func():
-            q1.put(None)
-            q2.get()
-            raise Exception()
-
-        link_callback = mock.Mock()
-
-        thread = pool.spawn(func)
-        q1.get()
-        thread.link(link_callback)
-        q2.put(None)
-        pool.waitall()
-
-        link_callback.assert_called_once_with(thread)
-
-    def test_gt_link_callback_added_after_exception_inside_thread(self):
-        pool = greenpool.GreenPool()
-
-        def func():
-            raise Exception()
-
-        link_callback = mock.Mock()
-
-        thread = pool.spawn(func)
-        pool.waitall()
-        thread.link(link_callback)
-
-        link_callback.assert_called_once_with(thread)
-
-    def test_gt_cancel_doesnt_run_thread(self):
-        pool = greenpool.GreenPool()
-        func = mock.Mock()
-
-        thread = pool.spawn(func)
-        thread.link(lambda t: None)
-        thread.cancel()
-        pool.waitall()
-
-        self.assertFalse(func.called)
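``TaskManagerGreenThreadTestCase`` pinned down eventlet's ``link()`` and ``cancel()`` semantics because ironic relied on them directly. Futures give the same guarantees through the standard API, notably that ``add_done_callback()`` fires immediately when the future is already done, so these assumptions no longer need dedicated tests. An illustrative sketch (``SynchronousExecutor`` is used only to keep the example deterministic)::

    import futurist

    executor = futurist.SynchronousExecutor()
    future = executor.submit(lambda: None)

    calls = []
    # The future is already finished, so the callback runs right away,
    # mirroring the old "link() after execution" behaviour above.
    future.add_done_callback(calls.append)
    assert calls == [future]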

View File

@@ -15,7 +15,7 @@

 import json

-import eventlet
+from futurist import periodics
 import mock

 from ironic.common import exception
@@ -85,36 +85,21 @@ class PassthruDecoratorTestCase(base.TestCase):
             inst2.driver_routes['driver_noexception']['func'])


-@mock.patch.object(eventlet.greenthread, 'spawn_n', autospec=True,
-                   side_effect=lambda func, *args, **kw: func(*args, **kw))
 class DriverPeriodicTaskTestCase(base.TestCase):
-    def test(self, spawn_mock):
+    def test(self):
         method_mock = mock.MagicMock(spec_set=[])
-        function_mock = mock.MagicMock(spec_set=[])

         class TestClass(object):
             @driver_base.driver_periodic_task(spacing=42)
             def method(self, foo, bar=None):
                 method_mock(foo, bar=bar)

-        @driver_base.driver_periodic_task(spacing=100, parallel=False)
-        def function():
-            function_mock()
-
         obj = TestClass()
         self.assertEqual(42, obj.method._periodic_spacing)
-        self.assertTrue(obj.method._periodic_task)
-        self.assertEqual('ironic.tests.unit.drivers.test_base.method',
-                         obj.method._periodic_name)
-        self.assertEqual('ironic.tests.unit.drivers.test_base.function',
-                         function._periodic_name)
+        self.assertTrue(periodics.is_periodic(obj.method))

         obj.method(1, bar=2)
         method_mock.assert_called_once_with(1, bar=2)
-        self.assertEqual(1, spawn_mock.call_count)
-
-        function()
-        function_mock.assert_called_once_with()
-        self.assertEqual(1, spawn_mock.call_count)


 class CleanStepDecoratorTestCase(base.TestCase):
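``periodics.is_periodic`` replaces the private ``_periodic_task`` attribute for detecting decorated methods. For context, a hypothetical sketch of how such methods could be collected and driven on a shared executor (``driver_obj``, ``manager``, ``context`` and ``executor`` are placeholders, not code from this change)::

    import inspect

    from futurist import periodics

    # Collect every periodic method into (callback, args, kwargs) tuples.
    callables = [(method, (manager, context), {})
                 for _, method in inspect.getmembers(driver_obj,
                                                     periodics.is_periodic)]

    # Reuse an existing executor and run the worker's main loop on it.
    worker = periodics.PeriodicWorker(
        callables,
        executor_factory=periodics.ExistingExecutor(executor))
    executor.submit(worker.start)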

View File

@@ -0,0 +1,16 @@
+---
+prelude: >
+    This release features a switch to the Futurist library for asynchronous
+    thread execution and periodic tasks. The main benefit is that periodic
+    tasks are now executed truly in parallel, not sequentially in one
+    green thread.
+upgrade:
+  - Configuration option "workers_pool_size" can no longer be less than or
+    equal to 2. Set it to a greater value (the default is 100) before updating.
+deprecations:
+  - Configuration option "periodic_interval" is deprecated.
+  - Using the "driver_periodic_task" decorator is deprecated. Please update
+    your out-of-tree drivers to use the "periodics.periodic" decorator from
+    the Futurist library.
+fixes:
+  - Periodic tasks are no longer all executed in one thread.
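The upgrade note above raises the floor for ``workers_pool_size`` to 3. A plausible shape for the tightened option definition, assuming the limit is enforced through oslo.config (the actual definition is not part of this excerpt)::

    from oslo_config import cfg

    opts = [
        # min=3 enforces the new floor on the worker pool size
        # described in the release note above.
        cfg.IntOpt('workers_pool_size', default=100, min=3,
                   help='The size of the workers greenthread pool.'),
    ]
    cfg.CONF.register_opts(opts, group='conductor')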

View File

@@ -43,3 +43,4 @@ retrying!=1.3.0,>=1.2.3 # Apache-2.0
 oslo.versionedobjects>=1.5.0 # Apache-2.0
 jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT
 psutil<2.0.0,>=1.1.1 # BSD
+futurist>=0.11.0 # Apache-2.0