separate launching audit scheduler

Now there are only one scheduler for launching audit task and
executing audit jobs. We have found an exception where the scheduler
stops for some reason when executing audit.
In order to keep launching audit task normal, we need to split into
two schedulers.

Change-Id: I45dccaf062290cfc7d7fcfc27fe11d6f87f38afa
This commit is contained in:
licanwei
2019-04-19 14:30:09 +08:00
parent f52716fcf9
commit 0def7b4d38
2 changed files with 22 additions and 10 deletions

View File

@@ -21,7 +21,6 @@
import datetime import datetime
from dateutil import tz from dateutil import tz
from apscheduler.jobstores import memory
from croniter import croniter from croniter import croniter
from watcher.common import context from watcher.common import context
@@ -40,21 +39,29 @@ CONF = conf.CONF
class ContinuousAuditHandler(base.AuditHandler): class ContinuousAuditHandler(base.AuditHandler):
def __init__(self): def __init__(self):
super(ContinuousAuditHandler, self).__init__() super(ContinuousAuditHandler, self).__init__()
self._scheduler = None # scheduler for executing audits
self._audit_scheduler = None
# scheduler for a periodic task to launch audit
self._period_scheduler = None
self.context_show_deleted = context.RequestContext(is_admin=True, self.context_show_deleted = context.RequestContext(is_admin=True,
show_deleted=True) show_deleted=True)
@property @property
def scheduler(self): def scheduler(self):
if self._scheduler is None: if self._audit_scheduler is None:
self._scheduler = scheduling.BackgroundSchedulerService( self._audit_scheduler = scheduling.BackgroundSchedulerService(
jobstores={ jobstores={
'default': job_store.WatcherJobStore( 'default': job_store.WatcherJobStore(
engine=sq_api.get_engine()), engine=sq_api.get_engine()),
'memory': memory.MemoryJobStore()
} }
) )
return self._scheduler return self._audit_scheduler
@property
def period_scheduler(self):
if self._period_scheduler is None:
self._period_scheduler = scheduling.BackgroundSchedulerService()
return self._period_scheduler
def _is_audit_inactive(self, audit): def _is_audit_inactive(self, audit):
audit = objects.Audit.get_by_uuid( audit = objects.Audit.get_by_uuid(
@@ -135,6 +142,10 @@ class ContinuousAuditHandler(base.AuditHandler):
return False return False
def launch_audits_periodically(self): def launch_audits_periodically(self):
# if audit scheduler stop, restart it
if not self.scheduler.running:
self.scheduler.start()
audit_context = context.RequestContext(is_admin=True) audit_context = context.RequestContext(is_admin=True)
audit_filters = { audit_filters = {
'audit_type': objects.audit.AuditType.CONTINUOUS.value, 'audit_type': objects.audit.AuditType.CONTINUOUS.value,
@@ -207,10 +218,11 @@ class ContinuousAuditHandler(base.AuditHandler):
audit.save() audit.save()
def start(self): def start(self):
self.scheduler.add_job( self.period_scheduler.add_job(
self.launch_audits_periodically, self.launch_audits_periodically,
'interval', 'interval',
seconds=CONF.watcher_decision_engine.continuous_audit_interval, seconds=CONF.watcher_decision_engine.continuous_audit_interval,
next_run_time=datetime.datetime.now(), next_run_time=datetime.datetime.now())
jobstore='memory') self.period_scheduler.start()
# audit scheduler start
self.scheduler.start() self.scheduler.start()

View File

@@ -462,7 +462,7 @@ class TestContinuousAuditHandler(base.DbTestCase):
def test_is_audit_inactive(self, mock_jobs): def test_is_audit_inactive(self, mock_jobs):
audit_handler = continuous.ContinuousAuditHandler() audit_handler = continuous.ContinuousAuditHandler()
mock_jobs.return_value = mock.MagicMock() mock_jobs.return_value = mock.MagicMock()
audit_handler._scheduler = mock.MagicMock() audit_handler._audit_scheduler = mock.MagicMock()
ap_jobs = [job.Job(mock.MagicMock(), name='execute_audit', ap_jobs = [job.Job(mock.MagicMock(), name='execute_audit',
func=audit_handler.execute_audit, func=audit_handler.execute_audit,