Ensure Pool Manager Cache is Cleared

When actions (create, delete, update) are completed for a domain, remove
the statuses from the pool manager cache.

Change-Id: Icb452e3c83dca5bbcbf38ab44394b61a26867a3b
Closes-Bug: #1408202
This commit is contained in:
rjrjr 2015-01-21 10:02:21 -07:00
parent 8f59a264ec
commit 565e5026eb
2 changed files with 170 additions and 162 deletions

View File

@ -19,6 +19,7 @@ from decimal import Decimal
from oslo.config import cfg
from oslo import messaging
from oslo_log import log as logging
from oslo_concurrency import lockutils
from designate import backend
from designate import exceptions
@ -40,6 +41,7 @@ ERROR_STATUS = 'ERROR'
CREATE_ACTION = 'CREATE'
DELETE_ACTION = 'DELETE'
UPDATE_ACTION = 'UPDATE'
MAXIMUM_THRESHOLD = 100
@contextmanager
@ -161,7 +163,7 @@ class Service(service.RPCService):
for server_backend in self.server_backends:
server = server_backend['server']
create_status = self._create_create_status(server, domain)
create_status = self._build_create_status(server, domain)
self._create_domain_on_server(
context, create_status, domain, server_backend)
@ -175,11 +177,12 @@ class Service(service.RPCService):
for server_backend in self.server_backends:
server = server_backend['server']
delete_status = self._create_delete_status(server, domain)
delete_status = self._build_delete_status(server, domain)
self._delete_domain_on_server(
context, delete_status, domain, server_backend)
if not self._is_delete_consensus(context, domain):
if self._is_in_cache(context, domain, DELETE_ACTION) \
and not self._is_delete_consensus(context, domain):
status = ERROR_STATUS
LOG.warn(_LW('Consensus not reached '
'for deleting domain %(domain)s') %
@ -211,8 +214,13 @@ class Service(service.RPCService):
"""
LOG.debug("Calling update_status for %s" % domain.name)
update_status = self._retrieve_from_cache(
with lockutils.lock('update-status-%s' % domain.id):
try:
update_status = self._retrieve_one_from_cache(
context, server, domain, UPDATE_ACTION)
except exceptions.PoolManagerStatusNotFound:
update_status = self._build_update_status(server, domain)
self._store_in_cache(context, update_status)
cache_serial = update_status.serial_number
LOG.debug('For domain %s on server %s the cache serial is %s '
@ -231,7 +239,6 @@ class Service(service.RPCService):
'the consensus serial is %(consensus_serial)s.') %
{'domain': domain.name,
'consensus_serial': consensus_serial})
self.central_api.update_status(
context, domain.id, SUCCESS_STATUS, consensus_serial)
@ -246,6 +253,11 @@ class Service(service.RPCService):
self.central_api.update_status(
context, domain.id, ERROR_STATUS, error_serial)
if consensus_serial == domain.serial \
and self._is_update_consensus(context, domain,
MAXIMUM_THRESHOLD):
self._clear_cache(context, domain, UPDATE_ACTION)
def periodic_recovery(self):
"""
:return:
@ -255,8 +267,8 @@ class Service(service.RPCService):
context = DesignateContext.get_admin_context(all_tenants=True)
try:
self._periodic_create_domains_that_failed(context)
self._periodic_delete_domains_that_failed(context)
self._periodic_create_domains_that_failed(context)
self._periodic_update_domains_that_failed(context)
except Exception:
LOG.exception(_LE('An unhandled exception in periodic recovery '
@ -301,18 +313,13 @@ class Service(service.RPCService):
{'domain': domain.name,
'server': self._get_destination(server)})
update_status = self._create_update_status(server, domain)
update_status.serial_number = 0
# Setting the update status to ERROR ensures the periodic
# recovery is run if there is a problem.
update_status.status = ERROR_STATUS
self._store_in_cache(context, update_status)
if self._is_create_consensus(context, domain, MAXIMUM_THRESHOLD):
self._clear_cache(context, domain, CREATE_ACTION)
# PowerDNS needs to explicitly send a NOTIFY for the AXFR to
# happen whereas BIND9 does an AXFR implicitly after the domain
# is created. Sending a NOTIFY for all cases.
self._notify_zone_changed(context, domain, server)
self._poll_for_serial_number(context, domain, server)
self._update_domain_on_server(context, domain, server_backend)
except exceptions.Backend:
create_status.status = ERROR_STATUS
self._store_in_cache(context, create_status)
@ -323,8 +330,8 @@ class Service(service.RPCService):
def _periodic_create_domains_that_failed(self, context):
create_statuses = self._find_pool_manager_statuses(
context, CREATE_ACTION, status=ERROR_STATUS)
create_statuses = self._retrieve_from_cache(
context, action=CREATE_ACTION, status=ERROR_STATUS)
for create_status in create_statuses:
domain = self.central_api.get_domain(
@ -349,6 +356,7 @@ class Service(service.RPCService):
'from server %(server)s.') %
{'domain': domain.name,
'server': self._get_destination(server)})
if not consensus_existed \
and self._is_delete_consensus(context, domain):
LOG.info(_LI('Consensus reached '
@ -356,6 +364,10 @@ class Service(service.RPCService):
{'domain': domain.name})
self.central_api.update_status(
context, domain.id, SUCCESS_STATUS, domain.serial)
if self._is_delete_consensus(context, domain,
MAXIMUM_THRESHOLD):
self._clear_cache(context, domain)
except exceptions.Backend:
delete_status.status = ERROR_STATUS
self._store_in_cache(context, delete_status)
@ -366,8 +378,8 @@ class Service(service.RPCService):
def _periodic_delete_domains_that_failed(self, context):
delete_statuses = self._find_pool_manager_statuses(
context, DELETE_ACTION, status=ERROR_STATUS)
delete_statuses = self._retrieve_from_cache(
context, action=DELETE_ACTION, status=ERROR_STATUS)
# Used to retrieve a domain from Central that may have already been
# "deleted".
@ -384,30 +396,17 @@ class Service(service.RPCService):
server = server_backend['server']
try:
update_status = self._retrieve_from_cache(
context, server, domain, UPDATE_ACTION)
if update_status.status == ERROR_STATUS \
or update_status.serial_number < domain.serial:
self._notify_zone_changed(context, domain, server)
self._poll_for_serial_number(context, domain, server)
LOG.info(_LI('Updating domain %(domain)s '
'on server %(server)s.') %
{'domain': domain.name,
'server': self._get_destination(server)})
else:
# TODO(Ron): Do not log this warning on a periodic_sync.
LOG.warn(_LW('No need to update domain %(domain)s '
'on server %(server)s.') %
{'domain': domain.name,
'server': self._get_destination(server)})
except exceptions.PoolManagerStatusNotFound:
pass
def _periodic_update_domains_that_failed(self, context):
update_statuses = self._find_pool_manager_statuses(
context, UPDATE_ACTION, status=ERROR_STATUS)
update_statuses = self._retrieve_from_cache(
context, action=UPDATE_ACTION, status=ERROR_STATUS)
for update_status in update_statuses:
domain = self.central_api.get_domain(
@ -439,22 +438,9 @@ class Service(service.RPCService):
def _percentage(count, total_count):
return (Decimal(count) / Decimal(total_count)) * Decimal(100)
def _exceed_or_meet_threshold(self, count):
def _exceed_or_meet_threshold(self, count, threshold):
return self._percentage(
count, len(self.server_backends)) >= Decimal(self.threshold)
def _find_pool_manager_statuses(self, context, action,
domain=None, status=None):
criterion = {
'action': action
}
if domain:
criterion['domain_id'] = domain.id
if status:
criterion['status'] = status
return self.cache.find_pool_manager_statuses(
context, criterion=criterion)
count, len(self.server_backends)) >= Decimal(threshold)
@staticmethod
def _get_sorted_serials(pool_manager_statuses, descending=False):
@ -470,28 +456,39 @@ class Service(service.RPCService):
def _get_serials_descending(self, pool_manager_statuses):
return self._get_sorted_serials(pool_manager_statuses, descending=True)
def _is_success_consensus(self, context, domain, action):
def _is_success_consensus(self, context, domain, action, threshold=None):
success_count = 0
pool_manager_statuses = self._find_pool_manager_statuses(
context, action, domain=domain)
pool_manager_statuses = self._retrieve_from_cache(
context, domain=domain, action=action)
for pool_manager_status in pool_manager_statuses:
if pool_manager_status.status == SUCCESS_STATUS:
success_count += 1
return self._exceed_or_meet_threshold(success_count)
if threshold is None:
threshold = self.threshold
return self._exceed_or_meet_threshold(success_count, threshold)
def _is_delete_consensus(self, context, domain):
return self._is_success_consensus(context, domain, DELETE_ACTION)
def _is_create_consensus(self, context, domain, threshold=None):
return self._is_success_consensus(
context, domain, CREATE_ACTION, threshold)
def _is_delete_consensus(self, context, domain, threshold=None):
return self._is_success_consensus(
context, domain, DELETE_ACTION, threshold)
def _is_update_consensus(self, context, domain, threshold=None):
return self._is_success_consensus(
context, domain, UPDATE_ACTION, threshold)
def _get_consensus_serial(self, context, domain):
consensus_serial = 0
update_statuses = self._find_pool_manager_statuses(
context, UPDATE_ACTION, domain=domain)
update_statuses = self._retrieve_from_cache(
context, domain=domain, action=UPDATE_ACTION)
for serial in self._get_serials_descending(update_statuses):
serial_count = 0
for update_status in update_statuses:
if update_status.serial_number >= serial:
serial_count += 1
if self._exceed_or_meet_threshold(serial_count):
if self._exceed_or_meet_threshold(serial_count, self.threshold):
consensus_serial = serial
break
return consensus_serial
@ -499,8 +496,8 @@ class Service(service.RPCService):
def _get_error_serial(self, context, domain, consensus_serial):
error_serial = 0
if not self._is_success_consensus(context, domain, UPDATE_ACTION):
update_statuses = self._find_pool_manager_statuses(
context, UPDATE_ACTION, domain=domain)
update_statuses = self._retrieve_from_cache(
context, domain=domain, action=UPDATE_ACTION)
for serial in self._get_serials_ascending(update_statuses):
if serial > consensus_serial:
error_serial = serial
@ -508,26 +505,56 @@ class Service(service.RPCService):
return error_serial
@staticmethod
def _create_pool_manager_status(server, domain, action):
def _build_status_object(server, domain, action, serial_number,
status=None):
values = {
'server_id': server.id,
'domain_id': domain.id,
'status': None,
'serial_number': domain.serial,
'status': status,
'serial_number': serial_number,
'action': action
}
return objects.PoolManagerStatus(**values)
def _create_create_status(self, server, domain):
return self._create_pool_manager_status(server, domain, CREATE_ACTION)
def _build_create_status(self, server, domain):
return self._build_status_object(
server, domain, CREATE_ACTION, domain.serial)
def _create_delete_status(self, server, domain):
return self._create_pool_manager_status(server, domain, DELETE_ACTION)
def _build_delete_status(self, server, domain):
return self._build_status_object(
server, domain, DELETE_ACTION, domain.serial)
def _create_update_status(self, server, domain):
return self._create_pool_manager_status(server, domain, UPDATE_ACTION)
def _build_update_status(self, server, domain):
# Setting the update status to ERROR ensures the periodic
# recovery is run if there is a problem.
return self._build_status_object(
server, domain, UPDATE_ACTION, 0, status='ERROR')
def _retrieve_from_cache(self, context, server, domain, action):
# Methods for manipulating the cache.
def _clear_cache(self, context, domain, action=None):
pool_manager_statuses = self._retrieve_from_cache(
context, domain=domain, action=action)
for pool_manager_status in pool_manager_statuses:
self.cache.delete_pool_manager_status(
context, pool_manager_status.id)
def _is_in_cache(self, context, domain, action):
return len(self._retrieve_from_cache(
context, domain=domain, action=action)) > 0
def _retrieve_from_cache(self, context,
domain=None, action=None, status=None):
criterion = {}
if domain:
criterion['domain_id'] = domain.id
if action:
criterion['action'] = action
if status:
criterion['status'] = status
return self.cache.find_pool_manager_statuses(
context, criterion=criterion)
def _retrieve_one_from_cache(self, context, server, domain, action):
criterion = {
'server_id': server.id,
'domain_id': domain.id,

View File

@ -96,17 +96,7 @@ class PoolManagerServiceTest(PoolManagerTestCase):
create_statuses = self._find_pool_manager_statuses(
self.admin_context, 'CREATE', domain)
self.assertEqual(2, len(create_statuses))
self.assertEqual('SUCCESS', create_statuses[0].status)
self.assertEqual('SUCCESS', create_statuses[1].status)
update_statuses = self._find_pool_manager_statuses(
self.admin_context, 'UPDATE', domain)
self.assertEqual(2, len(update_statuses))
self.assertEqual('ERROR', update_statuses[0].status)
self.assertEqual(0, update_statuses[0].serial_number)
self.assertEqual('ERROR', update_statuses[1].status)
self.assertEqual(0, update_statuses[1].serial_number)
self.assertEqual(0, len(create_statuses))
# Ensure notify_zone_changed and poll_for_serial_number
# was called for each backend server.
@ -176,12 +166,6 @@ class PoolManagerServiceTest(PoolManagerTestCase):
self.assertEqual('SUCCESS', create_statuses[0].status)
self.assertEqual('ERROR', create_statuses[1].status)
update_statuses = self._find_pool_manager_statuses(
self.admin_context, 'UPDATE', domain)
self.assertEqual(1, len(update_statuses))
self.assertEqual('ERROR', update_statuses[0].status)
self.assertEqual(0, update_statuses[0].serial_number)
mock_notify_zone_changed.assert_called_once_with(
self.admin_context, domain,
self.service.server_backends[0]['server'], 30, 2, 3, 0)
@ -197,17 +181,13 @@ class PoolManagerServiceTest(PoolManagerTestCase):
create_statuses = self._find_pool_manager_statuses(
self.admin_context, 'CREATE', domain)
self.assertEqual(2, len(create_statuses))
self.assertEqual('SUCCESS', create_statuses[0].status)
self.assertEqual('SUCCESS', create_statuses[1].status)
self.assertEqual(0, len(create_statuses))
self.service.delete_domain(self.admin_context, domain)
delete_statuses = self._find_pool_manager_statuses(
self.admin_context, 'DELETE', domain)
self.assertEqual(2, len(delete_statuses))
self.assertEqual('SUCCESS', delete_statuses[0].status)
self.assertEqual('SUCCESS', delete_statuses[1].status)
self.assertEqual(0, len(delete_statuses))
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'SUCCESS', domain.serial)
@ -222,9 +202,7 @@ class PoolManagerServiceTest(PoolManagerTestCase):
create_statuses = self._find_pool_manager_statuses(
self.admin_context, 'CREATE', domain)
self.assertEqual(2, len(create_statuses))
self.assertEqual('SUCCESS', create_statuses[0].status)
self.assertEqual('SUCCESS', create_statuses[1].status)
self.assertEqual(0, len(create_statuses))
mock_delete_domain.side_effect = exceptions.Backend
@ -249,9 +227,7 @@ class PoolManagerServiceTest(PoolManagerTestCase):
create_statuses = self._find_pool_manager_statuses(
self.admin_context, 'CREATE', domain)
self.assertEqual(2, len(create_statuses))
self.assertEqual('SUCCESS', create_statuses[0].status)
self.assertEqual('SUCCESS', create_statuses[1].status)
self.assertEqual(0, len(create_statuses))
mock_delete_domain.side_effect = [None, exceptions.Backend]
@ -266,6 +242,38 @@ class PoolManagerServiceTest(PoolManagerTestCase):
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'ERROR', domain.serial)
@patch.object(impl_fake.FakeBackend, 'delete_domain')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_delete_domain_backend_one_failure_success(
self, mock_update_status, mock_delete_domain):
self.service.stop()
self.config(
threshold_percentage=50,
group='service:pool_manager')
self.service = self.start_service('pool_manager')
domain = self.create_domain(name='example.org.')
self.service.create_domain(self.admin_context, domain)
create_statuses = self._find_pool_manager_statuses(
self.admin_context, 'CREATE', domain)
self.assertEqual(0, len(create_statuses))
mock_delete_domain.side_effect = [None, exceptions.Backend]
self.service.delete_domain(self.admin_context, domain)
delete_statuses = self._find_pool_manager_statuses(
self.admin_context, 'DELETE', domain)
self.assertEqual(2, len(delete_statuses))
self.assertEqual('SUCCESS', delete_statuses[0].status)
self.assertEqual('ERROR', delete_statuses[1].status)
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'SUCCESS', domain.serial)
@patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number')
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
def test_update_domain(self, mock_notify_zone_changed,
@ -276,9 +284,7 @@ class PoolManagerServiceTest(PoolManagerTestCase):
create_statuses = self._find_pool_manager_statuses(
self.admin_context, 'CREATE', domain)
self.assertEqual(2, len(create_statuses))
self.assertEqual('SUCCESS', create_statuses[0].status)
self.assertEqual('SUCCESS', create_statuses[1].status)
self.assertEqual(0, len(create_statuses))
# Reset the mock call attributes.
mock_notify_zone_changed.reset_mock()
@ -311,17 +317,7 @@ class PoolManagerServiceTest(PoolManagerTestCase):
create_statuses = self._find_pool_manager_statuses(
self.admin_context, 'CREATE', domain)
self.assertEqual(2, len(create_statuses))
self.assertEqual('SUCCESS', create_statuses[0].status)
self.assertEqual('SUCCESS', create_statuses[1].status)
update_statuses = self._find_pool_manager_statuses(
self.admin_context, 'UPDATE', domain)
self.assertEqual(2, len(update_statuses))
self.assertEqual('ERROR', update_statuses[0].status)
self.assertEqual(0, update_statuses[0].serial_number)
self.assertEqual('ERROR', update_statuses[1].status)
self.assertEqual(0, update_statuses[1].serial_number)
self.assertEqual(0, len(create_statuses))
self.service.update_status(self.admin_context, domain,
self.service.server_backends[0]['server'],
@ -329,11 +325,9 @@ class PoolManagerServiceTest(PoolManagerTestCase):
update_statuses = self._find_pool_manager_statuses(
self.admin_context, 'UPDATE', domain)
self.assertEqual(2, len(update_statuses))
self.assertEqual(1, len(update_statuses))
self.assertEqual('SUCCESS', update_statuses[0].status)
self.assertEqual(domain.serial, update_statuses[0].serial_number)
self.assertEqual('ERROR', update_statuses[1].status)
self.assertEqual(0, update_statuses[1].serial_number)
# Ensure update_status was not called.
self.assertEqual(False, mock_update_status.called)
@ -344,24 +338,11 @@ class PoolManagerServiceTest(PoolManagerTestCase):
update_statuses = self._find_pool_manager_statuses(
self.admin_context, 'UPDATE', domain)
self.assertEqual(2, len(update_statuses))
self.assertEqual('SUCCESS', update_statuses[0].status)
self.assertEqual(domain.serial, update_statuses[0].serial_number)
self.assertEqual('SUCCESS', update_statuses[1].status)
self.assertEqual(domain.serial, update_statuses[1].serial_number)
self.assertEqual(0, len(update_statuses))
mock_update_status.assert_called_once_with(
self.admin_context, domain.id, 'SUCCESS', domain.serial)
def test_update_status_missing_status(self):
domain = self.create_domain(name='example.org.')
with testtools.ExpectedException(exceptions.PoolManagerStatusNotFound):
self.service.update_status(
self.admin_context, domain,
self.service.server_backends[0]['server'], 'SUCCESS',
domain.serial)
def _find_pool_manager_statuses(self, context, action,
domain=None, status=None):
criterion = {