From 565e5026eb39a06b86636661176e21cd9013b90a Mon Sep 17 00:00:00 2001 From: rjrjr Date: Wed, 21 Jan 2015 10:02:21 -0700 Subject: [PATCH] Ensure Pool Manager Cache is Cleared When actions (create, delete, update) are completed for a domain, remove the statuses from the pool manager cache. Change-Id: Icb452e3c83dca5bbcbf38ab44394b61a26867a3b Closes-Bug: #1408202 --- designate/pool_manager/service.py | 231 ++++++++++-------- .../tests/test_pool_manager/test_service.py | 101 ++++---- 2 files changed, 170 insertions(+), 162 deletions(-) diff --git a/designate/pool_manager/service.py b/designate/pool_manager/service.py index c904a9603..44ff40255 100644 --- a/designate/pool_manager/service.py +++ b/designate/pool_manager/service.py @@ -19,6 +19,7 @@ from decimal import Decimal from oslo.config import cfg from oslo import messaging from oslo_log import log as logging +from oslo_concurrency import lockutils from designate import backend from designate import exceptions @@ -40,6 +41,7 @@ ERROR_STATUS = 'ERROR' CREATE_ACTION = 'CREATE' DELETE_ACTION = 'DELETE' UPDATE_ACTION = 'UPDATE' +MAXIMUM_THRESHOLD = 100 @contextmanager @@ -161,7 +163,7 @@ class Service(service.RPCService): for server_backend in self.server_backends: server = server_backend['server'] - create_status = self._create_create_status(server, domain) + create_status = self._build_create_status(server, domain) self._create_domain_on_server( context, create_status, domain, server_backend) @@ -175,11 +177,12 @@ class Service(service.RPCService): for server_backend in self.server_backends: server = server_backend['server'] - delete_status = self._create_delete_status(server, domain) + delete_status = self._build_delete_status(server, domain) self._delete_domain_on_server( context, delete_status, domain, server_backend) - if not self._is_delete_consensus(context, domain): + if self._is_in_cache(context, domain, DELETE_ACTION) \ + and not self._is_delete_consensus(context, domain): status = ERROR_STATUS LOG.warn(_LW('Consensus not reached ' 'for deleting domain %(domain)s') % @@ -211,40 +214,49 @@ class Service(service.RPCService): """ LOG.debug("Calling update_status for %s" % domain.name) - update_status = self._retrieve_from_cache( - context, server, domain, UPDATE_ACTION) - cache_serial = update_status.serial_number + with lockutils.lock('update-status-%s' % domain.id): + try: + update_status = self._retrieve_one_from_cache( + context, server, domain, UPDATE_ACTION) + except exceptions.PoolManagerStatusNotFound: + update_status = self._build_update_status(server, domain) + self._store_in_cache(context, update_status) + cache_serial = update_status.serial_number - LOG.debug('For domain %s on server %s the cache serial is %s ' - 'and the actual serial is %s.' % - (domain.name, self._get_destination(server), - cache_serial, actual_serial)) - if actual_serial and cache_serial < actual_serial: - update_status.status = status - update_status.serial_number = actual_serial - self._store_in_cache(context, update_status) + LOG.debug('For domain %s on server %s the cache serial is %s ' + 'and the actual serial is %s.' % + (domain.name, self._get_destination(server), + cache_serial, actual_serial)) + if actual_serial and cache_serial < actual_serial: + update_status.status = status + update_status.serial_number = actual_serial + self._store_in_cache(context, update_status) - consensus_serial = self._get_consensus_serial(context, domain) + consensus_serial = self._get_consensus_serial(context, domain) - if cache_serial < consensus_serial: - LOG.info(_LI('For domain %(domain)s ' - 'the consensus serial is %(consensus_serial)s.') % - {'domain': domain.name, - 'consensus_serial': consensus_serial}) - - self.central_api.update_status( - context, domain.id, SUCCESS_STATUS, consensus_serial) - - if status == ERROR_STATUS: - error_serial = self._get_error_serial( - context, domain, consensus_serial) - if error_serial > consensus_serial or error_serial == 0: - LOG.warn(_LW('For domain %(domain)s ' - 'the error serial is %(error_serial)s.') % + if cache_serial < consensus_serial: + LOG.info(_LI('For domain %(domain)s ' + 'the consensus serial is %(consensus_serial)s.') % {'domain': domain.name, - 'error_serial': error_serial}) + 'consensus_serial': consensus_serial}) self.central_api.update_status( - context, domain.id, ERROR_STATUS, error_serial) + context, domain.id, SUCCESS_STATUS, consensus_serial) + + if status == ERROR_STATUS: + error_serial = self._get_error_serial( + context, domain, consensus_serial) + if error_serial > consensus_serial or error_serial == 0: + LOG.warn(_LW('For domain %(domain)s ' + 'the error serial is %(error_serial)s.') % + {'domain': domain.name, + 'error_serial': error_serial}) + self.central_api.update_status( + context, domain.id, ERROR_STATUS, error_serial) + + if consensus_serial == domain.serial \ + and self._is_update_consensus(context, domain, + MAXIMUM_THRESHOLD): + self._clear_cache(context, domain, UPDATE_ACTION) def periodic_recovery(self): """ @@ -255,8 +267,8 @@ class Service(service.RPCService): context = DesignateContext.get_admin_context(all_tenants=True) try: - self._periodic_create_domains_that_failed(context) self._periodic_delete_domains_that_failed(context) + self._periodic_create_domains_that_failed(context) self._periodic_update_domains_that_failed(context) except Exception: LOG.exception(_LE('An unhandled exception in periodic recovery ' @@ -301,18 +313,13 @@ class Service(service.RPCService): {'domain': domain.name, 'server': self._get_destination(server)}) - update_status = self._create_update_status(server, domain) - update_status.serial_number = 0 - # Setting the update status to ERROR ensures the periodic - # recovery is run if there is a problem. - update_status.status = ERROR_STATUS - self._store_in_cache(context, update_status) + if self._is_create_consensus(context, domain, MAXIMUM_THRESHOLD): + self._clear_cache(context, domain, CREATE_ACTION) # PowerDNS needs to explicitly send a NOTIFY for the AXFR to # happen whereas BIND9 does an AXFR implicitly after the domain # is created. Sending a NOTIFY for all cases. - self._notify_zone_changed(context, domain, server) - self._poll_for_serial_number(context, domain, server) + self._update_domain_on_server(context, domain, server_backend) except exceptions.Backend: create_status.status = ERROR_STATUS self._store_in_cache(context, create_status) @@ -323,8 +330,8 @@ class Service(service.RPCService): def _periodic_create_domains_that_failed(self, context): - create_statuses = self._find_pool_manager_statuses( - context, CREATE_ACTION, status=ERROR_STATUS) + create_statuses = self._retrieve_from_cache( + context, action=CREATE_ACTION, status=ERROR_STATUS) for create_status in create_statuses: domain = self.central_api.get_domain( @@ -349,6 +356,7 @@ class Service(service.RPCService): 'from server %(server)s.') % {'domain': domain.name, 'server': self._get_destination(server)}) + if not consensus_existed \ and self._is_delete_consensus(context, domain): LOG.info(_LI('Consensus reached ' @@ -356,6 +364,10 @@ class Service(service.RPCService): {'domain': domain.name}) self.central_api.update_status( context, domain.id, SUCCESS_STATUS, domain.serial) + + if self._is_delete_consensus(context, domain, + MAXIMUM_THRESHOLD): + self._clear_cache(context, domain) except exceptions.Backend: delete_status.status = ERROR_STATUS self._store_in_cache(context, delete_status) @@ -366,8 +378,8 @@ class Service(service.RPCService): def _periodic_delete_domains_that_failed(self, context): - delete_statuses = self._find_pool_manager_statuses( - context, DELETE_ACTION, status=ERROR_STATUS) + delete_statuses = self._retrieve_from_cache( + context, action=DELETE_ACTION, status=ERROR_STATUS) # Used to retrieve a domain from Central that may have already been # "deleted". @@ -384,30 +396,17 @@ class Service(service.RPCService): server = server_backend['server'] - try: - update_status = self._retrieve_from_cache( - context, server, domain, UPDATE_ACTION) - if update_status.status == ERROR_STATUS \ - or update_status.serial_number < domain.serial: - self._notify_zone_changed(context, domain, server) - self._poll_for_serial_number(context, domain, server) - LOG.info(_LI('Updating domain %(domain)s ' - 'on server %(server)s.') % - {'domain': domain.name, - 'server': self._get_destination(server)}) - else: - # TODO(Ron): Do not log this warning on a periodic_sync. - LOG.warn(_LW('No need to update domain %(domain)s ' - 'on server %(server)s.') % - {'domain': domain.name, - 'server': self._get_destination(server)}) - except exceptions.PoolManagerStatusNotFound: - pass + self._notify_zone_changed(context, domain, server) + self._poll_for_serial_number(context, domain, server) + LOG.info(_LI('Updating domain %(domain)s ' + 'on server %(server)s.') % + {'domain': domain.name, + 'server': self._get_destination(server)}) def _periodic_update_domains_that_failed(self, context): - update_statuses = self._find_pool_manager_statuses( - context, UPDATE_ACTION, status=ERROR_STATUS) + update_statuses = self._retrieve_from_cache( + context, action=UPDATE_ACTION, status=ERROR_STATUS) for update_status in update_statuses: domain = self.central_api.get_domain( @@ -439,22 +438,9 @@ class Service(service.RPCService): def _percentage(count, total_count): return (Decimal(count) / Decimal(total_count)) * Decimal(100) - def _exceed_or_meet_threshold(self, count): + def _exceed_or_meet_threshold(self, count, threshold): return self._percentage( - count, len(self.server_backends)) >= Decimal(self.threshold) - - def _find_pool_manager_statuses(self, context, action, - domain=None, status=None): - criterion = { - 'action': action - } - if domain: - criterion['domain_id'] = domain.id - if status: - criterion['status'] = status - - return self.cache.find_pool_manager_statuses( - context, criterion=criterion) + count, len(self.server_backends)) >= Decimal(threshold) @staticmethod def _get_sorted_serials(pool_manager_statuses, descending=False): @@ -470,28 +456,39 @@ class Service(service.RPCService): def _get_serials_descending(self, pool_manager_statuses): return self._get_sorted_serials(pool_manager_statuses, descending=True) - def _is_success_consensus(self, context, domain, action): + def _is_success_consensus(self, context, domain, action, threshold=None): success_count = 0 - pool_manager_statuses = self._find_pool_manager_statuses( - context, action, domain=domain) + pool_manager_statuses = self._retrieve_from_cache( + context, domain=domain, action=action) for pool_manager_status in pool_manager_statuses: if pool_manager_status.status == SUCCESS_STATUS: success_count += 1 - return self._exceed_or_meet_threshold(success_count) + if threshold is None: + threshold = self.threshold + return self._exceed_or_meet_threshold(success_count, threshold) - def _is_delete_consensus(self, context, domain): - return self._is_success_consensus(context, domain, DELETE_ACTION) + def _is_create_consensus(self, context, domain, threshold=None): + return self._is_success_consensus( + context, domain, CREATE_ACTION, threshold) + + def _is_delete_consensus(self, context, domain, threshold=None): + return self._is_success_consensus( + context, domain, DELETE_ACTION, threshold) + + def _is_update_consensus(self, context, domain, threshold=None): + return self._is_success_consensus( + context, domain, UPDATE_ACTION, threshold) def _get_consensus_serial(self, context, domain): consensus_serial = 0 - update_statuses = self._find_pool_manager_statuses( - context, UPDATE_ACTION, domain=domain) + update_statuses = self._retrieve_from_cache( + context, domain=domain, action=UPDATE_ACTION) for serial in self._get_serials_descending(update_statuses): serial_count = 0 for update_status in update_statuses: if update_status.serial_number >= serial: serial_count += 1 - if self._exceed_or_meet_threshold(serial_count): + if self._exceed_or_meet_threshold(serial_count, self.threshold): consensus_serial = serial break return consensus_serial @@ -499,8 +496,8 @@ class Service(service.RPCService): def _get_error_serial(self, context, domain, consensus_serial): error_serial = 0 if not self._is_success_consensus(context, domain, UPDATE_ACTION): - update_statuses = self._find_pool_manager_statuses( - context, UPDATE_ACTION, domain=domain) + update_statuses = self._retrieve_from_cache( + context, domain=domain, action=UPDATE_ACTION) for serial in self._get_serials_ascending(update_statuses): if serial > consensus_serial: error_serial = serial @@ -508,26 +505,56 @@ class Service(service.RPCService): return error_serial @staticmethod - def _create_pool_manager_status(server, domain, action): + def _build_status_object(server, domain, action, serial_number, + status=None): values = { 'server_id': server.id, 'domain_id': domain.id, - 'status': None, - 'serial_number': domain.serial, + 'status': status, + 'serial_number': serial_number, 'action': action } return objects.PoolManagerStatus(**values) - def _create_create_status(self, server, domain): - return self._create_pool_manager_status(server, domain, CREATE_ACTION) + def _build_create_status(self, server, domain): + return self._build_status_object( + server, domain, CREATE_ACTION, domain.serial) - def _create_delete_status(self, server, domain): - return self._create_pool_manager_status(server, domain, DELETE_ACTION) + def _build_delete_status(self, server, domain): + return self._build_status_object( + server, domain, DELETE_ACTION, domain.serial) - def _create_update_status(self, server, domain): - return self._create_pool_manager_status(server, domain, UPDATE_ACTION) + def _build_update_status(self, server, domain): + # Setting the update status to ERROR ensures the periodic + # recovery is run if there is a problem. + return self._build_status_object( + server, domain, UPDATE_ACTION, 0, status='ERROR') - def _retrieve_from_cache(self, context, server, domain, action): + # Methods for manipulating the cache. + def _clear_cache(self, context, domain, action=None): + pool_manager_statuses = self._retrieve_from_cache( + context, domain=domain, action=action) + for pool_manager_status in pool_manager_statuses: + self.cache.delete_pool_manager_status( + context, pool_manager_status.id) + + def _is_in_cache(self, context, domain, action): + return len(self._retrieve_from_cache( + context, domain=domain, action=action)) > 0 + + def _retrieve_from_cache(self, context, + domain=None, action=None, status=None): + criterion = {} + if domain: + criterion['domain_id'] = domain.id + if action: + criterion['action'] = action + if status: + criterion['status'] = status + return self.cache.find_pool_manager_statuses( + context, criterion=criterion) + + def _retrieve_one_from_cache(self, context, server, domain, action): criterion = { 'server_id': server.id, 'domain_id': domain.id, diff --git a/designate/tests/test_pool_manager/test_service.py b/designate/tests/test_pool_manager/test_service.py index fe6c0a85a..bcdf3185d 100644 --- a/designate/tests/test_pool_manager/test_service.py +++ b/designate/tests/test_pool_manager/test_service.py @@ -96,17 +96,7 @@ class PoolManagerServiceTest(PoolManagerTestCase): create_statuses = self._find_pool_manager_statuses( self.admin_context, 'CREATE', domain) - self.assertEqual(2, len(create_statuses)) - self.assertEqual('SUCCESS', create_statuses[0].status) - self.assertEqual('SUCCESS', create_statuses[1].status) - - update_statuses = self._find_pool_manager_statuses( - self.admin_context, 'UPDATE', domain) - self.assertEqual(2, len(update_statuses)) - self.assertEqual('ERROR', update_statuses[0].status) - self.assertEqual(0, update_statuses[0].serial_number) - self.assertEqual('ERROR', update_statuses[1].status) - self.assertEqual(0, update_statuses[1].serial_number) + self.assertEqual(0, len(create_statuses)) # Ensure notify_zone_changed and poll_for_serial_number # was called for each backend server. @@ -176,12 +166,6 @@ class PoolManagerServiceTest(PoolManagerTestCase): self.assertEqual('SUCCESS', create_statuses[0].status) self.assertEqual('ERROR', create_statuses[1].status) - update_statuses = self._find_pool_manager_statuses( - self.admin_context, 'UPDATE', domain) - self.assertEqual(1, len(update_statuses)) - self.assertEqual('ERROR', update_statuses[0].status) - self.assertEqual(0, update_statuses[0].serial_number) - mock_notify_zone_changed.assert_called_once_with( self.admin_context, domain, self.service.server_backends[0]['server'], 30, 2, 3, 0) @@ -197,17 +181,13 @@ class PoolManagerServiceTest(PoolManagerTestCase): create_statuses = self._find_pool_manager_statuses( self.admin_context, 'CREATE', domain) - self.assertEqual(2, len(create_statuses)) - self.assertEqual('SUCCESS', create_statuses[0].status) - self.assertEqual('SUCCESS', create_statuses[1].status) + self.assertEqual(0, len(create_statuses)) self.service.delete_domain(self.admin_context, domain) delete_statuses = self._find_pool_manager_statuses( self.admin_context, 'DELETE', domain) - self.assertEqual(2, len(delete_statuses)) - self.assertEqual('SUCCESS', delete_statuses[0].status) - self.assertEqual('SUCCESS', delete_statuses[1].status) + self.assertEqual(0, len(delete_statuses)) mock_update_status.assert_called_once_with( self.admin_context, domain.id, 'SUCCESS', domain.serial) @@ -222,9 +202,7 @@ class PoolManagerServiceTest(PoolManagerTestCase): create_statuses = self._find_pool_manager_statuses( self.admin_context, 'CREATE', domain) - self.assertEqual(2, len(create_statuses)) - self.assertEqual('SUCCESS', create_statuses[0].status) - self.assertEqual('SUCCESS', create_statuses[1].status) + self.assertEqual(0, len(create_statuses)) mock_delete_domain.side_effect = exceptions.Backend @@ -249,9 +227,7 @@ class PoolManagerServiceTest(PoolManagerTestCase): create_statuses = self._find_pool_manager_statuses( self.admin_context, 'CREATE', domain) - self.assertEqual(2, len(create_statuses)) - self.assertEqual('SUCCESS', create_statuses[0].status) - self.assertEqual('SUCCESS', create_statuses[1].status) + self.assertEqual(0, len(create_statuses)) mock_delete_domain.side_effect = [None, exceptions.Backend] @@ -266,6 +242,38 @@ class PoolManagerServiceTest(PoolManagerTestCase): mock_update_status.assert_called_once_with( self.admin_context, domain.id, 'ERROR', domain.serial) + @patch.object(impl_fake.FakeBackend, 'delete_domain') + @patch.object(central_rpcapi.CentralAPI, 'update_status') + def test_delete_domain_backend_one_failure_success( + self, mock_update_status, mock_delete_domain): + + self.service.stop() + self.config( + threshold_percentage=50, + group='service:pool_manager') + self.service = self.start_service('pool_manager') + + domain = self.create_domain(name='example.org.') + + self.service.create_domain(self.admin_context, domain) + + create_statuses = self._find_pool_manager_statuses( + self.admin_context, 'CREATE', domain) + self.assertEqual(0, len(create_statuses)) + + mock_delete_domain.side_effect = [None, exceptions.Backend] + + self.service.delete_domain(self.admin_context, domain) + + delete_statuses = self._find_pool_manager_statuses( + self.admin_context, 'DELETE', domain) + self.assertEqual(2, len(delete_statuses)) + self.assertEqual('SUCCESS', delete_statuses[0].status) + self.assertEqual('ERROR', delete_statuses[1].status) + + mock_update_status.assert_called_once_with( + self.admin_context, domain.id, 'SUCCESS', domain.serial) + @patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number') @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed') def test_update_domain(self, mock_notify_zone_changed, @@ -276,9 +284,7 @@ class PoolManagerServiceTest(PoolManagerTestCase): create_statuses = self._find_pool_manager_statuses( self.admin_context, 'CREATE', domain) - self.assertEqual(2, len(create_statuses)) - self.assertEqual('SUCCESS', create_statuses[0].status) - self.assertEqual('SUCCESS', create_statuses[1].status) + self.assertEqual(0, len(create_statuses)) # Reset the mock call attributes. mock_notify_zone_changed.reset_mock() @@ -311,17 +317,7 @@ class PoolManagerServiceTest(PoolManagerTestCase): create_statuses = self._find_pool_manager_statuses( self.admin_context, 'CREATE', domain) - self.assertEqual(2, len(create_statuses)) - self.assertEqual('SUCCESS', create_statuses[0].status) - self.assertEqual('SUCCESS', create_statuses[1].status) - - update_statuses = self._find_pool_manager_statuses( - self.admin_context, 'UPDATE', domain) - self.assertEqual(2, len(update_statuses)) - self.assertEqual('ERROR', update_statuses[0].status) - self.assertEqual(0, update_statuses[0].serial_number) - self.assertEqual('ERROR', update_statuses[1].status) - self.assertEqual(0, update_statuses[1].serial_number) + self.assertEqual(0, len(create_statuses)) self.service.update_status(self.admin_context, domain, self.service.server_backends[0]['server'], @@ -329,11 +325,9 @@ class PoolManagerServiceTest(PoolManagerTestCase): update_statuses = self._find_pool_manager_statuses( self.admin_context, 'UPDATE', domain) - self.assertEqual(2, len(update_statuses)) + self.assertEqual(1, len(update_statuses)) self.assertEqual('SUCCESS', update_statuses[0].status) self.assertEqual(domain.serial, update_statuses[0].serial_number) - self.assertEqual('ERROR', update_statuses[1].status) - self.assertEqual(0, update_statuses[1].serial_number) # Ensure update_status was not called. self.assertEqual(False, mock_update_status.called) @@ -344,24 +338,11 @@ class PoolManagerServiceTest(PoolManagerTestCase): update_statuses = self._find_pool_manager_statuses( self.admin_context, 'UPDATE', domain) - self.assertEqual(2, len(update_statuses)) - self.assertEqual('SUCCESS', update_statuses[0].status) - self.assertEqual(domain.serial, update_statuses[0].serial_number) - self.assertEqual('SUCCESS', update_statuses[1].status) - self.assertEqual(domain.serial, update_statuses[1].serial_number) + self.assertEqual(0, len(update_statuses)) mock_update_status.assert_called_once_with( self.admin_context, domain.id, 'SUCCESS', domain.serial) - def test_update_status_missing_status(self): - domain = self.create_domain(name='example.org.') - - with testtools.ExpectedException(exceptions.PoolManagerStatusNotFound): - self.service.update_status( - self.admin_context, domain, - self.service.server_backends[0]['server'], 'SUCCESS', - domain.serial) - def _find_pool_manager_statuses(self, context, action, domain=None, status=None): criterion = {