Make scheduler delay configurable

* Made scheduler delay configurable. It now consists of a fixed
  part configured with the 'fixed_delay' property and a random
  addition limited by the 'random_delay' config property.
  Because of this, using loopingcall from oslo was replaced with
  a regular loop in a separate thread becase loopingcall
  supports only fixed delays.

Closes-Bug: #1721733
Change-Id: I8f6a15be339e208755323afb18e4b58f886770c1
This commit is contained in:
Renat Akhmerov 2017-10-06 13:16:11 +07:00
parent 14c8d807b1
commit f6b6f1d20b
2 changed files with 83 additions and 29 deletions

View File

@ -194,6 +194,31 @@ executor_opts = [
)
]
scheduler_opts = [
cfg.FloatOpt(
'fixed_delay',
default=1,
min=0.1,
help=(
'Fixed part of the delay between scheduler iterations, '
'in seconds. '
'Full delay is defined as a sum of "fixed_delay" and a random '
'delay limited by "random_delay".'
)
),
cfg.FloatOpt(
'random_delay',
default=0,
min=0,
help=(
'Max value of the random part of the delay between scheduler '
'iterations, in seconds. '
'Full delay is defined as a sum of "fixed_delay" and a random '
'delay limited by this property.'
)
),
]
event_engine_opts = [
cfg.HostAddressOpt(
'host',
@ -336,6 +361,7 @@ CONF = cfg.CONF
API_GROUP = 'api'
ENGINE_GROUP = 'engine'
EXECUTOR_GROUP = 'executor'
SCHEDULER_GROUP = 'scheduler'
EVENT_ENGINE_GROUP = 'event_engine'
PECAN_GROUP = 'pecan'
COORDINATION_GROUP = 'coordination'
@ -354,6 +380,7 @@ CONF.register_opt(expiration_token_duration)
CONF.register_opts(api_opts, group=API_GROUP)
CONF.register_opts(engine_opts, group=ENGINE_GROUP)
CONF.register_opts(executor_opts, group=EXECUTOR_GROUP)
CONF.register_opts(scheduler_opts, group=SCHEDULER_GROUP)
CONF.register_opts(
execution_expiration_policy_opts,
group=EXECUTION_EXPIRATION_POLICY_GROUP

View File

@ -16,11 +16,12 @@
import copy
import datetime
import eventlet
import random
import threading
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import periodic_task
from oslo_service import threadgroup
from oslo_utils import importutils
from mistral import context
@ -33,8 +34,8 @@ LOG = logging.getLogger(__name__)
CONF = cfg.CONF
# {scheduler_instance: thread_group}
_schedulers = {}
# All schedulers.
_schedulers = set()
def schedule_call(factory_method_path, target_method_name,
@ -99,10 +100,41 @@ def schedule_call(factory_method_path, target_method_name,
db_api.create_delayed_call(values)
class CallScheduler(periodic_task.PeriodicTasks):
# TODO(rakhmerov): Think how to make 'spacing' configurable.
@periodic_task.periodic_task(spacing=1, run_immediately=True)
def run_delayed_calls(self, ctx=None):
class Scheduler(object):
def __init__(self):
self._stopped = False
self._thread = threading.Thread(target=self._loop)
self._thread.daemon = True
self._fixed_delay = CONF.scheduler.fixed_delay
self._random_delay = CONF.scheduler.random_delay
def start(self):
self._thread.start()
def stop(self, graceful=False):
self._stopped = True
if graceful:
self._thread.join()
def _loop(self):
while not self._stopped:
LOG.debug("Starting Scheduler loop [scheduler=%s]...", self)
try:
self._process_delayed_calls()
except Exception:
LOG.exception(
"Scheduler failed to process delayed calls"
" due to unexpected exception."
)
eventlet.sleep(
self._fixed_delay +
random.Random().randint(0, self._random_delay * 1000) * 0.001
)
def _process_delayed_calls(self, ctx=None):
"""Run delayed required calls.
This algorithm should work with transactions having at least
@ -155,6 +187,8 @@ class CallScheduler(periodic_task.PeriodicTasks):
if updated_cnt == 1:
result.append(db_call)
LOG.debug("Scheduler captured %s delayed calls.", len(result))
return result
@staticmethod
@ -174,7 +208,7 @@ class CallScheduler(periodic_task.PeriodicTasks):
for call in raw_calls:
LOG.debug(
'Processing next delayed call. '
'Preparing next delayed call. '
'[ID=%s, factory_method_path=%s, target_method_name=%s, '
'method_arguments=%s]', call.id, call.factory_method_path,
call.target_method_name, call.method_arguments
@ -253,37 +287,30 @@ class CallScheduler(periodic_task.PeriodicTasks):
"exception=%s]", call, e
)
LOG.debug("Scheduler deleted %s delayed calls.", len(db_calls))
def start():
tg = threadgroup.ThreadGroup()
sched = Scheduler()
sched = CallScheduler(CONF)
_schedulers.add(sched)
tg.add_dynamic_timer(
sched.run_periodic_tasks,
initial_delay=None,
periodic_interval_max=1,
context=None
)
_schedulers[sched] = tg
sched.start()
return sched
def stop_scheduler(sched, graceful=False):
if sched:
tg = _schedulers[sched]
if not sched:
return
tg.stop()
sched.stop(graceful)
del _schedulers[sched]
if graceful:
tg.wait()
_schedulers.remove(sched)
def stop_all_schedulers():
for scheduler, tg in _schedulers.items():
tg.stop()
del _schedulers[scheduler]
for sched in _schedulers:
sched.stop(graceful=True)
_schedulers.clear()