From 81ce132e900a1b00f9acdfbfab4c9aa147095ec9 Mon Sep 17 00:00:00 2001
From: Tim Simmons
Date: Fri, 12 Feb 2016 23:24:05 +0000
Subject: [PATCH] Worker Model

- More information about the actual worker code can be found in
  `designate/worker/README.md` and in the inline docstrings
- Stand up a `designate-worker` process with an rpcapi, all the usual jazz
- Implement a base `Task` class that defines the behavior of a task and
  exposes resources to the task.
- Implement CUD Zone tasks, which include Tasks that poll for zones, send
  NOTIFYs, and update status. These are all done in parallel with threads
  using a shared threadpool, rather than iteratively.
- Implement a `recover_shard` task that serves the function of a periodic
  recovery, but only for a shard. Call that task with various shards from
  the zone manager.
- Put some shims in central and mdns so that the worker can be switched
  on/off with a few config values.
- Changes Zone Manager -> Producer
  - Removes zm rpcapi
  - Adds startable designate-producer service
  - Makes zone-manager an alias for the producer service with a warning log
  - Lots of renaming
- Moves zone export to worker
  - API now uses central_api.export_zone to get zonefiles
  - Central uses worker_api.start_zone_export to initiate exports
  - Now including unit tests
  - Temporary workarounds for upgrade/migration move the logic into central
    if the worker isn't available.
- Deprecates Pool Manager polling options and adds a warning message on
  starting designate-pool-manager
- Get some devstack going
- Changes the powerdns backend to get a new sqlalchemy session for each
  action
- Sets the default number of threads in a worker process to 200; this is
  pretty much a shot in the dark, but 1000 seemed like too many, and 20
  wasn't enough.
- Grenade upgrade testing
- Deprecation warnings for zone/pool mgr

The way to run this is simple: stop `designate-pool-manager` and
`designate-zone-manager`, toggle the config settings in the
`service:worker` section (enabled = true, notify = true), then start
`designate-worker` and `designate-producer`, and you should be good to go.
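For illustration only (not part of the patch), a minimal designate.conf
sketch of the toggle described above. The section and option names follow
the `service:worker` and `service:producer` groups this patch registers,
and the values are the ones suggested here, so treat it as an example
rather than a canonical sample:

    [service:worker]
    # Turn on the new worker model; with notify enabled, the worker sends
    # NOTIFYs itself and mdns skips notify_zone_changed
    enabled = True
    notify = True
    # Per-process threadpool size; this patch defaults it to 200
    #threads = 200

    [service:producer]
    # Producer greenthreads; defaults to 1000 in
    # designate/producer/__init__.py
    #threads = 1000

With these set, `designate-pool-manager` and `designate-zone-manager`
refuse to start and point operators at `designate-worker` and
`designate-producer` instead.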
Change-Id: I259e9825d3a4eea58e082303ba3bdbdb7bf8c363 --- designate/__init__.py | 3 +- designate/api/v2/controllers/rest.py | 5 - .../api/v2/controllers/zones/tasks/exports.py | 4 +- designate/backend/impl_powerdns/__init__.py | 61 +- designate/central/service.py | 70 +- designate/cmd/pool_manager.py | 16 + designate/cmd/producer.py | 53 ++ designate/cmd/worker.py | 53 ++ designate/cmd/zone_manager.py | 24 +- designate/mdns/rpcapi.py | 5 + designate/objects/pool_target.py | 1 + designate/pool_manager/__init__.py | 20 +- designate/pool_manager/service.py | 6 +- designate/producer/__init__.py | 69 ++ designate/producer/service.py | 97 +++ designate/{zone_manager => producer}/tasks.py | 64 +- designate/tests/test_backend/test_powerdns.py | 52 +- .../__init__.py | 0 .../test_service.py | 4 +- .../test_tasks.py | 38 +- .../__init__.py | 0 .../tests/test_workers/test_base_task.py | 30 + .../tests/test_workers/test_processing.py | 46 ++ designate/tests/test_workers/test_service.py | 99 +++ .../tests/test_workers/test_zone_tasks.py | 574 +++++++++++++++++ .../tests/unit/test_central/test_basic.py | 8 +- .../tests/unit/test_producer/__init__.py | 0 .../tests/unit/test_producer/test_service.py | 60 ++ .../test_tasks.py | 10 +- .../unit/test_zone_manager/test_service.py | 101 --- designate/utils.py | 9 +- designate/worker/README.md | 206 ++++++ designate/worker/__init__.py | 60 ++ designate/worker/processing.py | 79 +++ designate/{zone_manager => worker}/rpcapi.py | 51 +- designate/worker/service.py | 172 +++++ designate/worker/tasks/__init__.py | 0 designate/worker/tasks/base.py | 127 ++++ designate/worker/tasks/zone.py | 609 ++++++++++++++++++ designate/worker/utils.py | 82 +++ designate/zone_manager/__init__.py | 37 -- designate/zone_manager/service.py | 154 ----- devstack/plugin.sh | 13 + devstack/settings | 2 + etc/designate/designate.conf.sample | 49 +- setup.cfg | 13 +- tests-py3.txt | 2 +- 47 files changed, 2774 insertions(+), 464 deletions(-) create mode 100644 designate/cmd/producer.py create mode 100644 designate/cmd/worker.py create mode 100644 designate/producer/__init__.py create mode 100644 designate/producer/service.py rename designate/{zone_manager => producer}/tasks.py (83%) rename designate/tests/{test_zone_manager => test_producer}/__init__.py (100%) rename designate/tests/{test_zone_manager => test_producer}/test_service.py (90%) rename designate/tests/{test_zone_manager => test_producer}/test_tasks.py (81%) rename designate/tests/{unit/test_zone_manager => test_workers}/__init__.py (100%) create mode 100644 designate/tests/test_workers/test_base_task.py create mode 100644 designate/tests/test_workers/test_processing.py create mode 100644 designate/tests/test_workers/test_service.py create mode 100644 designate/tests/test_workers/test_zone_tasks.py create mode 100644 designate/tests/unit/test_producer/__init__.py create mode 100644 designate/tests/unit/test_producer/test_service.py rename designate/tests/unit/{test_zone_manager => test_producer}/test_tasks.py (96%) delete mode 100644 designate/tests/unit/test_zone_manager/test_service.py create mode 100644 designate/worker/README.md create mode 100644 designate/worker/__init__.py create mode 100644 designate/worker/processing.py rename designate/{zone_manager => worker}/rpcapi.py (61%) create mode 100644 designate/worker/service.py create mode 100644 designate/worker/tasks/__init__.py create mode 100644 designate/worker/tasks/base.py create mode 100644 designate/worker/tasks/zone.py create mode 100644 designate/worker/utils.py 
delete mode 100644 designate/zone_manager/__init__.py delete mode 100644 designate/zone_manager/service.py diff --git a/designate/__init__.py b/designate/__init__.py index fe01ccb7f..4f9434e01 100644 --- a/designate/__init__.py +++ b/designate/__init__.py @@ -47,8 +47,7 @@ cfg.CONF.register_opts([ cfg.StrOpt('mdns-topic', default='mdns', help='mDNS Topic'), cfg.StrOpt('pool-manager-topic', default='pool_manager', help='Pool Manager Topic'), - cfg.StrOpt('zone-manager-topic', default='zone_manager', - help='Zone Manager Topic'), + cfg.StrOpt('worker-topic', default='worker', help='Worker Topic'), # Default TTL cfg.IntOpt('default-ttl', default=3600), diff --git a/designate/api/v2/controllers/rest.py b/designate/api/v2/controllers/rest.py index 29e72200d..e3879f979 100644 --- a/designate/api/v2/controllers/rest.py +++ b/designate/api/v2/controllers/rest.py @@ -34,7 +34,6 @@ from oslo_log import log as logging from designate import exceptions from designate.central import rpcapi as central_rpcapi from designate.pool_manager import rpcapi as pool_mgr_rpcapi -from designate.zone_manager import rpcapi as zone_manager_rpcapi from designate.i18n import _ @@ -60,10 +59,6 @@ class RestController(pecan.rest.RestController): def pool_mgr_api(self): return pool_mgr_rpcapi.PoolManagerAPI.get_instance() - @property - def zone_manager_api(self): - return zone_manager_rpcapi.ZoneManagerAPI.get_instance() - def _apply_filter_params(self, params, accepted_filters, criterion): invalid=[] for k in params: diff --git a/designate/api/v2/controllers/zones/tasks/exports.py b/designate/api/v2/controllers/zones/tasks/exports.py index 697cc20ae..b48fa25fb 100644 --- a/designate/api/v2/controllers/zones/tasks/exports.py +++ b/designate/api/v2/controllers/zones/tasks/exports.py @@ -39,8 +39,8 @@ class ZoneExportController(rest.RestController): export = self.central_api.get_zone_export(context, export_id) if export.location and export.location.startswith('designate://'): - return self.zone_manager_api.\ - render_zone(context, export['zone_id']) + return self.central_api.\ + export_zone(context, export['zone_id']) else: msg = 'Zone can not be exported synchronously' raise exceptions.BadRequest(msg) diff --git a/designate/backend/impl_powerdns/__init__.py b/designate/backend/impl_powerdns/__init__.py index 6266b6b63..41ca3f650 100644 --- a/designate/backend/impl_powerdns/__init__.py +++ b/designate/backend/impl_powerdns/__init__.py @@ -66,38 +66,29 @@ class PowerDNSBackend(base.Backend): self.connection = self.options.get('connection', default_connection) - @property - def session(self): - # NOTE: This uses a thread local store, allowing each greenthread to - # have it's own session stored correctly. Without this, each - # greenthread may end up using a single global session, which - # leads to bad things happening. 
- if not hasattr(self.local_store, 'session'): - self.local_store.session = session.get_session( - self.name, self.connection, self.target.id) + def get_session(self): + return session.get_session(self.name, self.connection, self.target.id) - return self.local_store.session - - def _create(self, table, values): + def _create(self, sess, table, values): query = table.insert() - resultproxy = self.session.execute(query, values) + resultproxy = sess.execute(query, values) # Refetch the row, for generated columns etc query = select([table])\ .where(table.c.id == resultproxy.inserted_primary_key[0]) - resultproxy = self.session.execute(query) + resultproxy = sess.execute(query) return _map_col(query.columns.keys(), resultproxy.fetchone()) - def _get(self, table, id_, exc_notfound, id_col=None): + def _get(self, sess, table, id_, exc_notfound, id_col=None): if id_col is None: id_col = table.c.id query = select([table])\ .where(id_col == id_) - resultproxy = self.session.execute(query) + resultproxy = sess.execute(query) results = resultproxy.fetchall() @@ -107,22 +98,25 @@ class PowerDNSBackend(base.Backend): # Map col keys to values in result return _map_col(query.columns.keys(), results[0]) - def _delete(self, table, id_, exc_notfound, id_col=None): + def _delete(self, sess, table, id_, exc_notfound, id_col=None): if id_col is None: id_col = table.c.id query = table.delete()\ .where(id_col == id_) - resultproxy = self.session.execute(query) + resultproxy = sess.execute(query) if resultproxy.rowcount != 1: raise exc_notfound() # Zone Methods def create_zone(self, context, zone): + # Get a new session + sess = self.get_session() + try: - self.session.begin() + sess.begin() def _parse_master(master): return '%s:%d' % (master.host, master.port) @@ -136,7 +130,7 @@ class PowerDNSBackend(base.Backend): 'account': context.tenant } - self._create(tables.domains, domain_values) + self._create(sess, tables.domains, domain_values) except DBDuplicateEntry: LOG.debug('Successful create of %s in pdns, zone already exists' % zone['name']) @@ -144,20 +138,28 @@ class PowerDNSBackend(base.Backend): pass except Exception: with excutils.save_and_reraise_exception(): - self.session.rollback() + sess.rollback() else: - self.session.commit() + sess.commit() self.mdns_api.notify_zone_changed( context, zone, self.host, self.port, self.timeout, self.retry_interval, self.max_retries, self.delay) def delete_zone(self, context, zone): - # TODO(kiall): We should make this match create_zone with regard to - # transactions. + # Get a new session + sess = self.get_session() + try: - self._get(tables.domains, zone['id'], exceptions.ZoneNotFound, + sess.begin() + + self._get(sess, tables.domains, zone['id'], + exceptions.ZoneNotFound, id_col=tables.domains.c.designate_id) + + self._delete(sess, tables.domains, zone['id'], + exceptions.ZoneNotFound, + id_col=tables.domains.c.designate_id) except exceptions.ZoneNotFound: # If the Zone is already gone, that's ok. We're deleting it # anyway, so just log and continue. @@ -165,7 +167,8 @@ class PowerDNSBackend(base.Backend): 'not present in the backend. 
ID: %s') % zone['id']) return - - self._delete(tables.domains, zone['id'], - exceptions.ZoneNotFound, - id_col=tables.domains.c.designate_id) + except Exception: + with excutils.save_and_reraise_exception(): + sess.rollback() + else: + sess.commit() diff --git a/designate/central/service.py b/designate/central/service.py index 3186f8865..013bb59eb 100644 --- a/designate/central/service.py +++ b/designate/central/service.py @@ -52,7 +52,7 @@ from designate import storage from designate.mdns import rpcapi as mdns_rpcapi from designate.pool_manager import rpcapi as pool_manager_rpcapi from designate.storage import transaction -from designate.zone_manager import rpcapi as zone_manager_rpcapi +from designate.worker import rpcapi as worker_rpcapi LOG = logging.getLogger(__name__) @@ -253,8 +253,15 @@ class Service(service.RPCService, service.Service): return pool_manager_rpcapi.PoolManagerAPI.get_instance() @property - def zone_manager_api(self): - return zone_manager_rpcapi.ZoneManagerAPI.get_instance() + def worker_api(self): + return worker_rpcapi.WorkerAPI.get_instance() + + @property + def zone_api(self): + # TODO(timsim): Remove this when pool_manager_api is gone + if cfg.CONF['service:worker'].enabled: + return self.worker_api + return self.pool_manager_api def _is_valid_zone_name(self, context, zone_name): # Validate zone name length @@ -898,7 +905,7 @@ class Service(service.RPCService, service.Service): zone = self._create_zone_in_storage(context, zone) - self.pool_manager_api.create_zone(context, zone) + self.zone_api.create_zone(context, zone) if zone.type == 'SECONDARY': self.mdns_api.perform_zone_xfr(context, zone) @@ -1038,7 +1045,7 @@ class Service(service.RPCService, service.Service): if 'masters' in changes: self.mdns_api.perform_zone_xfr(context, zone) - self.pool_manager_api.update_zone(context, zone) + self.zone_api.update_zone(context, zone) return zone @@ -1093,7 +1100,7 @@ class Service(service.RPCService, service.Service): zone = self.storage.delete_zone(context, zone.id) else: zone = self._delete_zone_in_storage(context, zone) - self.pool_manager_api.delete_zone(context, zone) + self.zone_api.delete_zone(context, zone) return zone @@ -1208,7 +1215,7 @@ class Service(service.RPCService, service.Service): self._touch_zone_in_storage(context, zone) - self.pool_manager_api.update_zone(context, zone) + self.zone_api.update_zone(context, zone) return zone @@ -1243,7 +1250,7 @@ class Service(service.RPCService, service.Service): recordset, zone = self._create_recordset_in_storage( context, zone, recordset, increment_serial=increment_serial) - self.pool_manager_api.update_zone(context, zone) + self.zone_api.update_zone(context, zone) recordset.zone_name = zone.name recordset.obj_reset_changes(['zone_name']) @@ -1405,7 +1412,7 @@ class Service(service.RPCService, service.Service): recordset, zone = self._update_recordset_in_storage( context, zone, recordset, increment_serial=increment_serial) - self.pool_manager_api.update_zone(context, zone) + self.zone_api.update_zone(context, zone) return recordset @@ -1468,7 +1475,7 @@ class Service(service.RPCService, service.Service): recordset, zone = self._delete_recordset_in_storage( context, zone, recordset, increment_serial=increment_serial) - self.pool_manager_api.update_zone(context, zone) + self.zone_api.update_zone(context, zone) recordset.zone_name = zone.name recordset.obj_reset_changes(['zone_name']) @@ -1536,7 +1543,7 @@ class Service(service.RPCService, service.Service): context, zone, recordset, record, 
increment_serial=increment_serial) - self.pool_manager_api.update_zone(context, zone) + self.zone_api.update_zone(context, zone) return record @@ -1647,7 +1654,7 @@ class Service(service.RPCService, service.Service): record, zone = self._update_record_in_storage( context, zone, record, increment_serial=increment_serial) - self.pool_manager_api.update_zone(context, zone) + self.zone_api.update_zone(context, zone) return record @@ -1708,7 +1715,7 @@ class Service(service.RPCService, service.Service): record, zone = self._delete_record_in_storage( context, zone, record, increment_serial=increment_serial) - self.pool_manager_api.update_zone(context, zone) + self.zone_api.update_zone(context, zone) return record @@ -1786,7 +1793,7 @@ class Service(service.RPCService, service.Service): policy.check('diagnostics_sync_record', context, target) - self.pool_manager_api.update_zone(context, zone) + self.zone_api.update_zone(context, zone) def ping(self, context): policy.check('diagnostics_ping', context) @@ -2769,9 +2776,40 @@ class Service(service.RPCService, service.Service): created_zone_export = self.storage.create_zone_export(context, zone_export) + if not cfg.CONF['service:worker'].enabled: + # So that we can maintain asynch behavior during the time that this + # lives in central, we'll return the 'PENDING' object, and then the + # 'COMPLETE'/'ERROR' status will be available on the first poll. + export = copy.deepcopy(created_zone_export) - self.zone_manager_api.start_zone_export(context, zone, - created_zone_export) + synchronous = cfg.CONF['service:zone_manager'].export_synchronous + criterion = {'zone_id': zone_id} + count = self.storage.count_recordsets(context, criterion) + + if synchronous: + try: + self.quota.limit_check( + context, context.tenant, api_export_size=count) + except exceptions.OverQuota: + LOG.debug('Zone Export too large to perform synchronously') + export.status = 'ERROR' + export.message = 'Zone is too large to export' + return export + + export.location = \ + "designate://v2/zones/tasks/exports/%(eid)s/export" % \ + {'eid': export.id} + + export.status = 'COMPLETE' + else: + LOG.debug('No method found to export zone') + export.status = 'ERROR' + export.message = 'No suitable method for export' + + self.update_zone_export(context, export) + else: + export = copy.deepcopy(created_zone_export) + self.worker_api.start_zone_export(context, zone, export) return created_zone_export diff --git a/designate/cmd/pool_manager.py b/designate/cmd/pool_manager.py index 7601abdf9..26e0e4e73 100644 --- a/designate/cmd/pool_manager.py +++ b/designate/cmd/pool_manager.py @@ -19,6 +19,8 @@ from oslo_config import cfg from oslo_log import log as logging from oslo_reports import guru_meditation_report as gmr +from designate.i18n import _LE +from designate.i18n import _LW from designate import service from designate import utils from designate import version @@ -26,6 +28,7 @@ from designate import hookpoints from designate.pool_manager import service as pool_manager_service +LOG = logging.getLogger(__name__) CONF = cfg.CONF CONF.import_opt('workers', 'designate.pool_manager', group='service:pool_manager') @@ -39,6 +42,19 @@ def main(): logging.setup(CONF, 'designate') gmr.TextGuruMeditation.setup_autorun(version) + # NOTE(timsim): This is to ensure people don't start the wrong + # services when the worker model is enabled. 
+ if cfg.CONF['service:worker'].enabled: + LOG.error(_LE('You have designate-worker enabled, starting ' + 'designate-pool-manager is incompatible with ' + 'designate-worker. You need to start ' + 'designate-worker instead.')) + sys.exit(1) + + LOG.warning(_LW('designate-pool-manager is DEPRECATED in favor of ' + 'designate-worker and will be removed during the Ocata ' + 'cycle')) + server = pool_manager_service.Service( threads=CONF['service:pool_manager'].threads ) diff --git a/designate/cmd/producer.py b/designate/cmd/producer.py new file mode 100644 index 000000000..0c535c3b4 --- /dev/null +++ b/designate/cmd/producer.py @@ -0,0 +1,53 @@ +# Copyright 2016 Rackspace Inc. +# +# Author: Tim Simmons +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import sys + +from oslo_config import cfg +from oslo_log import log as logging +from oslo_reports import guru_meditation_report as gmr + +from designate.i18n import _LE +from designate import hookpoints +from designate import service +from designate import utils +from designate import version +from designate.producer import service as producer_service + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF +CONF.import_opt('workers', 'designate.producer', group='service:producer') +CONF.import_opt('threads', 'designate.producer', group='service:producer') + + +def main(): + utils.read_config('designate', sys.argv) + logging.setup(CONF, 'designate') + gmr.TextGuruMeditation.setup_autorun(version) + + # NOTE(timsim): This is to ensure people don't start the wrong + # services when the worker model is enabled. + if not cfg.CONF['service:worker'].enabled: + LOG.error(_LE('You do not have designate-worker enabled, starting ' + 'designate-producer is not allowed. ' + 'You need to start designate-zone-manager instead.')) + sys.exit(1) + + hookpoints.log_hook_setup() + + server = producer_service.Service(threads=CONF['service:producer'].threads) + service.serve(server, workers=CONF['service:producer'].workers) + service.wait() diff --git a/designate/cmd/worker.py b/designate/cmd/worker.py new file mode 100644 index 000000000..186eb822a --- /dev/null +++ b/designate/cmd/worker.py @@ -0,0 +1,53 @@ +# Copyright 2016 Rackspace Inc. +# +# Author: Tim Simmons +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+import sys + +from oslo_config import cfg +from oslo_log import log as logging +from oslo_reports import guru_meditation_report as gmr + +from designate.i18n import _LE +from designate import hookpoints +from designate import service +from designate import utils +from designate import version +from designate.worker import service as worker_service + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF +CONF.import_opt('workers', 'designate.worker', group='service:worker') +CONF.import_opt('threads', 'designate.worker', group='service:worker') + + +def main(): + utils.read_config('designate', sys.argv) + logging.setup(CONF, 'designate') + gmr.TextGuruMeditation.setup_autorun(version) + + # NOTE(timsim): This is to ensure people don't start the wrong + # services when the worker model is enabled. + if not cfg.CONF['service:worker'].enabled: + LOG.error(_LE('You do not have designate-worker enabled, starting ' + 'designate-worker is not allowed. ' + 'You need to start designate-pool-manager instead.')) + sys.exit(1) + + hookpoints.log_hook_setup() + + server = worker_service.Service(threads=CONF['service:worker'].threads) + service.serve(server, workers=CONF['service:worker'].workers) + service.wait() diff --git a/designate/cmd/zone_manager.py b/designate/cmd/zone_manager.py index fa10073a8..fc02f7e1b 100644 --- a/designate/cmd/zone_manager.py +++ b/designate/cmd/zone_manager.py @@ -19,16 +19,19 @@ from oslo_config import cfg from oslo_log import log as logging from oslo_reports import guru_meditation_report as gmr +from designate.i18n import _LE +from designate.i18n import _LW from designate import service from designate import utils from designate import version -from designate.zone_manager import service as zone_manager_service +from designate.producer import service as producer_service +LOG = logging.getLogger(__name__) CONF = cfg.CONF -CONF.import_opt('workers', 'designate.zone_manager', +CONF.import_opt('workers', 'designate.producer', group='service:zone_manager') -CONF.import_opt('threads', 'designate.zone_manager', +CONF.import_opt('threads', 'designate.producer', group='service:zone_manager') @@ -37,7 +40,20 @@ def main(): logging.setup(CONF, 'designate') gmr.TextGuruMeditation.setup_autorun(version) - server = zone_manager_service.Service( + # NOTE(timsim): This is to ensure people don't start the wrong + # services when the worker model is enabled. + if cfg.CONF['service:worker'].enabled: + LOG.error(_LE('You have designate-worker enabled, starting ' + 'designate-zone-manager is incompatible with ' + 'designate-worker. 
You need to start ' + 'designate-producer instead.')) + sys.exit(1) + + LOG.warning(_LW('designate-zone-manager is DEPRECATED in favor of ' + 'designate-producer, starting designate-producer ' + 'under the zone-manager name')) + + server = producer_service.Service( threads=CONF['service:zone_manager'].threads) service.serve(server, workers=CONF['service:zone_manager'].workers) service.wait() diff --git a/designate/mdns/rpcapi.py b/designate/mdns/rpcapi.py index ded385efa..232d155aa 100644 --- a/designate/mdns/rpcapi.py +++ b/designate/mdns/rpcapi.py @@ -20,6 +20,7 @@ from designate.i18n import _LI from designate import rpc from designate.loggingutils import rpc_logging +CONF = cfg.CONF LOG = logging.getLogger(__name__) MDNS_API = None @@ -77,6 +78,10 @@ class MdnsAPI(object): def notify_zone_changed(self, context, zone, host, port, timeout, retry_interval, max_retries, delay): + if CONF['service:worker'].notify and CONF['service:worker'].enabled: + LOG.debug('Letting worker send NOTIFYs instead') + return True + LOG.info(_LI("notify_zone_changed: Calling mdns for zone '%(zone)s', " "serial '%(serial)s' to nameserver '%(host)s:%(port)s'"), {'zone': zone.name, 'serial': zone.serial, diff --git a/designate/objects/pool_target.py b/designate/objects/pool_target.py index 37c2339d9..828898c57 100644 --- a/designate/objects/pool_target.py +++ b/designate/objects/pool_target.py @@ -48,6 +48,7 @@ class PoolTarget(base.DictObjectMixin, base.PersistentObjectMixin, 'relation': True, 'relation_cls': 'PoolTargetOptionList' }, + 'backend': {} } STRING_KEYS = [ diff --git a/designate/pool_manager/__init__.py b/designate/pool_manager/__init__.py index dc87786b6..bd591f142 100644 --- a/designate/pool_manager/__init__.py +++ b/designate/pool_manager/__init__.py @@ -31,18 +31,28 @@ OPTS = [ 'Pool Manager'), cfg.IntOpt('threshold-percentage', default=100, help='The percentage of servers requiring a successful update ' - 'for a zone change to be considered active'), + 'for a zone change to be considered active', + deprecated_for_removal=True, + deprecated_reason='Migrated to designate-worker'), cfg.IntOpt('poll-timeout', default=30, - help='The time to wait for a response from a server'), + help='The time to wait for a response from a server', + deprecated_for_removal=True, + deprecated_reason='Migrated to designate-worker'), cfg.IntOpt('poll-retry-interval', default=15, help='The time between retrying to send a request and ' - 'waiting for a response from a server'), + 'waiting for a response from a server', + deprecated_for_removal=True, + deprecated_reason='Migrated to designate-worker'), cfg.IntOpt('poll-max-retries', default=10, help='The maximum number of times to retry sending a request ' - 'and wait for a response from a server'), + 'and wait for a response from a server', + deprecated_for_removal=True, + deprecated_reason='Migrated to designate-worker'), cfg.IntOpt('poll-delay', default=5, help='The time to wait before sending the first request ' - 'to a server'), + 'to a server', + deprecated_for_removal=True, + deprecated_reason='Migrated to designate-worker'), cfg.BoolOpt('enable-recovery-timer', default=True, help='The flag for the recovery timer'), cfg.IntOpt('periodic-recovery-interval', default=120, diff --git a/designate/pool_manager/service.py b/designate/pool_manager/service.py index 297de39eb..399c93b26 100644 --- a/designate/pool_manager/service.py +++ b/designate/pool_manager/service.py @@ -113,9 +113,8 @@ class Service(service.RPCService, coordination.CoordinationMixin, 
CONF['service:pool_manager'].periodic_sync_retry_interval # Compute a time (seconds) by which things should have propagated - self.max_prop_time = (self.timeout * self.max_retries + - self.max_retries * self.retry_interval + - self.delay) + self.max_prop_time = utils.max_prop_time(self.timeout, + self.max_retries, self.retry_interval, self.delay) def _setup_target_backends(self): self.target_backends = {} @@ -147,7 +146,6 @@ class Service(service.RPCService, coordination.CoordinationMixin, return topic def start(self): - # Build the Pool (and related) Object from Config context = DesignateContext.get_admin_context() pool_id = CONF['service:pool_manager'].pool_id diff --git a/designate/producer/__init__.py b/designate/producer/__init__.py new file mode 100644 index 000000000..357df3833 --- /dev/null +++ b/designate/producer/__init__.py @@ -0,0 +1,69 @@ +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Author: Endre Karlson +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from oslo_config import cfg + +CONF = cfg.CONF + +CONF.register_group(cfg.OptGroup( + name='service:producer', title="Configuration for Producer Service" +)) + +OPTS = [ + cfg.IntOpt('workers', + help='Number of Producer worker processes to spawn'), + cfg.IntOpt('threads', default=1000, + help='Number of Producer greenthreads to spawn'), + cfg.ListOpt('enabled_tasks', + help='Enabled tasks to run'), + cfg.StrOpt('storage-driver', default='sqlalchemy', + help='The storage driver to use'), + cfg.BoolOpt('export-synchronous', default=True, + help='Whether to allow synchronous zone exports', + deprecated_for_removal=True, + deprecated_reason='Migrated to designate-worker'), +] + +CONF.register_opts(OPTS, group='service:producer') + +# TODO(timsim): Remove these when zone-manager is removed +CONF.register_group(cfg.OptGroup( + name='service:zone_manager', title="Configuration for Zone Manager Service" +)) + +ZONEMGROPTS = [ + cfg.IntOpt('workers', + help='Number of Zone Manager worker processes to spawn', + deprecated_for_removal=True, + deprecated_reason='Migrated to designate-worker'), + cfg.IntOpt('threads', default=1000, + help='Number of Zone Manager greenthreads to spawn', + deprecated_for_removal=True, + deprecated_reason='Migrated to designate-worker'), + cfg.ListOpt('enabled_tasks', + help='Enabled tasks to run', + deprecated_for_removal=True, + deprecated_reason='Migrated to designate-worker'), + cfg.StrOpt('storage-driver', default='sqlalchemy', + help='The storage driver to use', + deprecated_for_removal=True, + deprecated_reason='Migrated to designate-worker'), + cfg.BoolOpt('export-synchronous', default=True, + help='Whether to allow synchronous zone exports', + deprecated_for_removal=True, + deprecated_reason='Migrated to designate-worker'), +] + +CONF.register_opts(ZONEMGROPTS, group='service:zone_manager') diff --git a/designate/producer/service.py b/designate/producer/service.py new file mode 100644 index 000000000..84b0874e4 --- /dev/null +++ b/designate/producer/service.py @@ -0,0 +1,97 @@ +# 
Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Author: Endre Karlson +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from oslo_config import cfg +from oslo_log import log as logging +import oslo_messaging as messaging + +from designate.i18n import _LI +from designate import coordination +from designate import quota +from designate import service +from designate import storage +from designate.central import rpcapi +from designate.producer import tasks + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + +NS = 'designate.periodic_tasks' + + +class Service(service.RPCService, coordination.CoordinationMixin, + service.Service): + RPC_API_VERSION = '1.0' + + target = messaging.Target(version=RPC_API_VERSION) + + @property + def storage(self): + if not hasattr(self, '_storage'): + # TODO(timsim): Remove this when zone_mgr goes away + storage_driver = cfg.CONF['service:zone_manager'].storage_driver + if cfg.CONF['service:producer'].storage_driver != storage_driver: + storage_driver = cfg.CONF['service:producer'].storage_driver + self._storage = storage.get_storage(storage_driver) + return self._storage + + @property + def quota(self): + if not hasattr(self, '_quota'): + # Get a quota manager instance + self._quota = quota.get_quota() + return self._quota + + @property + def service_name(self): + return 'producer' + + @property + def central_api(self): + return rpcapi.CentralAPI.get_instance() + + def start(self): + super(Service, self).start() + + self._partitioner = coordination.Partitioner( + self._coordinator, self.service_name, self._coordination_id, + range(0, 4095)) + + self._partitioner.start() + self._partitioner.watch_partition_change(self._rebalance) + + # TODO(timsim): Remove this when zone_mgr goes away + zmgr_enabled_tasks = CONF['service:zone_manager'].enabled_tasks + producer_enabled_tasks = CONF['service:producer'].enabled_tasks + enabled = zmgr_enabled_tasks + if producer_enabled_tasks != []: + enabled = producer_enabled_tasks + + for task in tasks.PeriodicTask.get_extensions(enabled): + LOG.debug("Registering task %s", task) + + # Instantiate the task + task = task() + + # Subscribe for partition size updates. 
+ self._partitioner.watch_partition_change(task.on_partition_change) + + interval = CONF[task.get_canonical_name()].interval + self.tg.add_timer(interval, task) + + def _rebalance(self, my_partitions, members, event): + LOG.info(_LI("Received rebalance event %s"), event) + self.partition_range = my_partitions diff --git a/designate/zone_manager/tasks.py b/designate/producer/tasks.py similarity index 83% rename from designate/zone_manager/tasks.py rename to designate/producer/tasks.py index 0e4040ecb..b75f1cfba 100644 --- a/designate/zone_manager/tasks.py +++ b/designate/producer/tasks.py @@ -19,8 +19,9 @@ from designate import context from designate import plugin from designate import rpc from designate.central import rpcapi +from designate.worker import rpcapi as worker_rpcapi +from designate.pool_manager import rpcapi as pool_manager_rpcapi from designate.i18n import _LI -from designate.pool_manager.rpcapi import PoolManagerAPI from oslo_config import cfg from oslo_log import log as logging @@ -30,10 +31,10 @@ LOG = logging.getLogger(__name__) class PeriodicTask(plugin.ExtensionPlugin): - """Abstract Zone Manager periodic task + """Abstract Producer periodic task """ - __plugin_ns__ = 'designate.zone_manager_tasks' - __plugin_type__ = 'zone_manager_task' + __plugin_ns__ = 'designate.producer_tasks' + __plugin_type__ = 'producer_task' __interval__ = None def __init__(self): @@ -56,6 +57,21 @@ class PeriodicTask(plugin.ExtensionPlugin): def central_api(self): return rpcapi.CentralAPI.get_instance() + @property + def worker_api(self): + return worker_rpcapi.WorkerAPI.get_instance() + + @property + def pool_manager_api(self): + return pool_manager_rpcapi.PoolManagerAPI.get_instance() + + @property + def zone_api(self): + # TODO(timsim): Remove this when pool_manager_api is gone + if cfg.CONF['service:worker'].enabled: + return self.worker_api + return self.pool_manager_api + def on_partition_change(self, my_partitions, members, event): """Refresh partitions attribute """ @@ -153,7 +169,7 @@ class PeriodicExistsTask(PeriodicTask): def __init__(self): super(PeriodicExistsTask, self).__init__() - self.notifier = rpc.get_notifier('zone_manager') + self.notifier = rpc.get_notifier('producer') @classmethod def get_cfg_opts(cls): @@ -211,7 +227,7 @@ class PeriodicSecondaryRefreshTask(PeriodicTask): def __call__(self): pstart, pend = self._my_range() - msg = _LI("Refreshing zones between for %(start)s to %(end)s") + msg = _LI("Refreshing zones for shards %(start)s to %(end)s") LOG.info(msg, {"start": pstart, "end": pend}) ctxt = context.DesignateContext.get_admin_context() @@ -269,7 +285,7 @@ class PeriodicGenerateDelayedNotifyTask(PeriodicTask): def __call__(self): """Fetch a list of zones with the delayed_notify flag set up to "batch_size" - Call Pool Manager to emit NOTIFY transactions, + Call Worker to emit NOTIFY transactions, Reset the flag. 
""" pstart, pend = self._my_range() @@ -293,8 +309,38 @@ class PeriodicGenerateDelayedNotifyTask(PeriodicTask): msg = _LI("Performing delayed NOTIFY for %(start)s to %(end)s: %(n)d") LOG.debug(msg % dict(start=pstart, end=pend, n=len(zones))) - pm_api = PoolManagerAPI.get_instance() for z in zones: - pm_api.update_zone(ctxt, z) + self.zone_api.update_zone(ctxt, z) z.delayed_notify = False self.central_api.update_zone(ctxt, z) + + +class WorkerPeriodicRecovery(PeriodicTask): + __plugin_name__ = 'worker_periodic_recovery' + __interval__ = 120 + + @classmethod + def get_cfg_opts(cls): + group = cfg.OptGroup(cls.get_canonical_name()) + options = cls.get_base_opts() + [ + cfg.IntOpt( + 'interval', + default=cls.__interval__, + help='Run interval in seconds' + ), + ] + return [(group, options)] + + def __call__(self): + # TODO(timsim): Remove this when worker is always on + if not cfg.CONF['service:worker'].enabled: + return + + pstart, pend = self._my_range() + msg = _LI("Recovering zones for shards %(start)s to %(end)s") + LOG.info(msg, {"start": pstart, "end": pend}) + + ctxt = context.DesignateContext.get_admin_context() + ctxt.all_tenants = True + + self.worker_api.recover_shard(ctxt, pstart, pend) diff --git a/designate/tests/test_backend/test_powerdns.py b/designate/tests/test_backend/test_powerdns.py index 6b39eabc3..2ef752047 100644 --- a/designate/tests/test_backend/test_powerdns.py +++ b/designate/tests/test_backend/test_powerdns.py @@ -52,10 +52,12 @@ class PowerDNSBackendTestCase(BackendTestCase): self.assertEqual(commit, session_mock.commit.call_count) self.assertEqual(rollback, session_mock.rollback.call_count) - # Tests for Public Methpds - @mock.patch.object(impl_powerdns.PowerDNSBackend, 'session', - new_callable=mock.MagicMock) - def test_create_zone(self, session_mock): + # Tests for Public Methods + @mock.patch.object(impl_powerdns.PowerDNSBackend, 'get_session') + def test_create_zone(self, get_session_mock): + session_mock = mock.MagicMock() + get_session_mock.return_value = session_mock + context = self.get_context() self.backend.create_zone(context, self.zone) @@ -80,11 +82,14 @@ class PowerDNSBackendTestCase(BackendTestCase): session_mock.execute.call_args_list[1][0][0], sqlalchemy.sql.selectable.Select) - @mock.patch.object(impl_powerdns.PowerDNSBackend, 'session', - new_callable=mock.Mock) + @mock.patch.object(impl_powerdns.PowerDNSBackend, 'get_session') @mock.patch.object(impl_powerdns.PowerDNSBackend, '_create', side_effect=Exception) - def test_create_zone_failure_on_create(self, create_mock, session_mock): + def test_create_zone_failure_on_create(self, create_mock, + get_session_mock): + session_mock = mock.MagicMock() + get_session_mock.return_value = session_mock + with testtools.ExpectedException(Exception): self.backend.create_zone(self.get_context(), self.zone) @@ -94,11 +99,14 @@ class PowerDNSBackendTestCase(BackendTestCase): # Ensure we called out into the _create method exactly once self.assertEqual(1, create_mock.call_count) - @mock.patch.object(impl_powerdns.PowerDNSBackend, 'session', - new_callable=mock.Mock) + @mock.patch.object(impl_powerdns.PowerDNSBackend, 'get_session') @mock.patch.object(impl_powerdns.PowerDNSBackend, '_create', return_value=None) - def test_create_zone_failure_on_commit(self, create_mock, session_mock): + def test_create_zone_failure_on_commit(self, create_mock, + get_session_mock): + session_mock = mock.MagicMock() + get_session_mock.return_value = session_mock + # Configure the Session mocks's commit method to raise an 
exception session_mock.commit.side_effect = Exception @@ -111,11 +119,13 @@ class PowerDNSBackendTestCase(BackendTestCase): # Ensure we called out into the _create method exactly once self.assertEqual(1, create_mock.call_count) - @mock.patch.object(impl_powerdns.PowerDNSBackend, 'session', - new_callable=mock.Mock) + @mock.patch.object(impl_powerdns.PowerDNSBackend, 'get_session') @mock.patch.object(impl_powerdns.PowerDNSBackend, '_get', return_value=None) - def test_delete_zone(self, get_mock, session_mock): + def test_delete_zone(self, get_mock, get_session_mock): + session_mock = mock.MagicMock() + get_session_mock.return_value = session_mock + # Configure the Session mocks's execute method to return a fudged # resultproxy. rp_mock = mock.Mock() @@ -128,8 +138,8 @@ class PowerDNSBackendTestCase(BackendTestCase): # Ensure the _get method was called with the correct arguments get_mock.assert_called_once_with( - tables.domains, self.zone.id, exceptions.ZoneNotFound, - id_col=tables.domains.c.designate_id) + session_mock, tables.domains, self.zone.id, + exceptions.ZoneNotFound, id_col=tables.domains.c.designate_id) # Ensure we have one query, a DELETE self.assertEqual(1, session_mock.execute.call_count) @@ -140,21 +150,23 @@ class PowerDNSBackendTestCase(BackendTestCase): # TODO(kiall): Validate the ID being deleted - @mock.patch.object(impl_powerdns.PowerDNSBackend, 'session', - new_callable=mock.Mock) + @mock.patch.object(impl_powerdns.PowerDNSBackend, 'get_session') @mock.patch.object(impl_powerdns.PowerDNSBackend, '_get', side_effect=exceptions.ZoneNotFound) @mock.patch.object(impl_powerdns.PowerDNSBackend, '_delete', return_value=None) def test_delete_zone_zone_not_found(self, delete_mock, get_mock, - session_mock): + get_session_mock): + session_mock = mock.MagicMock() + get_session_mock.return_value = session_mock + context = self.get_context() self.backend.delete_zone(context, self.zone) # Ensure the _get method was called with the correct arguments get_mock.assert_called_once_with( - tables.domains, self.zone.id, exceptions.ZoneNotFound, - id_col=tables.domains.c.designate_id) + session_mock, tables.domains, self.zone.id, + exceptions.ZoneNotFound, id_col=tables.domains.c.designate_id) # Ensure the _delete method was not called self.assertFalse(delete_mock.called) diff --git a/designate/tests/test_zone_manager/__init__.py b/designate/tests/test_producer/__init__.py similarity index 100% rename from designate/tests/test_zone_manager/__init__.py rename to designate/tests/test_producer/__init__.py diff --git a/designate/tests/test_zone_manager/test_service.py b/designate/tests/test_producer/test_service.py similarity index 90% rename from designate/tests/test_zone_manager/test_service.py rename to designate/tests/test_producer/test_service.py index 56b169390..18c052b1f 100644 --- a/designate/tests/test_zone_manager/test_service.py +++ b/designate/tests/test_producer/test_service.py @@ -20,8 +20,8 @@ from designate.tests import TestCase LOG = logging.getLogger(__name__) -class ZoneManagerServiceTest(TestCase): +class ProducerServiceTest(TestCase): def test_stop(self): # Test stopping the service - service = self.start_service("zone_manager") + service = self.start_service("producer") service.stop() diff --git a/designate/tests/test_zone_manager/test_tasks.py b/designate/tests/test_producer/test_tasks.py similarity index 81% rename from designate/tests/test_zone_manager/test_tasks.py rename to designate/tests/test_producer/test_tasks.py index 9ec74dcda..4f9135bf5 100644 --- 
a/designate/tests/test_zone_manager/test_tasks.py +++ b/designate/tests/test_producer/test_tasks.py @@ -16,17 +16,13 @@ import datetime -from mock import MagicMock from oslo_log import log as logging from oslo_utils import timeutils -from designate.pool_manager.rpcapi import PoolManagerAPI from designate.storage.impl_sqlalchemy import tables from designate.tests import TestCase from designate.tests import fixtures -from designate.zone_manager import tasks - -from fixtures import MockPatch +from designate.producer import tasks LOG = logging.getLogger(__name__) @@ -39,7 +35,7 @@ class TaskTest(TestCase): def _enable_tasks(self, tasks): self.config( enabled_tasks=tasks, - group="service:zone_manager") + group="service:producer") class DeletedzonePurgeTest(TaskTest): @@ -50,7 +46,7 @@ class DeletedzonePurgeTest(TaskTest): interval=3600, time_threshold=604800, batch_size=100, - group="zone_manager_task:zone_purge" + group="producer_task:zone_purge" ) self.purge_task_fixture = self.useFixture( @@ -99,7 +95,7 @@ class DeletedzonePurgeTest(TaskTest): return zones def test_purge_zones(self): - # Create 18 zones, run zone_manager, check if 7 zones are remaining + # Create 18 zones, run producer, check if 7 zones are remaining self.config(quota_zones=1000) self._create_deleted_zones() @@ -110,14 +106,6 @@ class DeletedzonePurgeTest(TaskTest): self.assertEqual(7, len(zones)) -fx_pool_manager = MockPatch( - 'designate.zone_manager.tasks.PoolManagerAPI.get_instance', - MagicMock(spec_set=[ - 'update_zone', - ]) -) - - class PeriodicGenerateDelayedNotifyTaskTest(TaskTest): def setUp(self): @@ -126,7 +114,7 @@ class PeriodicGenerateDelayedNotifyTaskTest(TaskTest): self.config( interval=5, batch_size=100, - group="zone_manager_task:delayed_notify" + group="producer_task:delayed_notify" ) self.generate_delayed_notify_task_fixture = self.useFixture( @@ -158,31 +146,21 @@ class PeriodicGenerateDelayedNotifyTaskTest(TaskTest): self.config( interval=1, batch_size=5, - group="zone_manager_task:delayed_notify" + group="producer_task:delayed_notify" ) self._create_zones() zones = self._fetch_zones(tables.zones.select().where( tables.zones.c.delayed_notify == True)) # nopep8 self.assertEqual(10, len(zones)) - # Run the task and check if it reset the delayed_notify flag - with fx_pool_manager: - pm_api = PoolManagerAPI.get_instance() - pm_api.update_zone.reset_mock() - - self.generate_delayed_notify_task_fixture.task() - - self.assertEqual(10, pm_api.update_zone.call_count) + self.generate_delayed_notify_task_fixture.task() zones = self._fetch_zones(tables.zones.select().where( tables.zones.c.delayed_notify == True)) # nopep8 self.assertEqual(5, len(zones)) # Run the task and check if it reset the delayed_notify flag - with fx_pool_manager: - self.generate_delayed_notify_task_fixture.task() - pm_api = PoolManagerAPI.get_instance() - self.assertEqual(20, pm_api.update_zone.call_count) + self.generate_delayed_notify_task_fixture.task() zones = self._fetch_zones(tables.zones.select().where( tables.zones.c.delayed_notify == True)) # nopep8 diff --git a/designate/tests/unit/test_zone_manager/__init__.py b/designate/tests/test_workers/__init__.py similarity index 100% rename from designate/tests/unit/test_zone_manager/__init__.py rename to designate/tests/test_workers/__init__.py diff --git a/designate/tests/test_workers/test_base_task.py b/designate/tests/test_workers/test_base_task.py new file mode 100644 index 000000000..bca6d55bc --- /dev/null +++ b/designate/tests/test_workers/test_base_task.py @@ -0,0 +1,30 @@ +# 
Copyright 2016 Rackspace Inc. +# +# Author: Eric Larson +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License.mport threading +from unittest import TestCase + +from designate.worker.tasks import base + + +class TestTask(TestCase): + + def setUp(self): + self.task = base.Task(None) + + def test_constructor(self): + assert self.task + + def test_call(self): + self.assertRaises(NotImplementedError, self.task) diff --git a/designate/tests/test_workers/test_processing.py b/designate/tests/test_workers/test_processing.py new file mode 100644 index 000000000..1918bf9fe --- /dev/null +++ b/designate/tests/test_workers/test_processing.py @@ -0,0 +1,46 @@ +# Copyright 2016 Rackspace Inc. +# +# Author: Eric Larson +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License.mport threading +from unittest import TestCase + +from designate.worker import processing + + +class TestProcessingExecutor(TestCase): + + def test_execute_multiple_tasks(self): + def t1(): + return 1 + + def t2(): + return 2 + + tasks = [t1, t2, t1, t2, t1] + exe = processing.Executor() + + results = exe.run(tasks) + assert results == [1, 2, 1, 2, 1] + + def test_execute_single_task(self): + def t1(): + return 1 + + def t2(): + return 2 + + exe = processing.Executor() + + results = exe.run(t1) + assert results == [1] diff --git a/designate/tests/test_workers/test_service.py b/designate/tests/test_workers/test_service.py new file mode 100644 index 000000000..f8d576ce2 --- /dev/null +++ b/designate/tests/test_workers/test_service.py @@ -0,0 +1,99 @@ +# Copyright 2016 Rackspace Inc. +# +# Author: Eric Larson +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +# License for the specific language governing permissions and limitations +# under the License.mport threading +from unittest import TestCase + +import mock + +from designate.worker import service + + +class TestService(TestCase): + + def setUp(self): + self.context = mock.Mock() + self.zone = mock.Mock() + self.service = service.Service() + + def test_create_zone(self): + self.service._do_zone_action = mock.Mock() + + self.service.create_zone(self.context, self.zone) + + self.service._do_zone_action.assert_called_with( + self.context, self.zone + ) + + def test_delete_zone(self): + self.service._do_zone_action = mock.Mock() + + self.service.delete_zone(self.context, self.zone) + + self.service._do_zone_action.assert_called_with( + self.context, self.zone + ) + + def test_update_zone(self): + self.service._do_zone_action = mock.Mock() + + self.service.update_zone(self.context, self.zone) + + self.service._do_zone_action.assert_called_with( + self.context, self.zone + ) + + @mock.patch.object(service.zonetasks, 'ZoneAction') + def test_do_zone_action(self, ZoneAction): + self.service._executor = mock.Mock() + self.service._pool = mock.Mock() + self.service.get_pool = mock.Mock() + pool = mock.Mock() + self.service.get_pool.return_value = pool + + self.service._do_zone_action(self.context, self.zone) + + ZoneAction.assert_called_with( + self.service.executor, + self.context, + pool, + self.zone, + self.zone.action + ) + + self.service._executor.run.assert_called_with(ZoneAction()) + + def test_get_pool(self): + pool = mock.Mock() + self.service.load_pool = mock.Mock() + self.service.load_pool.return_value = pool + self.service._pools_map = {'1': pool} + + assert self.service.get_pool('1') == pool + assert self.service.get_pool('2') == pool + + @mock.patch.object(service.zonetasks, 'RecoverShard') + def test_recover_shard(self, RecoverShard): + self.service._executor = mock.Mock() + self.service._pool = mock.Mock() + + self.service.recover_shard(self.context, 1, 10) + + RecoverShard.assert_called_with( + self.service.executor, + self.context, + 1, 10 + ) + + self.service.executor.run.assert_called_with(RecoverShard()) diff --git a/designate/tests/test_workers/test_zone_tasks.py b/designate/tests/test_workers/test_zone_tasks.py new file mode 100644 index 000000000..51bb2ab83 --- /dev/null +++ b/designate/tests/test_workers/test_zone_tasks.py @@ -0,0 +1,574 @@ +# Copyright 2016 Rackspace Inc. +# +# Author: Eric Larson +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +# License for the specific language governing permissions and limitations +# under the License.mport threading +from unittest import TestCase + +import mock +import testtools +from oslo_config import cfg + +from functionaltests.common import utils +from designate import exceptions +from designate.worker.tasks import zone +from designate.worker import processing + + +class TestZoneAction(TestCase): + + def setUp(self): + self.context = mock.Mock() + self.pool = 'default_pool' + self.executor = mock.Mock() + self.task = zone.ZoneAction( + self.executor, self.context, self.pool, mock.Mock(), 'CREATE' + ) + self.task._wait_for_nameservers = mock.Mock() + + def test_constructor(self): + assert self.task + + def test_call(self): + self.task._zone_action_on_targets = mock.Mock(return_value=True) + self.task._poll_for_zone = mock.Mock(return_value=True) + result = self.task() + assert result is True + + assert self.task._wait_for_nameservers.called + assert self.task._zone_action_on_targets.called + assert self.task._poll_for_zone.called + + def test_call_on_delete(self): + myzone = mock.Mock() + task = zone.ZoneAction( + self.executor, self.context, self.pool, myzone, 'DELETE' + ) + task._zone_action_on_targets = mock.Mock(return_value=True) + task._poll_for_zone = mock.Mock(return_value=True) + task._wait_for_nameservers = mock.Mock() + + assert task() + + assert myzone.serial == 0 + + def test_call_fails_on_zone_targets(self): + self.task._zone_action_on_targets = mock.Mock(return_value=False) + assert not self.task() + + def test_call_fails_on_poll_for_zone(self): + self.task._zone_action_on_targets = mock.Mock(return_value=False) + assert not self.task() + + @mock.patch.object(zone, 'time') + def test_wait_for_nameservers(self, time): + # It is just a time.sleep :( + task = zone.ZoneAction( + self.executor, self.context, self.pool, mock.Mock(), 'CREATE' + ) + task._wait_for_nameservers() + time.sleep.assert_called_with(task.delay) + + +class TestZoneActor(TestCase): + """The zone actor runs actions for zones in multiple threads and + ensures the result meets the required thresholds for calling it + done. 
+ """ + + def setUp(self): + self.context = mock.Mock() + self.pool = mock.Mock() + self.executor = mock.Mock() + self.actor = zone.ZoneActor( + self.executor, + self.context, + self.pool, + mock.Mock(action='CREATE'), + ) + + def test_invalid_action(self): + with testtools.ExpectedException(Exception, "Bad Action"): + self.actor._validate_action('BAD') + + def test_threshold_from_config(self): + actor = zone.ZoneActor( + self.executor, self.context, self.pool, mock.Mock(action='CREATE') + ) + + default = cfg.CONF['service:worker'].threshold_percentage + assert actor.threshold == default + + def test_execute(self): + self.pool.targets = ['target 1'] + self.actor.executor.run.return_value = ['foo'] + + results = self.actor._execute() + + assert results == ['foo'] + + def test_call(self): + self.actor.pool.targets = ['target 1'] + self.actor.executor.run.return_value = [True] + assert self.actor() is True + + def test_threshold_met_true(self): + self.actor._threshold = 80 + + results = [True for i in range(8)] + [False, False] + + assert self.actor._threshold_met(results) + + def test_threshold_met_false(self): + self.actor._threshold = 90 + self.actor._update_status = mock.Mock() + + results = [False] + [True for i in range(8)] + [False] + + assert not self.actor._threshold_met(results) + assert self.actor._update_status.called + assert self.actor.zone.status == 'ERROR' + + +QUERY_RESULTS = { + 'delete_success_all': { + 'case': { + 'action': 'DELETE', + 'results': [0, 0, 0, 0], + 'zone_serial': 1, + 'positives': 4, + 'no_zones': 4, + 'consensus_serial': 0 + } + }, + 'delete_success_half': { + 'case': { + 'action': 'DELETE', + 'results': [1, 0, 1, 0], + 'zone_serial': 1, + 'positives': 2, + 'no_zones': 2, + 'consensus_serial': 0 + }, + }, + 'update_success_all': { + 'case': { + 'action': 'UPDATE', + 'results': [2, 2, 2, 2], + 'zone_serial': 2, + 'positives': 4, + 'no_zones': 0, + 'consensus_serial': 2 + }, + }, + 'update_fail_all': { + 'case': { + 'action': 'UPDATE', + 'results': [1, 1, 1, 1], + 'zone_serial': 2, + 'positives': 0, + 'no_zones': 0, + # The consensus serial is never updated b/c the nameserver + # serials are less than the zone serial. 
+ 'consensus_serial': 0 + }, + }, + 'update_success_with_higher_serial': { + 'case': { + 'action': 'UPDATE', + 'results': [2, 1, 0, 3], + 'zone_serial': 2, + 'positives': 2, + 'no_zones': 1, + 'consensus_serial': 2 + }, + }, + 'update_success_all_higher_serial': { + 'case': { + 'action': 'UPDATE', + 'results': [3, 3, 3, 3], + 'zone_serial': 2, + 'positives': 4, + 'no_zones': 0, + 'consensus_serial': 3, + } + }, +} + + +@utils.parameterized_class +class TestParseQueryResults(TestCase): + + @utils.parameterized(QUERY_RESULTS) + def test_result_cases(self, case): + z = mock.Mock(action=case['action']) + if case.get('zone_serial'): + z.serial = case['zone_serial'] + + result = zone.parse_query_results( + case['results'], z + ) + + assert result.positives == case['positives'] + assert result.no_zones == case['no_zones'] + assert result.consensus_serial == case['consensus_serial'] + + +class TestZonePoller(TestCase): + + def setUp(self): + self.context = mock.Mock() + self.pool = mock.Mock() + self.zone = mock.Mock(name='example.com.', serial=1) + self.threshold = 80 + self.executor = mock.Mock() + self.poller = zone.ZonePoller( + self.executor, + self.context, + self.pool, + self.zone, + ) + self.poller._threshold = self.threshold + + def test_constructor(self): + assert self.poller + assert self.poller.threshold == self.threshold + + def test_call_on_success(self): + ns_results = [2 for i in range(8)] + [0, 0] + result = zone.DNSQueryResult( + positives=8, + no_zones=2, + consensus_serial=2, + results=ns_results, + ) + self.poller.zone.action = 'UPDATE' + self.poller.zone.serial = 2 + self.poller._do_poll = mock.Mock(return_value=result) + self.poller._on_success = mock.Mock(return_value=True) + self.poller._update_status = mock.Mock() + + assert self.poller() + + self.poller._on_success.assert_called_with(result, 'SUCCESS') + self.poller._update_status.called + self.poller.zone.serial = 2 + self.poller.zone.status = 'SUCCESS' + + def test_threshold_met_true(self): + ns_results = [2 for i in range(8)] + [0, 0] + result = zone.DNSQueryResult( + positives=8, + no_zones=2, + consensus_serial=2, + results=ns_results, + ) + + success, status = self.poller._threshold_met(result) + + assert success + assert status == 'SUCCESS' + + def test_threshold_met_false_low_positives(self): + # 6 positives, 4 behind the serial (aka 0 no_zones) + ns_results = [2 for i in range(6)] + [1 for i in range(4)] + result = zone.DNSQueryResult( + positives=6, + no_zones=0, + consensus_serial=2, + results=ns_results, + ) + + success, status = self.poller._threshold_met(result) + + assert not success + assert status == 'ERROR' + + def test_threshold_met_true_no_zones(self): + # Change is looking for serial 2 + # 4 positives, 4 no zones, 2 behind the serial + ns_results = [2 for i in range(4)] + [0 for i in range(4)] + [1, 1] + result = zone.DNSQueryResult( + positives=4, + no_zones=4, + consensus_serial=1, + results=ns_results, + ) + + # Set the threshold to 30% + self.poller._threshold = 30 + self.poller.zone.action = 'UPDATE' + + success, status = self.poller._threshold_met(result) + + assert success + assert status == 'SUCCESS' + + def test_threshold_met_false_no_zones(self): + # Change is looking for serial 2 + # 4 positives, 4 no zones + ns_results = [2 for i in range(4)] + [0 for i in range(4)] + result = zone.DNSQueryResult( + positives=4, + no_zones=4, + consensus_serial=2, + results=ns_results, + ) + + # Set the threshold to 100% + self.poller._threshold = 100 + self.poller.zone.action = 'UPDATE' + + success, 
status = self.poller._threshold_met(result) + + assert not success + assert status == 'NO_ZONE' + + def test_threshold_met_false_no_zones_one_result(self): + # Change is looking for serial 2 + # 4 positives, 4 no zones + ns_results = [0] + result = zone.DNSQueryResult( + positives=0, + no_zones=1, + consensus_serial=2, + results=ns_results, + ) + + # Set the threshold to 100% + self.poller._threshold = 100 + self.poller.zone.action = 'UPDATE' + + success, status = self.poller._threshold_met(result) + + assert not success + assert status == 'NO_ZONE' + + def test_on_success(self): + query_result = mock.Mock(consensus_serial=10) + + result = self.poller._on_success(query_result, 'FOO') + + assert result is True + assert self.zone.serial == 10 + assert self.zone.status == 'FOO' + + def test_on_error_failure(self): + result = self.poller._on_failure('FOO') + + assert result is False + assert self.zone.status == 'FOO' + + def test_on_no_zones_failure(self): + result = self.poller._on_failure('NO_ZONE') + + assert result is False + assert self.zone.status == 'NO_ZONE' + assert self.zone.action == 'CREATE' + + +class TestZonePollerPolling(TestCase): + + def setUp(self): + self.executor = processing.Executor() + self.context = mock.Mock() + self.zone = mock.Mock(name='example.com.', action='UPDATE', serial=10) + self.pool = mock.Mock(nameservers=['ns1', 'ns2']) + self.threshold = 80 + + self.poller = zone.ZonePoller( + self.executor, + self.context, + self.pool, + self.zone, + ) + + self.max_retries = 4 + self.retry_interval = 2 + self.poller._max_retries = self.max_retries + self.poller._retry_interval = self.retry_interval + + @mock.patch.object(zone, 'PollForZone') + def test_do_poll(self, PollForZone): + PollForZone.return_value = mock.Mock(return_value=10) + result = self.poller._do_poll() + + assert result + + assert result.positives == 2 + assert result.no_zones == 0 + assert result.results == [10, 10] + + @mock.patch.object(zone, 'time', mock.Mock()) + def test_do_poll_with_retry(self): + exe = mock.Mock() + exe.run.side_effect = [ + [0, 0], [10, 10] + ] + self.poller.executor = exe + + result = self.poller._do_poll() + + assert result + + zone.time.sleep.assert_called_with(self.retry_interval) + + # retried once + assert len(zone.time.sleep.mock_calls) == 1 + + @mock.patch.object(zone, 'time', mock.Mock()) + def test_do_poll_with_retry_until_fail(self): + exe = mock.Mock() + exe.run.return_value = [0, 0] + + self.poller.executor = exe + + self.poller._do_poll() + + assert len(zone.time.sleep.mock_calls) == self.max_retries + + +class TestUpdateStatus(TestCase): + + def setUp(self): + self.executor = processing.Executor() + self.task = zone.UpdateStatus(self.executor, mock.Mock(), mock.Mock()) + self.task._central_api = mock.Mock() + + def test_call_on_delete(self): + self.task.zone.action = 'DELETE' + + self.task() + + assert self.task.zone.action == 'NONE' + assert self.task.zone.status == 'NO_ZONE' + assert self.task.central_api.update_status.called + + def test_call_on_success(self): + self.task.zone.status = 'SUCCESS' + + self.task() + + assert self.task.zone.action == 'NONE' + assert self.task.central_api.update_status.called + + def test_call_central_call(self): + self.task.zone.status = 'SUCCESS' + + self.task() + + self.task.central_api.update_status.assert_called_with( + self.task.context, + self.task.zone.id, + self.task.zone.status, + self.task.zone.serial, + ) + + def test_call_on_delete_error(self): + self.task.zone.action = 'DELETE' + self.task.zone.status = 'ERROR' + + 
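+        # An ERROR'd DELETE keeps its action and status so the zone can be
+        # picked up and retried by the periodic RecoverShard task.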
self.task() + + assert self.task.zone.action == 'DELETE' + assert self.task.zone.status == 'ERROR' + assert self.task.central_api.update_status.called + + def test_call_on_create_error(self): + self.task.zone.action = 'CREATE' + self.task.zone.status = 'ERROR' + + self.task() + + assert self.task.zone.action == 'CREATE' + assert self.task.zone.status == 'ERROR' + assert self.task.central_api.update_status.called + + def test_call_on_update_error(self): + self.task.zone.action = 'UPDATE' + self.task.zone.status = 'ERROR' + + self.task() + + assert self.task.zone.action == 'UPDATE' + assert self.task.zone.status == 'ERROR' + assert self.task.central_api.update_status.called + + +class TestPollForZone(TestCase): + + def setUp(self): + self.zone = mock.Mock(serial=1) + self.zone.name = 'example.org.' + self.executor = processing.Executor() + + self.ns = mock.Mock(host='ns.example.org', port=53) + self.task = zone.PollForZone(self.executor, self.zone, self.ns) + self.task._max_retries = 3 + self.task._retry_interval = 2 + + @mock.patch.object(zone.wutils, 'get_serial', mock.Mock(return_value=10)) + def test_get_serial(self): + assert self.task._get_serial() == 10 + + zone.wutils.get_serial.assert_called_with( + 'example.org.', + 'ns.example.org', + port=53 + ) + + def test_call(self): + self.task._get_serial = mock.Mock(return_value=10) + + result = self.task() + + assert result == 10 + + +class TestExportZone(TestCase): + + def setUp(self): + self.zone = mock.Mock(name='example.com.', serial=1) + self.export = mock.Mock() + self.export.id = '1' + self.executor = processing.Executor() + self.context = mock.Mock() + + self.task = zone.ExportZone( + self.executor, self.context, self.zone, self.export) + self.task._central_api = mock.Mock() + self.task._storage = mock.Mock() + self.task._quota = mock.Mock() + + self.task._quota.limit_check = mock.Mock() + self.task._storage.count_recordsets = mock.Mock(return_value=1) + self.task._synchronous_export = mock.Mock(return_value=True) + + def test_sync_export_right_size(self): + self.task() + assert self.export.status == 'COMPLETE' + s = "designate://v2/zones/tasks/exports/%s/export" % self.export.id + assert self.export.location == s + + def test_sync_export_wrong_size_fails(self): + self.task._quota.limit_check = mock.Mock( + side_effect=exceptions.OverQuota) + + self.task() + assert self.export.status == 'ERROR' + + def test_async_export_fails(self): + self.task._synchronous_export = mock.Mock(return_value=False) + + self.task() + assert self.export.status == 'ERROR' diff --git a/designate/tests/unit/test_central/test_basic.py b/designate/tests/unit/test_central/test_basic.py index e1d340116..dc04b4965 100644 --- a/designate/tests/unit/test_central/test_basic.py +++ b/designate/tests/unit/test_central/test_basic.py @@ -1912,16 +1912,18 @@ class CentralZoneExportTests(CentralBasic): ) self.service.storage.create_zone_export = Mock( - return_value=RoObject( + return_value=RwObject( + id='1', zone_id='123', task_type='EXPORT', status='PENDING', message=None, - tenant_id='t' + tenant_id='t', + location=None, ) ) - self.service.zone_manager_api.start_zone_export = Mock() + self.service.worker_api.start_zone_export = Mock() out = self.service.create_zone_export( self.context, diff --git a/designate/tests/unit/test_producer/__init__.py b/designate/tests/unit/test_producer/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/designate/tests/unit/test_producer/test_service.py b/designate/tests/unit/test_producer/test_service.py new file 
mode 100644 index 000000000..e083b169a --- /dev/null +++ b/designate/tests/unit/test_producer/test_service.py @@ -0,0 +1,60 @@ +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Author: Federico Ceratto +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Unit-test Producer service +""" + +import mock +from oslotest import base as test + +from designate.tests.unit import RoObject +import designate.producer.service as ps + + +@mock.patch.object(ps.rpcapi.CentralAPI, 'get_instance') +class ProducerTest(test.BaseTestCase): + + def setUp(self): + ps.CONF = RoObject({ + 'service:producer': RoObject({ + 'enabled_tasks': None, # enable all tasks + }), + # TODO(timsim): Remove this + 'service:zone_manager': RoObject({ + 'enabled_tasks': None, # enable all tasks + 'export_synchronous': True + }), + 'producer_task:zone_purge': '', + }) + super(ProducerTest, self).setUp() + self.tm = ps.Service() + self.tm._storage = mock.Mock() + self.tm._rpc_server = mock.Mock() + self.tm._quota = mock.Mock() + self.tm.quota.limit_check = mock.Mock() + + def test_service_name(self, _): + self.assertEqual('producer', self.tm.service_name) + + def test_central_api(self, _): + capi = self.tm.central_api + assert isinstance(capi, mock.MagicMock) + + @mock.patch.object(ps.tasks, 'PeriodicTask') + @mock.patch.object(ps.coordination, 'Partitioner') + def test_stark(self, _, mock_partitioner, mock_PeriodicTask): + self.tm.start() diff --git a/designate/tests/unit/test_zone_manager/test_tasks.py b/designate/tests/unit/test_producer/test_tasks.py similarity index 96% rename from designate/tests/unit/test_zone_manager/test_tasks.py rename to designate/tests/unit/test_producer/test_tasks.py index f4694692c..2a2eb5dc4 100644 --- a/designate/tests/unit/test_zone_manager/test_tasks.py +++ b/designate/tests/unit/test_producer/test_tasks.py @@ -15,7 +15,7 @@ # under the License. 
""" -Unit test Zone Manager tasks +Unit test Producer tasks """ import datetime import uuid @@ -29,7 +29,7 @@ import testtools from designate.central import rpcapi as central_api from designate import context from designate import rpc -from designate.zone_manager import tasks +from designate.producer import tasks from designate.tests.unit import RoObject @@ -55,7 +55,7 @@ class PeriodicTest(TaskTest): super(PeriodicTest, self).setUp() opts = { - "zone_manager_task:dummy": RoObject({ + "producer_task:dummy": RoObject({ "per_page": 100, }) } @@ -118,7 +118,7 @@ class PeriodicExistsTest(TaskTest): super(PeriodicExistsTest, self).setUp() opts = { - "zone_manager_task:periodic_exists": RoObject({ + "producer_task:periodic_exists": RoObject({ "per_page": 100, "interval": 5 }) @@ -204,7 +204,7 @@ class PeriodicSecondaryRefreshTest(TaskTest): super(PeriodicSecondaryRefreshTest, self).setUp() opts = { - "zone_manager_task:periodic_secondary_refresh": RoObject({ + "producer_task:periodic_secondary_refresh": RoObject({ "per_page": 100 }) } diff --git a/designate/tests/unit/test_zone_manager/test_service.py b/designate/tests/unit/test_zone_manager/test_service.py deleted file mode 100644 index 63c8d57f6..000000000 --- a/designate/tests/unit/test_zone_manager/test_service.py +++ /dev/null @@ -1,101 +0,0 @@ -# Copyright 2015 Hewlett-Packard Development Company, L.P. -# -# Author: Federico Ceratto -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
- -"""Unit-test Zone Manager service -""" - -import mock -from oslotest import base as test - -from designate import exceptions -from designate.tests.unit import RoObject -import designate.zone_manager.service as zms - - -@mock.patch.object(zms.rpcapi.CentralAPI, 'get_instance') -class ZoneManagerTest(test.BaseTestCase): - - def setUp(self): - zms.CONF = RoObject({ - 'service:zone_manager': RoObject({ - 'enabled_tasks': None, # enable all tasks - 'export_synchronous': True - }), - 'zone_manager_task:zone_purge': '', - }) - super(ZoneManagerTest, self).setUp() - self.tm = zms.Service() - self.tm._storage = mock.Mock() - self.tm._rpc_server = mock.Mock() - self.tm._quota = mock.Mock() - self.tm.quota.limit_check = mock.Mock() - - def test_service_name(self, _): - self.assertEqual('zone_manager', self.tm.service_name) - - def test_central_api(self, _): - capi = self.tm.central_api - assert isinstance(capi, mock.MagicMock) - - @mock.patch.object(zms.tasks, 'PeriodicTask') - @mock.patch.object(zms.coordination, 'Partitioner') - def test_stark(self, _, mock_partitioner, mock_PeriodicTask): - self.tm.start() - - def test_start_zone_export(self, _): - zone = RoObject(id=3) - context = mock.Mock() - export = {} - self.tm.storage.count_recordsets.return_value = 1 - assert self.tm.storage.count_recordsets() == 1 - self.tm._determine_export_method = mock.Mock() - self.tm.start_zone_export(context, zone, export) - assert self.tm._determine_export_method.called - assert self.tm.central_api.update_zone_export.called - call_args = self.tm._determine_export_method.call_args_list[0][0] - self.assertEqual((context, export, 1), call_args) - - def test_determine_export_method(self, _): - context = mock.Mock() - export = dict(location=None, id=4) - size = mock.Mock() - out = self.tm._determine_export_method(context, export, size) - self.assertDictEqual( - { - 'status': 'COMPLETE', 'id': 4, - 'location': 'designate://v2/zones/tasks/exports/4/export' - }, - out - ) - - def test_exceed_size_quota(self, _): - context = mock.Mock() - export = dict(location=None, id=4) - size = 9999999999 - - self.tm.quota.limit_check.side_effect = exceptions.OverQuota() - out = self.tm._determine_export_method(context, export, size) - self.tm.quota.limit_check.side_effect = None - - self.assertDictEqual( - { - 'status': 'ERROR', - 'id': 4, - 'location': None, - 'message': 'Zone is too large to export' - }, - out - ) diff --git a/designate/utils.py b/designate/utils.py index 4433c5c19..11b6ffcd2 100644 --- a/designate/utils.py +++ b/designate/utils.py @@ -119,8 +119,9 @@ def register_plugin_opts(): # Avoid circular dependency imports from designate import plugin - plugin.Plugin.register_cfg_opts('designate.zone_manager_tasks') - plugin.Plugin.register_extra_cfg_opts('designate.zone_manager_tasks') + # Register Producer Tasks + plugin.Plugin.register_cfg_opts('designate.producer_tasks') + plugin.Plugin.register_extra_cfg_opts('designate.producer_tasks') # Register Backend Plugin Config Options plugin.Plugin.register_cfg_opts('designate.backend') @@ -536,3 +537,7 @@ def bind_udp(host, port): LOG.info(_LI('Listening on UDP port %(port)d'), {'port': newport}) return sock_udp + + +def max_prop_time(timeout, max_retries, retry_interval, delay): + return timeout * max_retries + max_retries * retry_interval + delay diff --git a/designate/worker/README.md b/designate/worker/README.md new file mode 100644 index 000000000..9c5d99208 --- /dev/null +++ b/designate/worker/README.md @@ -0,0 +1,206 @@ +# Worker Model Code + +The general service 
looks like any other Designate RPC service. Available
+RPC calls are defined in `rpcapi.py` and implemented in `service.py`. Where
+this differs is that the `service.py` implementations generally spawn threads
+with a directive to invoke some sort of "task".
+
+# Tasks
+
+Tasks are discrete units of work that are represented in the form
+of *_callable_* Python objects. They can optionally return a value to be
+used in the caller.
+
+For (abbreviated) example:
+```python
+class SendNotify(base.Task):
+    """
+    Send a NOTIFY packet for a zone to a target
+
+    :return: Success/Failure delivering the notify (bool)
+    """
+    def __init__(self, executor, zone, target):
+        super(SendNotify, self).__init__(executor)
+        self.zone = zone
+        self.target = target
+
+    def __call__(self):
+        host = self.target.options.get('host')
+        port = int(self.target.options.get('port'))
+
+        try:
+            wutils.notify(self.zone.name, host, port=port)
+            return True
+        except Exception:
+            return False
+```
+
+To invoke:
+
+If you're OK executing it on the local thread: `SendNotify(executor, zone, target)()`
+If you want to schedule it in its own thread, allowing it to yield to others:
+```python
+self.executor.run(zonetasks.SendNotify(
+    self.executor, zone, target
+))
+```
+
+Most tasks are executed using the executor at the top level. For example, when
+the worker gets a message to `create_zone`, it will say "pop a thread to create
+this zone on the pool", which will eventually flow to "I need to create this
+zone on N targets", which results in a:
+```python
+results = self.executor.run([
+    ZoneActionOnTarget(self.executor, self.context, self.zone, target)
+    for target in self.pool.targets
+])
+```
+
+You can find the tasks in `designate/worker/tasks`; most tasks inherit from a
+base class that gives basic access to other rpcapis, storage, etc.
+
+So the one thread for doing the entire zone create will use N threads in the
+pool to go and do that, and when they're finished, the task will be back down
+to using one thread as it evaluates results. Then it will do something similar
+when it needs to poll N nameservers.
+
+# Execution in Threads
+
+The core of how this works is the
+[Python ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor).
+This is pluggable; someone could certainly add a different executor,
+but it's a simple idea that lets you map callables (tasks) across threads.
+
+Here's an example that shows how you can make multiple calls to a single
+ThreadPoolExecutor from concurrent threads (similar to how tasks calling
+subtasks would do it).
+
+```python
+import concurrent.futures
+
+# Initialize 4 executors
+
+# e is so that we can make two concurrent calls to another executor
+e = concurrent.futures.ThreadPoolExecutor(2)
+
+# e_one is the executor that shows that we can make multiple calls from
+# different threads to one executor
+e_one = concurrent.futures.ThreadPoolExecutor(2)
+
+# e_two and e_three are just separate pools to be used to print numbers
+e_two = concurrent.futures.ThreadPoolExecutor(5)
+e_three = concurrent.futures.ThreadPoolExecutor(5)
+
+def do(task):
+    task()
+
+def one():
+    print '1'
+
+def two():
+    print '2'
+
+def do_one(tup):
+    """
+    Call the callable len(tup[1]) times concurrently
+
+    Since e_one only has two threads in its pool, it will only be
+    able to handle two concurrent "jobs"
+
+    tup is (callable, list(list))
+
+    If one were to pass in (func, [[1]]) the resulting function calls would be:
+    func([1])
+
+    If it was (func, [1, 2]) it would be
+    func(1)
+    func(2)
+    """
+    print 'mapping e_one for a list of len %d' % len(tup[1])
+    e_one.map(tup[0], tup[1])
+
+def do_a(alist):
+    print 'using e_two to map a list of len %d using do()' % len(alist)
+    e_two.map(do, alist)
+
+def do_b(alist):
+    print 'using e_three to map a list of len %d using do()' % len(alist)
+    e_three.map(do, alist)
+
+# init lists of five callables that will just print a number
+ones = [one] * 5
+twos = [two] * 5
+
+# a list of tuples, len two, that each include a function to be mapped
+# eventually and a list of callables
+ones_twos = [(do_a, [ones]), (do_b, [twos])]
+
+# We call do_one twice concurrently on the two tuples.
+# This makes two concurrent calls to e_one.map, each of which makes only
+# _one_ call to another function that executes the lists of five callables
+# in parallel.
+# We do this so that we can see that two concurrent calls to e_one from
+# different threads will work concurrently if there is enough room
+# in the thread pool.
+e.map(do_one, ones_twos)
+
+# Example output:
+# $ python threadexectest.py
+# mapping e_one for a list of len 1
+# mapping e_one for a list of len 1
+#
+# using e_two to map a list of len 5 using do()
+# using e_three to map a list of len 5 using do()
+# 1
+# 2
+# 2
+# 1
+# 2
+# 1
+# 2
+# 1
+# 2
+# 1
+```
+
+# Metrics
+
+I ran a few tests that used the old code vs. the new code. There are obviously
+a ton of different variables here (number of apis/centrals, dns server used, database
+setup, rabbit setup), but other tests that I've done in different random configurations
+have shown similar results to these two, so I think it's a good representation of what
+the differences are.
+
+Pool Manager Test
+
+- 8 Nameservers
+- 12 `designate-pool-manager` processes
+- 1 hour
+- Testing actual DNS propagation
+
+Results:
+
+| Operation       | Number | Propagation Stats                               |
+| --------------- | ------ | ----------------------------------------------- |
+| Creates/Imports | 5700   | Avg propagation 19s >99% propagation in 30min   |
+| Zone Deletes    | 4600   | Avg propagation 16s >99% propagation in 30min   |
+| Zone Updates    | 18057  | Avg propagation 384s ~90% propagation in 30min  |
+
+Propagation Graph: ![](http://i.imgur.com/g3kodip.png)
+Notice that the prop times increase as time goes on, so a longer test would
+almost certainly show even worse times.
+ +Worker Test + +- 8 Nameservers +- 12 `designate-worker` processes +- 1 hour +- Testing actual DNS propagation + +Results: + +| Operation | Number | Propagation Stats | +| --------------- | ------ | ---------------------------------------------- | +| Creates/Imports | 6413 | Avg propagation 8s >99.99% propagation in 5min | +| Zone Deletes | 2077 | Avg propagation 4s 100% propagation in 5min | +| Zone Updates | 23750 | Avg propagation 5s ~99.99% propagation in 5min | + +Propagation Graph: ![](http://i.imgur.com/fM9J9l9.png) diff --git a/designate/worker/__init__.py b/designate/worker/__init__.py new file mode 100644 index 000000000..345c79883 --- /dev/null +++ b/designate/worker/__init__.py @@ -0,0 +1,60 @@ +# Copyright 2016 Rackspace Inc. +# +# Author: Tim Simmons +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from oslo_config import cfg + +CONF = cfg.CONF + +CONF.register_group(cfg.OptGroup( + name='service:worker', title="Configuration for the Worker Service" +)) + +OPTS = [ + cfg.BoolOpt('enabled', default=False, + help='Whether to send events to worker instead of ' + 'Pool Manager', + deprecated_for_removal=True, + deprecated_reason='In Newton, this option will disappear' + 'because worker will be enabled by default'), + cfg.IntOpt('workers', + help='Number of Worker worker processes to spawn'), + cfg.IntOpt('threads', default=200, + help='Number of Worker threads to spawn per process'), + # cfg.ListOpt('enabled_tasks', + # help='Enabled tasks to run'), + cfg.StrOpt('storage-driver', default='sqlalchemy', + help='The storage driver to use'), + cfg.IntOpt('threshold-percentage', default=100, + help='The percentage of servers requiring a successful update ' + 'for a domain change to be considered active'), + cfg.IntOpt('poll-timeout', default=30, + help='The time to wait for a response from a server'), + cfg.IntOpt('poll-retry-interval', default=15, + help='The time between retrying to send a request and ' + 'waiting for a response from a server'), + cfg.IntOpt('poll-max-retries', default=10, + help='The maximum number of times to retry sending a request ' + 'and wait for a response from a server'), + cfg.IntOpt('poll-delay', default=5, + help='The time to wait before sending the first request ' + 'to a server'), + cfg.BoolOpt('notify', default=True, + help='Whether to allow worker to send NOTIFYs, this will ' + 'noop NOTIFYs in mdns if true'), + cfg.BoolOpt('export-synchronous', default=True, + help='Whether to allow synchronous zone exports') +] + +CONF.register_opts(OPTS, group='service:worker') diff --git a/designate/worker/processing.py b/designate/worker/processing.py new file mode 100644 index 000000000..1bef590df --- /dev/null +++ b/designate/worker/processing.py @@ -0,0 +1,79 @@ +# Copyright 2016 Rackspace Inc. +# +# Author: Eric Larson +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import time + +from concurrent import futures +from oslo_log import log as logging +from oslo_config import cfg + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +def default_executor(): + thread_count = 5 + try: + thread_count = CONF['service:worker'].threads + except Exception: + pass + + return futures.ThreadPoolExecutor(thread_count) + + +class Executor(object): + """ + Object to facilitate the running of a task, or a set of tasks on an + executor that can map multiple tasks across a configurable number of + threads + """ + def __init__(self, executor=None): + self._executor = executor or default_executor() + + @staticmethod + def do(task): + return task() + + def task_name(self, task): + if hasattr(task, 'task_name'): + return str(task.task_name) + if hasattr(task, 'func_name'): + return str(task.func_name) + return 'UnnamedTask' + + def run(self, tasks): + """ + Run task or set of tasks + :param tasks: the task or tasks you want to execute in the + executor's pool + + :return: The results of the tasks (list) + + If a single task is pass + """ + self.start_time = time.time() + + if callable(tasks): + tasks = [tasks] + results = [r for r in self._executor.map(self.do, tasks)] + + self.end_time = time.time() + self.task_time = self.end_time - self.start_time + + task_names = [self.task_name(t) for t in tasks] + LOG.debug("Finished Tasks %(tasks)s in %(time)fs", + {'tasks': task_names, 'time': self.task_time}) + + return results diff --git a/designate/zone_manager/rpcapi.py b/designate/worker/rpcapi.py similarity index 61% rename from designate/zone_manager/rpcapi.py rename to designate/worker/rpcapi.py index 3dc7ebd20..e18aa16fd 100644 --- a/designate/zone_manager/rpcapi.py +++ b/designate/worker/rpcapi.py @@ -1,4 +1,4 @@ -# Copyright 2015 Rackspace Inc. +# Copyright 2016 Rackspace Inc. # # Author: Tim Simmons # @@ -20,21 +20,15 @@ import oslo_messaging as messaging from designate import rpc from designate.loggingutils import rpc_logging - LOG = logging.getLogger(__name__) -ZONE_MANAGER_API = None +WORKER_API = None -def reset(): - global ZONE_MANAGER_API - ZONE_MANAGER_API = None - - -@rpc_logging(LOG, 'zone_manager') -class ZoneManagerAPI(object): +@rpc_logging(LOG, 'worker') +class WorkerAPI(object): """ - Client side of the zone manager RPC API. + Client side of the worker RPC API. API version history: @@ -43,7 +37,7 @@ class ZoneManagerAPI(object): RPC_API_VERSION = '1.0' def __init__(self, topic=None): - topic = topic if topic else cfg.CONF.zone_manager_topic + topic = topic if topic else cfg.CONF.worker_topic target = messaging.Target(topic=topic, version=self.RPC_API_VERSION) self.client = rpc.get_client(target, version_cap='1.0') @@ -57,16 +51,27 @@ class ZoneManagerAPI(object): This fixes that by creating the rpcapi when demanded. 
""" - global ZONE_MANAGER_API - if not ZONE_MANAGER_API: - ZONE_MANAGER_API = cls() - return ZONE_MANAGER_API + global WORKER_API + if not WORKER_API: + WORKER_API = cls() + return WORKER_API + + def create_zone(self, context, zone): + return self.client.cast( + context, 'create_zone', zone=zone) + + def update_zone(self, context, zone): + return self.client.cast( + context, 'update_zone', zone=zone) + + def delete_zone(self, context, zone): + return self.client.cast( + context, 'delete_zone', zone=zone) + + def recover_shard(self, context, begin, end): + return self.client.cast( + context, 'recover_shard', begin=begin, end=end) - # Zone Export def start_zone_export(self, context, zone, export): - return self.client.cast(context, 'start_zone_export', zone=zone, - export=export) - - def render_zone(self, context, zone_id): - return self.client.call(context, 'render_zone', - zone_id=zone_id) + return self.client.cast( + context, 'start_zone_export', zone=zone, export=export) diff --git a/designate/worker/service.py b/designate/worker/service.py new file mode 100644 index 000000000..b1988e746 --- /dev/null +++ b/designate/worker/service.py @@ -0,0 +1,172 @@ +# Copyright 2016 Rackspace Inc. +# +# Author: Tim Simmons +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+import time + +from oslo_config import cfg +from oslo_log import log as logging +import oslo_messaging as messaging + +from designate.i18n import _LI +from designate.i18n import _LE +from designate import backend +from designate import exceptions +from designate import service +from designate import storage +from designate.central import rpcapi as central_api +from designate.context import DesignateContext +from designate.worker.tasks import zone as zonetasks +from designate.worker import processing + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class Service(service.RPCService, service.Service): + RPC_API_VERSION = '1.0' + + target = messaging.Target(version=RPC_API_VERSION) + + @property + def central_api(self): + if not hasattr(self, '_central_api'): + self._central_api = central_api.CentralAPI.get_instance() + return self._central_api + + def _setup_target_backends(self, pool): + for target in pool.targets: + # Fetch an instance of the Backend class + target.backend = backend.get_backend( + target.type, target) + + LOG.info(_LI('%d targets setup'), len(pool.targets)) + + if len(pool.targets) == 0: + raise exceptions.NoPoolTargetsConfigured() + + return pool + + def load_pool(self, pool_id): + # Build the Pool (and related) Object from Config + context = DesignateContext.get_admin_context() + + pool = None + has_targets = False + + while not has_targets: + try: + pool = self.central_api.get_pool(context, pool_id) + + if len(pool.targets) > 0: + has_targets = True + else: + LOG.error(_LE("No targets for %s found."), pool) + time.sleep(5) + + # Pool data may not have migrated to the DB yet + except exceptions.PoolNotFound: + LOG.error(_LE("Pool ID %s not found."), pool_id) + time.sleep(5) + # designate-central service may not have started yet + except messaging.exceptions.MessagingTimeout: + time.sleep(0.2) + + return self._setup_target_backends(pool) + + @property + def service_name(self): + return 'worker' + + @property + def storage(self): + if not hasattr(self, '_storage'): + storage_driver = cfg.CONF['service:worker'].storage_driver + self._storage = storage.get_storage(storage_driver) + return self._storage + + @property + def executor(self): + if not hasattr(self, '_executor'): + # TODO(elarson): Create this based on config + self._executor = processing.Executor() + return self._executor + + @property + def pools_map(self): + if not hasattr(self, '_pools_map'): + self._pools_map = {} + return self._pools_map + + def get_pool(self, pool_id): + if pool_id not in self.pools_map: + LOG.info(_LI("Lazily loading pool %s"), pool_id) + self.pools_map[pool_id] = self.load_pool(pool_id) + return self.pools_map[pool_id] + + def start(self): + super(Service, self).start() + LOG.info(_LI('Started worker')) + + def _do_zone_action(self, context, zone): + pool = self.get_pool(zone.pool_id) + task = zonetasks.ZoneAction( + self.executor, context, pool, zone, zone.action + ) + return self.executor.run(task) + + def create_zone(self, context, zone): + """ + :param context: Security context information. + :param zone: Zone to be created + :return: None + """ + self._do_zone_action(context, zone) + + def update_zone(self, context, zone): + """ + :param context: Security context information. + :param zone: Zone to be updated + :return: None + """ + self._do_zone_action(context, zone) + + def delete_zone(self, context, zone): + """ + :param context: Security context information. 
+ :param zone: Zone to be deleted + :return: None + """ + self._do_zone_action(context, zone) + + def recover_shard(self, context, begin, end): + """ + :param begin: the beginning of the shards to recover + :param end: the end of the shards to recover + :return: None + """ + return self.executor.run(zonetasks.RecoverShard( + self.executor, context, begin, end + )) + + def start_zone_export(self, context, zone, export): + """ + :param zone: Zone to be exported + :param export: Zone Export object to update + :return: None + """ + return self.executor.run(zonetasks.ExportZone( + self.executor, context, zone, export + )) diff --git a/designate/worker/tasks/__init__.py b/designate/worker/tasks/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/designate/worker/tasks/base.py b/designate/worker/tasks/base.py new file mode 100644 index 000000000..4a83e0e9a --- /dev/null +++ b/designate/worker/tasks/base.py @@ -0,0 +1,127 @@ +# Copyright 2016 Rackspace Inc. +# +# Author: Tim Simmons +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License.mport threading + +from oslo_config import cfg +from oslo_log import log as logging + +from designate.central import rpcapi as central_rpcapi +from designate import quota +from designate import storage +from designate import utils +from designate.worker import rpcapi as worker_rpcapi + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class TaskConfig(object): + """ + Configuration mixin for the various configuration settings that + a task may want to access + """ + @property + def config(self): + if not hasattr(self, '_config'): + self._config = CONF['service:worker'] + return self._config + + @property + def threshold_percentage(self): + if not hasattr(self, '_threshold_percentage'): + self._threshold_percentage = self.config.threshold_percentage + return self._threshold_percentage + + @property + def timeout(self): + if not hasattr(self, '_timeout'): + self._timeout = self.config.poll_timeout + return self._timeout + + @property + def retry_interval(self): + if not hasattr(self, '_retry_interval'): + self._retry_interval = self.config.poll_retry_interval + return self._retry_interval + + @property + def max_retries(self): + if not hasattr(self, '_max_retries'): + self._max_retries = self.config.poll_max_retries + return self._max_retries + + @property + def delay(self): + if not hasattr(self, '_delay'): + self._delay = self.config.poll_delay + return self._delay + + @property + def max_prop_time(self): + # Compute a time (seconds) by which things should have propagated + if not hasattr(self, '_max_prop_time'): + self._max_prop_time = utils.max_prop_time( + self.timeout, + self.max_retries, + self.retry_interval, + self.delay + ) + return self._max_prop_time + + +class Task(TaskConfig): + """ + Base task interface that includes some helpful connections to other + services and the basic skeleton for tasks. 
+ + Tasks are: + - Callable + - Take an executor as their first parameter + - Can optionally return something + """ + def __init__(self, executor, **kwargs): + self.executor = executor + self.task_name = self.__class__.__name__ + self.options = {} + + @property + def storage(self): + if not hasattr(self, '_storage'): + # Get a storage connection + storage_driver = cfg.CONF['service:central'].storage_driver + self._storage = storage.get_storage(storage_driver) + return self._storage + + @property + def quota(self): + if not hasattr(self, '_quota'): + # Get a quota manager instance + self._quota = quota.get_quota() + return self._quota + + @property + def central_api(self): + if not hasattr(self, '_central_api'): + self._central_api = central_rpcapi.CentralAPI.get_instance() + return self._central_api + + @property + def worker_api(self): + if not hasattr(self, '_worker_api'): + self._worker_api = worker_rpcapi.WorkerAPI.get_instance() + return self._worker_api + + def __call__(self): + raise NotImplementedError diff --git a/designate/worker/tasks/zone.py b/designate/worker/tasks/zone.py new file mode 100644 index 000000000..4adae2650 --- /dev/null +++ b/designate/worker/tasks/zone.py @@ -0,0 +1,609 @@ +# Copyright 2016 Rackspace Inc. +# +# Author: Tim Simmons +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
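+"""
+Tasks for zone CRUD against pool targets (ZoneAction, ZoneActor,
+ZoneActionOnTarget, SendNotify), polling nameservers for changes
+(ZonePoller, PollForZone), status updates (UpdateStatus), periodic
+recovery (RecoverShard) and zone exports (ExportZone).
+"""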
+import time +from collections import namedtuple + +import dns +from oslo_config import cfg +from oslo_log import log as logging + +from designate.i18n import _LI +from designate.i18n import _LW +from designate.worker import utils as wutils +from designate.worker.tasks import base +from designate import exceptions +from designate import utils + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +def percentage(part, whole): + if whole == 0: + return 0 + return 100 * float(part) / float(whole) + + +class ThresholdMixin(object): + @property + def threshold(self): + if not hasattr(self, '_threshold') or self._threshold is None: + self._threshold = CONF['service:worker'].threshold_percentage + return self._threshold + + def _compare_threshold(self, successes, total): + p = percentage(successes, total) + return p >= self.threshold + + +###################### +# CRUD Zone Operations +###################### + +class ZoneActionOnTarget(base.Task): + """ + Perform a Create/Update/Delete of the zone on a pool target + + :return: Success/Failure of the target action (bool) + """ + def __init__(self, executor, context, zone, target): + super(ZoneActionOnTarget, self).__init__(executor) + self.zone = zone + self.action = zone.action + self.target = target + self.context = context + self.task_name = 'ZoneActionOnTarget-%s' % self.action.title() + + def __call__(self): + LOG.debug("Attempting %(action)s zone %(zone)s on %(target)s", + {'action': self.action, 'zone': self.zone.name, + 'target': self.target}) + + for retry in range(0, self.max_retries): + try: + if self.action == 'CREATE': + self.target.backend.create_zone(self.context, self.zone) + SendNotify(self.executor, self.zone, self.target)() + elif self.action == 'UPDATE': + self.target.backend.update_zone(self.context, self.zone) + SendNotify(self.executor, self.zone, self.target)() + elif self.action == 'DELETE': + self.target.backend.delete_zone(self.context, self.zone) + + LOG.debug("Successful %s zone %s on %s", + self.action, self.zone.name, self.target) + return True + except Exception as e: + LOG.info(_LI('Failed to %(action)s zone %(zone)s on ' + 'target %(target)s on attempt %(attempt)d, ' + 'Error: %(error)s.'), {'action': self.action, + 'zone': self.zone.name, 'target': self.target.id, + 'attempt': retry + 1, 'error': str(e)}) + time.sleep(self.retry_interval) + + return False + + +class SendNotify(base.Task): + """ + Send a NOTIFY packet and retry on failure to receive + + :raises: Various exceptions from dnspython + :return: Success/Failure delivering the notify (bool) + """ + def __init__(self, executor, zone, target): + super(SendNotify, self).__init__(executor) + self.zone = zone + self.target = target + + def __call__(self): + if not CONF['service:worker'].notify: + # TODO(timsim): Remove this someday + return True + + host = self.target.options.get('host') + port = int(self.target.options.get('port')) + + try: + wutils.notify(self.zone.name, host, port=port) + LOG.debug('Sent NOTIFY to %(host)s:%(port)s for zone ' + '%(zone)s', {'host': host, + 'port': port, 'zone': self.zone.name}) + return True + except dns.exception.Timeout as e: + LOG.info(_LI('Timeout on NOTIFY to %(host)s:%(port)s for zone ' + '%(zone)s'), {'host': host, + 'port': port, 'zone': self.zone.name}) + raise e + + return False + + +class ZoneActor(base.Task, ThresholdMixin): + """ + Orchestrate the Create/Update/Delete action on targets and update status + if it fails. 
We would only update status here on an error to perform the + necessary backend CRUD action. If there's a failure in propagating all + the way to the nameservers, that will be picked up in a ZonePoller. + + :return: Whether the ActionOnTarget got to a satisfactory number + of targets (bool) + """ + def __init__(self, executor, context, pool, zone): + self.executor = executor + self.context = context + self.pool = pool + self.zone = zone + + def _validate_action(self, action): + if action not in ['CREATE', 'UPDATE', 'DELETE']: + raise Exception('Bad Action') + + def _execute(self): + results = self.executor.run([ + ZoneActionOnTarget(self.executor, self.context, self.zone, target) + for target in self.pool.targets + ]) + return results + + def _update_status(self): + task = UpdateStatus(self.executor, self.context, self.zone) + task() + + def _threshold_met(self, results): + # If we don't meet threshold for action, update status + met_action_threshold = self._compare_threshold( + results.count(True), len(results)) + + if not met_action_threshold: + LOG.info(_LI('Could not %(action)s %(zone)s on enough targets. ' + 'Updating status to ERROR'), + {'action': self.zone.action, 'zone': self.zone.name}) + self.zone.status = 'ERROR' + self._update_status() + return False + return True + + def __call__(self): + self._validate_action(self.zone.action) + results = self._execute() + return self._threshold_met(results) + + +class ZoneAction(base.Task): + """ + Orchestrate a complete Create/Update/Delete of the specified zone on the + pool and the polling for the change + + :return: Success/Failure of the change propagating to a satisfactory + number of nameservers (bool) + """ + def __init__(self, executor, context, pool, zone, action): + super(ZoneAction, self).__init__(executor) + self.context = context + self.pool = pool + self.zone = zone + self.action = action + self.task_name = 'ZoneAction-%s' % self.action.title() + + def _wait_for_nameservers(self): + """ + Pause to give the nameservers a chance to update + """ + time.sleep(self.delay) + + def _zone_action_on_targets(self): + actor = ZoneActor( + self.executor, self.context, self.pool, self.zone + ) + return actor() + + def _poll_for_zone(self): + poller = ZonePoller(self.executor, self.context, self.pool, self.zone) + return poller() + + def __call__(self): + LOG.info(_LI('Attempting %(action)s on zone %(name)s'), + {'action': self.action, 'name': self.zone.name}) + + if not self._zone_action_on_targets(): + return False + + self._wait_for_nameservers() + + if self.action == 'DELETE': + self.zone.serial = 0 + + if not self._poll_for_zone(): + return False + + return True + + +############## +# Zone Polling +############## + +DNSQueryResult = namedtuple( + 'DNSQueryResult', [ + 'positives', + 'no_zones', + 'consensus_serial', + 'results' + ] +) + + +def parse_query_results(results, zone): + """ + results is a [serial/None, ...] 
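+
+    Counts how many nameservers satisfy the zone's action (``positives``),
+    how many do not have the zone at all (``no_zones``), and the lowest
+    satisfying serial seen, which becomes the consensus serial. ``None``
+    results (failed queries) are ignored. For example, polling for an
+    UPDATE to serial 2 against results ``[2, 1, 0, 3]`` yields 2 positives,
+    1 no_zone and a consensus serial of 2.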
+ """ + delete = zone.action == 'DELETE' + positives = 0 + no_zones = 0 + low_serial = 0 + + for serial in results: + if serial is None: + # Intentionally don't handle None + continue + if delete: + if serial == 0: + no_zones += 1 + positives += 1 + else: + if serial >= zone.serial: + positives += 1 + + # Update the lowest valid serial aka the consensus + # serial + if low_serial == 0 or serial < low_serial: + low_serial = serial + else: + if serial == 0: + no_zones += 1 + + result = DNSQueryResult(positives, no_zones, low_serial, results) + LOG.debug('Results for polling %(zone)s-%(serial)d: %(tup)s', + {'zone': zone.name, 'serial': zone.serial, 'tup': result}) + return result + + +class PollForZone(base.Task): + """ + Send SOA queries to a nameserver for the zone. This could be a serial + number, or that the zone does not exist. + + :return: A serial number if the zone exists (int), None if the zone + does not exist + """ + def __init__(self, executor, zone, ns): + super(PollForZone, self).__init__(executor) + self.zone = zone + self.ns = ns + + def _get_serial(self): + return wutils.get_serial( + self.zone.name, + self.ns.host, + port=self.ns.port + ) + + def __call__(self): + LOG.debug('Polling for zone %(zone)s serial %(serial)s on %(ns)s', + {'zone': self.zone.name, 'serial': self.zone.serial, + 'ns': self.ns}) + + try: + serial = self._get_serial() + LOG.debug('Found serial %(serial)d on %(host)s for zone ' + '%(zone)s', {'serial': serial, 'host': self.ns.host, + 'zone': self.zone.name}) + return serial + # TODO(timsim): cache if it's higher than cache + except dns.exception.Timeout: + LOG.info(_LI('Timeout polling for serial %(serial)d ' + '%(host)s for zone %(zone)s'), {'serial': self.zone.serial, + 'host': self.ns.host, 'zone': self.zone.name}) + except Exception as e: + LOG.warning(_LW('Unexpected failure polling for serial %(serial)d ' + '%(host)s for zone %(zone)s. Error: %(error)s'), + {'serial': self.zone.serial, 'host': self.ns.host, + 'zone': self.zone.name, 'error': str(e)}) + + return None + + +class ZonePoller(base.Task, ThresholdMixin): + """ + Orchestrate polling for a change across the nameservers in a pool + and compute the proper zone status, and update it. + + :return: Whether the change was succesfully polled for on a satisfactory + number of nameservers in the pool + """ + def __init__(self, executor, context, pool, zone): + self.executor = executor + self.context = context + self.pool = pool + self.zone = zone + + def _update_status(self): + task = UpdateStatus(self.executor, self.context, self.zone) + task() + + def _do_poll(self): + """ + Poll nameservers, compute basic success, return detailed query results + for further computation. Retry on failure to poll (meet threshold for + success). 
+ + :return: a DNSQueryResult object with the results of polling + """ + nameservers = self.pool.nameservers + + retry_interval = self.retry_interval + query_result = DNSQueryResult(0, 0, 0, 0) + results = [] + for retry in range(0, self.max_retries): + results = self.executor.run([ + PollForZone(self.executor, self.zone, ns) + for ns in nameservers + ]) + + query_result = parse_query_results(results, self.zone) + + if self._compare_threshold(query_result.positives, len(results)): + LOG.debug('Successful poll for %(zone)s', + {'zone': self.zone.name}) + break + + LOG.debug('Unsuccessful poll for %(zone)s on attempt %(n)d', + {'zone': self.zone.name, 'n': retry + 1}) + time.sleep(retry_interval) + + return query_result + + def _on_failure(self, error_status): + LOG.info(_LI('Could not find %(serial)s for %(zone)s on enough ' + 'nameservers.'), + {'serial': self.zone.serial, 'zone': self.zone.name}) + + self.zone.status = error_status + + if error_status == 'NO_ZONE': + self.zone.action = 'CREATE' + + return False + + def _on_success(self, query_result, status): + # TODO(timsim): Change this back to active, so dumb central + self.zone.status = status + LOG.debug('Found success for %(zone)s at serial %(serial)d', + {'zone': self.zone.name, 'serial': self.zone.serial}) + + self.zone.serial = query_result.consensus_serial + return True + + def _threshold_met(self, query_result): + """ + Compute whether the thresholds were met. Provide an answer, + and an error status if there was a failure. + + The error status should be either: + + - ERROR: the operation failed + - NO_ZONE: the zone doesn't exist on enough name servers + + :return: Whether the the polling was succesful, and a status + describing the state (bool, str) + """ + + total = len(query_result.results) + is_not_delete = self.zone.action != 'DELETE' + + # Ensure if we don't have too many nameservers that + # don't have the zone. + over_no_zone_threshold = self._compare_threshold( + (total - query_result.no_zones), total + ) + + if not over_no_zone_threshold and is_not_delete: + return False, 'NO_ZONE' + + # The action should have been pushed out to a minimum + # number of nameservers. + if not self._compare_threshold(query_result.positives, total): + return False, 'ERROR' + + # We have success of the action on the nameservers and enough + # nameservers have the zone to call this a success. + return True, 'SUCCESS' + + def __call__(self): + query_result = self._do_poll() + result = None + success, status = self._threshold_met(query_result) + if success: + result = self._on_success(query_result, status) + else: + result = self._on_failure(status) + + self._update_status() + return result + + +################### +# Status Management +################### + +class UpdateStatus(base.Task): + """ + Inspect the zone object and call central's update_status method. + Some logic is applied that could be removed when central's logic + for updating status is sane + + :return: No return value + """ + def __init__(self, executor, context, zone): + super(UpdateStatus, self).__init__(executor) + self.zone = zone + self.context = context + + def __call__(self): + # TODO(timsim): Fix this when central's logic is sane + if self.zone.action == 'DELETE' and self.zone.status != 'ERROR': + self.zone.action = 'NONE' + self.zone.status = 'NO_ZONE' + + if self.zone.status == 'SUCCESS': + self.zone.action = 'NONE' + + # This log message will always have action as NONE and then we + # don't use the action in the update_status call. 
+ LOG.debug('Updating status for %(zone)s to %(status)s:%(action)s', + {'zone': self.zone.name, 'status': self.zone.status, + 'action': self.zone.action}) + + self.central_api.update_status( + self.context, + self.zone.id, + self.zone.status, + self.zone.serial + ) + + +################### +# Periodic Recovery +################### + +class RecoverShard(base.Task): + """ + Given a beginning and ending shard, create the work to recover any + zones in an undesirable state within those shards. + + :return: No return value + """ + def __init__(self, executor, context, begin, end): + super(RecoverShard, self).__init__(executor) + self.context = context + self.begin_shard = begin + self.end_shard = end + + def _get_zones(self): + criterion = { + 'shard': "BETWEEN %s,%s" % (self.begin_shard, self.end_shard), + 'status': 'ERROR' + } + error_zones = self.storage.find_zones(self.context, criterion) + + # Include things that have been hanging out in PENDING + # status for longer than they should + # Generate the current serial, will provide a UTC Unix TS. + current = utils.increment_serial() + stale_criterion = { + 'shard': "BETWEEN %s,%s" % (self.begin_shard, self.end_shard), + 'status': 'PENDING', + 'serial': "<%s" % (current - self.max_prop_time) + } + + stale_zones = self.storage.find_zones(self.context, stale_criterion) + if stale_zones: + LOG.warn(_LW('Found %(len)d zones PENDING for more than %(sec)d ' + 'seconds'), {'len': len(stale_zones), + 'sec': self.max_prop_time}) + error_zones.extend(stale_zones) + + return error_zones + + def __call__(self): + zones = self._get_zones() + + for zone in zones: + if zone.action == 'CREATE': + self.worker_api.create_zone(self.context, zone) + elif zone.action == 'UPDATE': + self.worker_api.update_zone(self.context, zone) + elif zone.action == 'DELETE': + self.worker_api.delete_zone(self.context, zone) + + +############## +# Zone Exports +############## + +class ExportZone(base.Task): + """ + Given a zone, determine the proper method, based on size, and + perform the necessary actions to Export the zone, and update the + export row in storage via central. + """ + def __init__(self, executor, context, zone, export): + super(ExportZone, self).__init__(executor) + self.context = context + self.zone = zone + self.export = export + + def _synchronous_export(self): + return CONF['service:worker'].export_synchronous + + def _determine_export_method(self, context, export, size): + # NOTE(timsim): + # The logic here with swift will work like this: + # cfg.CONF.export_swift_enabled: + # An export will land in their swift container, even if it's + # small, but the link that comes back will be the synchronous + # link (unless export_syncronous is False, in which case it + # will behave like the next option) + # cfg.CONF.export_swift_preffered: + # The link that the user gets back will always be the swift + # container, and status of the export resource will depend + # on the Swift process. 
+        # If the export is too large for synchronous, or synchronous is not
+        # enabled and swift is not enabled, it will fall through to ERROR
+        # swift = False
+        synchronous = self._synchronous_export()
+
+        if synchronous:
+            try:
+                self.quota.limit_check(
+                    context, context.tenant, api_export_size=size)
+            except exceptions.OverQuota:
+                LOG.debug('Zone Export too large to perform synchronously')
+                export.status = 'ERROR'
+                export.message = 'Zone is too large to export'
+                return export
+
+            export.location = \
+                'designate://v2/zones/tasks/exports/%(eid)s/export' % \
+                {'eid': export.id}
+
+            export.status = 'COMPLETE'
+        else:
+            LOG.debug('No method found to export zone')
+            export.status = 'ERROR'
+            export.message = 'No suitable method for export'
+
+        return export
+
+    def __call__(self):
+        criterion = {'zone_id': self.zone.id}
+        count = self.storage.count_recordsets(self.context, criterion)
+
+        export = self._determine_export_method(
+            self.context, self.export, count)
+
+        self.central_api.update_zone_export(self.context, export)
diff --git a/designate/worker/utils.py b/designate/worker/utils.py
new file mode 100644
index 000000000..f82d5432e
--- /dev/null
+++ b/designate/worker/utils.py
@@ -0,0 +1,82 @@
+# Copyright 2016 Rackspace Inc.
+#
+# Author: Tim Simmons
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import threading
+import dns
+import dns.exception
+import dns.query
+from oslo_config import cfg
+from oslo_log import log as logging
+
+LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+
+
+def prepare_msg(zone_name, rdatatype=dns.rdatatype.SOA, notify=False):
+    """
+    Build a DNS query or NOTIFY message with dnspython
+    """
+    dns_message = dns.message.make_query(zone_name, rdatatype)
+    if notify:
+        dns_message.set_opcode(dns.opcode.NOTIFY)
+    else:
+        dns_message.set_opcode(dns.opcode.QUERY)
+    return dns_message
+
+
+def dig(zone_name, host, rdatatype, port=53):
+    """
+    Set up and send a regular DNS query, with a configurable record type
+    """
+    query = prepare_msg(zone_name, rdatatype=rdatatype)
+
+    return send_dns_msg(query, host, port=port)
+
+
+def notify(zone_name, host, port=53):
+    """
+    Set up a NOTIFY packet and send it
+    """
+    msg = prepare_msg(zone_name, notify=True)
+
+    return send_dns_msg(msg, host, port=port)
+
+
+def send_dns_msg(dns_message, host, port=53):
+    """
+    Send the dns message and return the response
+
+    :return: dns.Message of the response to the dns query
+    """
+    # This can raise some exceptions, but we'll catch them elsewhere
+    if not CONF['service:mdns'].all_tcp:
+        return dns.query.udp(
+            dns_message, host, port=port, timeout=10)
+    else:
+        return dns.query.tcp(
+            dns_message, host, port=port, timeout=10)
+
+
+def get_serial(zone_name, host, port=53):
+    """
+    Possibly raises dns.exception.Timeout or dns.query.BadResponse.
+    Possibly returns 0 if, e.g., the answer section is empty.
+ """ + resp = dig(zone_name, host, dns.rdatatype.SOA, port=port) + if not resp.answer: + return 0 + rdataset = resp.answer[0].to_rdataset() + if not rdataset: + return 0 + return rdataset[0].serial diff --git a/designate/zone_manager/__init__.py b/designate/zone_manager/__init__.py deleted file mode 100644 index 2b9943b11..000000000 --- a/designate/zone_manager/__init__.py +++ /dev/null @@ -1,37 +0,0 @@ -# Copyright 2015 Hewlett-Packard Development Company, L.P. -# -# Author: Endre Karlson -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -from oslo_config import cfg - -CONF = cfg.CONF - -CONF.register_group(cfg.OptGroup( - name='service:zone_manager', title="Configuration for Zone Manager Service" -)) - -OPTS = [ - cfg.IntOpt('workers', - help='Number of Zone Manager worker processes to spawn'), - cfg.IntOpt('threads', default=1000, - help='Number of Zone Manager greenthreads to spawn'), - cfg.ListOpt('enabled_tasks', - help='Enabled tasks to run'), - cfg.StrOpt('storage-driver', default='sqlalchemy', - help='The storage driver to use'), - cfg.BoolOpt('export-synchronous', default=True, - help='Whether to allow synchronous zone exports'), -] - -CONF.register_opts(OPTS, group='service:zone_manager') diff --git a/designate/zone_manager/service.py b/designate/zone_manager/service.py deleted file mode 100644 index 6ba26d37c..000000000 --- a/designate/zone_manager/service.py +++ /dev/null @@ -1,154 +0,0 @@ -# Copyright 2015 Hewlett-Packard Development Company, L.P. -# -# Author: Endre Karlson -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
-from oslo_config import cfg -from oslo_log import log as logging -import oslo_messaging as messaging - -from designate.i18n import _LI -from designate import coordination -from designate import exceptions -from designate import quota -from designate import service -from designate import storage -from designate import utils -from designate.central import rpcapi -from designate.zone_manager import tasks - - -LOG = logging.getLogger(__name__) -CONF = cfg.CONF - -NS = 'designate.periodic_tasks' - - -class Service(service.RPCService, coordination.CoordinationMixin, - service.Service): - RPC_API_VERSION = '1.0' - - target = messaging.Target(version=RPC_API_VERSION) - - @property - def storage(self): - if not hasattr(self, '_storage'): - storage_driver = cfg.CONF['service:zone_manager'].storage_driver - self._storage = storage.get_storage(storage_driver) - return self._storage - - @property - def quota(self): - if not hasattr(self, '_quota'): - # Get a quota manager instance - self._quota = quota.get_quota() - return self._quota - - @property - def service_name(self): - return 'zone_manager' - - @property - def central_api(self): - return rpcapi.CentralAPI.get_instance() - - def start(self): - super(Service, self).start() - - self._partitioner = coordination.Partitioner( - self._coordinator, self.service_name, self._coordination_id, - range(0, 4095)) - - self._partitioner.start() - self._partitioner.watch_partition_change(self._rebalance) - - enabled = CONF['service:zone_manager'].enabled_tasks - for task in tasks.PeriodicTask.get_extensions(enabled): - LOG.debug("Registering task %s", task) - - # Instantiate the task - task = task() - - # Subscribe for partition size updates. - self._partitioner.watch_partition_change(task.on_partition_change) - - interval = CONF[task.get_canonical_name()].interval - self.tg.add_timer(interval, task) - - def _rebalance(self, my_partitions, members, event): - LOG.info(_LI("Received rebalance event %s"), event) - self.partition_range = my_partitions - - # Begin RPC Implementation - - # Zone Export - def start_zone_export(self, context, zone, export): - criterion = {'zone_id': zone.id} - count = self.storage.count_recordsets(context, criterion) - - export = self._determine_export_method(context, export, count) - - self.central_api.update_zone_export(context, export) - - def render_zone(self, context, zone_id): - return self._export_zone(context, zone_id) - - def _determine_export_method(self, context, export, size): - synchronous = CONF['service:zone_manager'].export_synchronous - - # NOTE(timsim): - # The logic here with swift will work like this: - # cfg.CONF.export_swift_enabled: - # An export will land in their swift container, even if it's - # small, but the link that comes back will be the synchronous - # link (unless export_syncronous is False, in which case it - # will behave like the next option) - # cfg.CONF.export_swift_preffered: - # The link that the user gets back will always be the swift - # container, and status of the export resource will depend - # on the Swift process. 
- # If the export is too large for synchronous, or synchronous is not - # enabled and swift is not enabled, it will fall through to ERROR - # swift = False - - if synchronous: - try: - self.quota.limit_check( - context, context.tenant, api_export_size=size) - except exceptions.OverQuota: - LOG.debug('Zone Export too large to perform synchronously') - export['status'] = 'ERROR' - export['message'] = 'Zone is too large to export' - return export - - export['location'] = \ - "designate://v2/zones/tasks/exports/%(eid)s/export" % \ - {'eid': export['id']} - - export['status'] = 'COMPLETE' - else: - LOG.debug('No method found to export zone') - export['status'] = 'ERROR' - export['message'] = 'No suitable method for export' - - return export - - def _export_zone(self, context, zone_id): - zone = self.central_api.get_zone(context, zone_id) - - criterion = {'zone_id': zone_id} - recordsets = self.storage.find_recordsets_export(context, criterion) - - return utils.render_template('export-zone.jinja2', - zone=zone, - recordsets=recordsets) diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 9f84bdbc4..b0207daf9 100755 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -96,6 +96,14 @@ function configure_designate { # mDNS Configuration iniset $DESIGNATE_CONF service:mdns listen ${DESIGNATE_SERVICE_HOST}:${DESIGNATE_SERVICE_PORT_MDNS} + # Worker Configuration + if is_service_enabled designate-worker; then + iniset $DESIGNATE_CONF service:worker enabled True + iniset $DESIGNATE_CONF service:worker notify True + iniset $DESIGNATE_CONF service:worker poll_max_retries $DESIGNATE_POLL_RETRIES + iniset $DESIGNATE_CONF service:worker poll_retry_interval $DESIGNATE_POLL_INTERVAL + fi + # Set up Notifications/Ceilometer Integration iniset $DESIGNATE_CONF DEFAULT notification_driver "$DESIGNATE_NOTIFICATION_DRIVER" iniset $DESIGNATE_CONF DEFAULT notification_topics "$DESIGNATE_NOTIFICATION_TOPICS" @@ -300,6 +308,9 @@ function start_designate { run_process designate-mdns "$DESIGNATE_BIN_DIR/designate-mdns --config-file $DESIGNATE_CONF" run_process designate-agent "$DESIGNATE_BIN_DIR/designate-agent --config-file $DESIGNATE_CONF" run_process designate-sink "$DESIGNATE_BIN_DIR/designate-sink --config-file $DESIGNATE_CONF" + run_process designate-worker "$DESIGNATE_BIN_DIR/designate-worker --config-file $DESIGNATE_CONF" + run_process designate-producer "$DESIGNATE_BIN_DIR/designate-producer --config-file $DESIGNATE_CONF" + # Start proxies if enabled if is_service_enabled designate-api && is_service_enabled tls-proxy; then @@ -321,6 +332,8 @@ function stop_designate { stop_process designate-mdns stop_process designate-agent stop_process designate-sink + stop_process designate-worker + stop_process designate-producer stop_designate_backend } diff --git a/devstack/settings b/devstack/settings index a171ac642..6c3295f99 100644 --- a/devstack/settings +++ b/devstack/settings @@ -9,6 +9,8 @@ DESIGNATE_NOTIFICATION_TOPICS=${DESIGNATE_NOTIFICATION_TOPICS:-notifications} DESIGNATE_PERIODIC_RECOVERY_INTERVAL=${DESIGNATE_PERIODIC_RECOVERY_INTERVAL:-120} DESIGNATE_PERIODIC_SYNC_INTERVAL=${DESIGNATE_PERIODIC_SYNC_INTERVAL:-1800} DESIGNATE_COORDINATION_URL=${DESIGNATE_COORDINATION_URL:-} +DESIGNATE_POLL_INTERVAL=${DESIGNATE_POLL_INTERVAL:-5} +DESIGNATE_POLL_RETRIES=${DESIGNATE_POLL_RETRIES:-6} # Quota Options DESIGNATE_QUOTA_ZONES=${DESIGNATE_QUOTA_ZONES:-100} diff --git a/etc/designate/designate.conf.sample b/etc/designate/designate.conf.sample index d46216a1e..baa38f82b 100644 --- a/etc/designate/designate.conf.sample 
+++ b/etc/designate/designate.conf.sample
@@ -275,7 +275,7 @@ debug = False
 #-----------------------
 # Zone Manager Service
 #-----------------------
-[service:zone_manager]
+[service:producer]
 
 # Number of Zone Manager worker processes to spawn
 #workers = None
@@ -292,7 +292,7 @@ debug = False
 #------------------------
 # Deleted domains purging
 #------------------------
-[zone_manager_task:domain_purge]
+[producer_task:domain_purge]
 
 # How frequently to purge deleted domains, in seconds
 #interval = 3600 # 1h
@@ -305,12 +305,16 @@ debug = False
 #------------------------
 # Delayed zones NOTIFY
 #------------------------
-[zone_manager_task:delayed_notify]
+[producer_task:delayed_notify]
 
 # How frequently to scan for zones pending NOTIFY, in seconds
 #interval = 5
-# How many zones to receive NOTIFY on each run
-#batch_size = 100
+#------------------------
+# Worker Periodic Recovery
+#------------------------
+[producer_task:worker_periodic_recovery]
+# How frequently to run the worker periodic recovery, in seconds
+#interval = 120
 
 #-----------------------
 # Pool Manager Service
@@ -365,6 +369,41 @@ debug = False
 
 # The cache driver to use
 #cache_driver = memcache
+
+#-----------------------
+# Worker Service
+#-----------------------
+[service:worker]
+# Whether to send events to worker instead of Pool Manager
+#enabled = False
+
+# Number of Worker processes to spawn
+#workers = None
+
+# Number of Worker greenthreads to spawn
+#threads = 1000
+
+# The percentage of servers requiring a successful update for a zone change
+# to be considered active
+#threshold_percentage = 100
+
+# The time to wait for a response from a server
+#poll_timeout = 30
+
+# The time between retrying to send a request and waiting for a response from a
+# server
+#poll_retry_interval = 15
+
+# The maximum number of times to retry sending a request and wait for a
+# response from a server
+#poll_max_retries = 10
+
+# The time to wait before sending the first request to a server
+#poll_delay = 5
+
+# Whether to allow worker to send NOTIFYs. NOTIFY requests to mdns will be no-ops
+#notify = False
+
 ###################################
 ## Pool Manager Cache Configuration
 ###################################
diff --git a/setup.cfg b/setup.cfg
index c1db2076d..c4a5da614 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -47,6 +47,8 @@ console_scripts =
     designate-zone-manager = designate.cmd.zone_manager:main
     designate-sink = designate.cmd.sink:main
     designate-agent = designate.cmd.agent:main
+    designate-worker = designate.cmd.worker:main
+    designate-producer = designate.cmd.producer:main
 
 designate.api.v1 =
     domains = designate.api.v1.domains:blueprint
@@ -122,11 +124,12 @@ designate.manage =
     powerdns = designate.manage.powerdns:DatabaseCommands
     tlds = designate.manage.tlds:TLDCommands
 
-designate.zone_manager_tasks =
-    zone_purge = designate.zone_manager.tasks:DeletedZonePurgeTask
-    periodic_exists = designate.zone_manager.tasks:PeriodicExistsTask
-    periodic_secondary_refresh = designate.zone_manager.tasks:PeriodicSecondaryRefreshTask
-    delayed_notify = designate.zone_manager.tasks:PeriodicGenerateDelayedNotifyTask
+designate.producer_tasks =
+    zone_purge = designate.producer.tasks:DeletedZonePurgeTask
+    periodic_exists = designate.producer.tasks:PeriodicExistsTask
+    periodic_secondary_refresh = designate.producer.tasks:PeriodicSecondaryRefreshTask
+    delayed_notify = designate.producer.tasks:PeriodicGenerateDelayedNotifyTask
+    worker_periodic_recovery = designate.producer.tasks:WorkerPeriodicRecovery
 
 designate.heartbeat_emitter =
     noop = designate.service_status:NoopEmitter
diff --git a/tests-py3.txt b/tests-py3.txt
index de3022676..256b536ac 100644
--- a/tests-py3.txt
+++ b/tests-py3.txt
@@ -18,5 +18,5 @@ designate.tests.unit.test_api.test_api_v2
 designate.tests.unit.test_backend.test_designate
 designate.tests.unit.test_central.test_basic
 designate.tests.unit.test_pool
-designate.tests.unit.test_zone_manager.test_tasks
+designate.tests.unit.test_producer.test_tasks
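
To make the polling logic added in `designate/worker/tasks/zone.py` easier to review, here is a rough standalone sketch of the success/NO_ZONE decision it makes. It is illustrative only, not code from the patch: the `DNSQueryResult` field names are taken from their usage above, `compare_threshold` stands in for the task's `_compare_threshold` helper, and a 100% `threshold_percentage` is assumed.

```python
import collections

# Field names follow their usage in the patch; the field order is illustrative.
DNSQueryResult = collections.namedtuple(
    'DNSQueryResult', ['positives', 'no_zones', 'consensus_serial', 'results'])


def compare_threshold(successes, total, threshold_percentage=100):
    """True if successes out of total meets the configured percentage."""
    if total == 0:
        return False
    return (successes / float(total)) * 100 >= threshold_percentage


def threshold_met(query_result, action, threshold_percentage=100):
    """Mirror the (success, status) decision the polling task makes."""
    total = len(query_result.results)

    # Too many nameservers are missing the zone entirely, which is only
    # acceptable when the zone is being deleted.
    enough_have_zone = compare_threshold(
        total - query_result.no_zones, total, threshold_percentage)
    if not enough_have_zone and action != 'DELETE':
        return False, 'NO_ZONE'

    # Not enough nameservers report the expected change yet.
    if not compare_threshold(query_result.positives, total,
                             threshold_percentage):
        return False, 'ERROR'

    return True, 'SUCCESS'


# Two of three nameservers have the change; with a 100% threshold that is
# still an ERROR, so the poll loop would sleep and retry.
result = DNSQueryResult(positives=2, no_zones=0, consensus_serial=5,
                        results=[True, True, False])
print(threshold_met(result, 'UPDATE'))  # -> (False, 'ERROR')
```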
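`RecoverShard` builds two storage criteria: one for zones whose last action ended in ERROR, and one for zones stuck in PENDING longer than a change should take to propagate; because zone serials are UTC Unix timestamps, staleness is a simple numeric comparison. Below is a hypothetical standalone rendering of those criteria. The `max_prop_time` value is assumed here to be derived from the poll timeout, delay, and retry settings; in the patch it comes from the base `Task`.

```python
import time


def recovery_criteria(begin_shard, end_shard, max_prop_time):
    """Build the ERROR and stale-PENDING criteria RecoverShard would use."""
    shard_range = "BETWEEN %s,%s" % (begin_shard, end_shard)

    # Zones whose last change ended in ERROR.
    error_criterion = {'shard': shard_range, 'status': 'ERROR'}

    # Zones stuck in PENDING longer than a change could take to propagate.
    # Serials are UTC Unix timestamps, so "older than" is numeric.
    now = int(time.time())
    stale_criterion = {
        'shard': shard_range,
        'status': 'PENDING',
        'serial': "<%s" % (now - max_prop_time),
    }
    return error_criterion, stale_criterion


# A producer instance recovering the full shard range, assuming roughly
# poll_timeout + poll_delay + poll_max_retries * poll_retry_interval = 330s.
error, stale = recovery_criteria(0, 4095, max_prop_time=330)
print(error)
print(stale)
```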
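The helpers in `designate/worker/utils.py` wrap dnspython for the worker's SOA checks and NOTIFYs. A possible usage sketch follows, assuming a deployed Designate environment (so the `service:mdns` options are registered) and a placeholder nameserver at 192.0.2.10; the zone name and address are examples, not values from the patch.

```python
import dns.rcode

from designate.worker import utils as wutils

# Ask a backend nameserver which serial it is currently serving for a zone.
# get_serial() returns 0 when the zone is missing or the answer is empty.
serial = wutils.get_serial('example.org.', '192.0.2.10', port=53)
if serial == 0:
    print('zone is not (yet) present on the nameserver')
else:
    print('nameserver is serving serial %d' % serial)

# Tell the nameserver to pull fresh zone contents, e.g. after an update.
response = wutils.notify('example.org.', '192.0.2.10', port=53)
print('NOTIFY rcode: %s' % dns.rcode.to_text(response.rcode()))
```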