Prepare retry decorator to move to plugin level

Retrying mutating operations at the API layer caused a
couple of problems. First, when components would call
the core plugin using the neutron manager, they would
have to handle the retriable failures on their own or
undo any work they had done so far and allow retriable
failures to be propagated up to the API. Second, retrying
at the API makes composite operations (e.g. auto allocate,
add_router_interface, etc) painful because they have to
consider if exceptions are retriable before raising
fatal exceptions on failures of core plugin calls.

This patch begins the process of moving them down to the
core operations with a new decorator called
'retry_if_session_inactive', which ensures that the
retry logic isn't triggered if there is an ongoing transaction
since retrying inside of a transaction is normally ineffective.
Follow-up patches apply them to various parts of the code-base.

Additionally, the args and kwargs of the method are automatically
deep copied in retries to ensure that any mangling the methods
do to their arguments don't impact their retriability.

Finally, since we are leaving the API decorators in place for now,
the retry logic will not be triggered by another decorator if an
exception has already been retried. This prevents an exponential
explosion of retries on nested retry decorators.

The ultimate goal will be to get rid of the API decorators entirely
so retries are up to each individual plugin.

Partial-Bug: #1596075
Partial-Bug: #1612798
Change-Id: I7b8a4a105aabfa1b5f5dd7a638099007b0933e66
This commit is contained in:
Kevin Benton 2016-09-07 18:27:49 -07:00
parent 93153f94da
commit 09c87425fa
5 changed files with 318 additions and 6 deletions

View File

@ -80,6 +80,7 @@ Neutron Internals
network_ip_availability network_ip_availability
tag tag
provisioning_blocks provisioning_blocks
retries
l3_agent_extensions l3_agent_extensions
Testing Testing

View File

@ -0,0 +1,178 @@
..
Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License. You may obtain
a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
Convention for heading levels in Neutron devref:
======= Heading 0 (reserved for the title in a document)
------- Heading 1
~~~~~~~ Heading 2
+++++++ Heading 3
''''''' Heading 4
(Avoid deeper levels because they do not render well.)
Retrying Operations
===================
Inside of the neutron.db.api module there is a decorator called
'retry_if_session_inactive'. This should be used to protect any
functions that perform DB operations. This decorator will capture
any deadlock errors, RetryRequests, connection errors, and unique
constraint violations that are thrown by the function it is
protecting.
This decorator will not retry an operation if the function it is
applied to is called within an active session. This is because the
majority of the exceptions it captures put the session into a
partially rolled back state so it is no longer usable. It is important
to ensure there is a decorator outside of the start of the transaction.
The decorators are safe to nest if a function is sometimes called inside
of another transaction.
If a function is being protected that does not take context as an
argument the 'retry_db_errors' decorator function may be used instead.
It retries the same exceptions and has the same anti-nesting behavior
as 'retry_if_session_active', but it does not check if a session is
attached to any context keywords. ('retry_if_session_active' just uses
'retry_db_errors' internally after checking the session)
Idempotency on Failures
-----------------------
The function that is being decorated should always fully cleanup whenever
it encounters an exception so its safe to retry the operation. So if a
function creates a DB object, commits, then creates another, the function
must have a cleanup handler to remove the first DB object in the case that
the second one fails. Assume any DB operation can throw a retriable error.
You may see some retry decorators at the API layers in Neutron; however,
we are trying to eliminate them because each API operation has many
independent steps that makes ensuring idempotency on partial failures
very difficult.
Argument Mutation
-----------------
A decorated function should not mutate any complex arguments which are
passed into it. If it does, it should have an exception handler that reverts
the change so it's safe to retry.
The decorator will automatically create deep copies of sets, lists,
and dicts which are passed through it, but it will leave the other arguments
alone.
Retrying to Handle Race Conditions
----------------------------------
One of the difficulties with detecting race conditions to create a DB record
with a unique constraint is determining where to put the exception handler
because a constraint violation can happen immediately on flush or it may not
happen all of the way until the transaction is being committed on the exit
of the session context manager. So we would end up with code that looks
something like this:
::
def create_port(context, ip_address, mac_address):
_ensure_mac_not_in_use(context, mac_address)
_ensure_ip_not_in_use(context, ip_address)
try:
with context.session.begin():
port_obj = Port(ip=ip_address, mac=mac_address)
do_expensive_thing(...)
do_extra_other_thing(...)
return port_obj
except DBDuplicateEntry as e:
# code to parse columns
if 'mac' in e.columns:
raise MacInUse(mac_address)
if 'ip' in e.columns:
raise IPAddressInUse(ip)
def _ensure_mac_not_in_use(context, mac):
if context.session.query(Port).filter_by(mac=mac).count():
raise MacInUse(mac)
def _ensure_ip_not_in_use(context, ip):
if context.session.query(Port).filter_by(ip=ip).count():
raise IPAddressInUse(ip)
So we end up with an exception handler that has to understand where things
went wrong and convert them into appropriate exceptions for the end-users.
This distracts significantly from the main purpose of create_port.
Since the retry decorator will automatically catch and retry DB duplicate
errors for us, we can allow it to retry on this race condition which will
give the original validation logic to be re-executed and raise the
appropriate error. This keeps validation logic in one place and makes the
code cleaner.
::
from neutron.db import api as db_api
@db_api.retry_if_session_inactive()
def create_port(context, ip_address, mac_address):
_ensure_mac_not_in_use(context, mac_address)
_ensure_ip_not_in_use(context, ip_address)
with context.session.begin():
port_obj = Port(ip=ip_address, mac=mac_address)
do_expensive_thing(...)
do_extra_other_thing(...)
return port_obj
def _ensure_mac_not_in_use(context, mac):
if context.session.query(Port).filter_by(mac=mac).count():
raise MacInUse(mac)
def _ensure_ip_not_in_use(context, ip):
if context.session.query(Port).filter_by(ip=ip).count():
raise IPAddressInUse(ip)
Nesting
-------
Once the decorator retries an operation the maximum number of times, it
will attach a flag to the exception it raises further up that will prevent
decorators around the calling functions from retrying the error again.
This prevents an exponential increase in the number of retries if they are
layered.
Usage
-----
Here are some usage examples:
::
from neutron.db import api as db_api
@db_api.retry_if_session_inactive()
def create_elephant(context, elephant_details):
....
@db_api.retry_if_session_inactive()
def atomic_bulk_create_elephants(context, elephants):
with context.session.begin():
for elephant in elephants:
# note that if create_elephant throws a retriable
# exception, the decorator around it will not retry
# because the session is active. The decorator around
# atomic_bulk_create_elephants will be responsible for
# retrying the entire operation.
create_elephant(context, elephant)
# sample usage when session is attached to a var other than 'context'
@db_api.retry_if_session_inactive(context_var_name='ctx')
def some_function(ctx):
...

View File

@ -208,7 +208,7 @@ class Controller(object):
except oslo_policy.PolicyNotAuthorized: except oslo_policy.PolicyNotAuthorized:
msg = _('The resource could not be found.') msg = _('The resource could not be found.')
raise webob.exc.HTTPNotFound(msg) raise webob.exc.HTTPNotFound(msg)
body = copy.deepcopy(kwargs.pop('body', None)) body = kwargs.pop('body', None)
# Explicit comparison with None to distinguish from {} # Explicit comparison with None to distinguish from {}
if body is not None: if body is not None:
arg_list.append(body) arg_list.append(body)
@ -405,7 +405,7 @@ class Controller(object):
"""Creates a new instance of the requested entity.""" """Creates a new instance of the requested entity."""
parent_id = kwargs.get(self._parent_id_name) parent_id = kwargs.get(self._parent_id_name)
body = Controller.prepare_request_body(request.context, body = Controller.prepare_request_body(request.context,
copy.deepcopy(body), True, body, True,
self._resource, self._attr_info, self._resource, self._attr_info,
allow_bulk=self._allow_bulk) allow_bulk=self._allow_bulk)
action = self._plugin_handlers[self.CREATE] action = self._plugin_handlers[self.CREATE]
@ -577,7 +577,7 @@ class Controller(object):
@db_api.retry_db_errors @db_api.retry_db_errors
def _update(self, request, id, body, **kwargs): def _update(self, request, id, body, **kwargs):
body = Controller.prepare_request_body(request.context, body = Controller.prepare_request_body(request.context,
copy.deepcopy(body), False, body, False,
self._resource, self._attr_info, self._resource, self._attr_info,
allow_bulk=self._allow_bulk) allow_bulk=self._allow_bulk)
action = self._plugin_handlers[self.UPDATE] action = self._plugin_handlers[self.UPDATE]

View File

@ -14,6 +14,7 @@
# under the License. # under the License.
import contextlib import contextlib
import copy
from debtcollector import moves from debtcollector import moves
from debtcollector import removals from debtcollector import removals
@ -30,6 +31,7 @@ import sqlalchemy
from sqlalchemy.orm import exc from sqlalchemy.orm import exc
import traceback import traceback
from neutron._i18n import _LE
from neutron.common import profiler # noqa from neutron.common import profiler # noqa
@ -49,6 +51,8 @@ LOG = logging.getLogger(__name__)
def is_retriable(e): def is_retriable(e):
if getattr(e, '_RETRY_EXCEEDED', False):
return False
if _is_nested_instance(e, (db_exc.DBDeadlock, exc.StaleDataError, if _is_nested_instance(e, (db_exc.DBDeadlock, exc.StaleDataError,
db_exc.DBConnectionError, db_exc.DBConnectionError,
db_exc.DBDuplicateEntry, db_exc.RetryRequest)): db_exc.DBDuplicateEntry, db_exc.RetryRequest)):
@ -67,14 +71,51 @@ _retry_db_errors = oslo_db_api.wrap_db_retry(
) )
def retry_db_errors(f): def _tag_retriables_as_unretriable(f):
"""Log retriable exceptions before retry to help debugging.""" """Puts a flag on retriable exceptions so is_retriable returns False.
@_retry_db_errors This decorator can be used outside of a retry decorator to prevent
decorators higher up from retrying again.
"""
@six.wraps(f) @six.wraps(f)
def wrapped(*args, **kwargs): def wrapped(*args, **kwargs):
try: try:
return f(*args, **kwargs) return f(*args, **kwargs)
except Exception as e:
with excutils.save_and_reraise_exception():
if is_retriable(e):
setattr(e, '_RETRY_EXCEEDED', True)
return wrapped
def _copy_if_lds(item):
"""Deepcopy lists/dicts/sets, leave everything else alone."""
return copy.deepcopy(item) if isinstance(item, (list, dict, set)) else item
def retry_db_errors(f):
"""Nesting-safe retry decorator with auto-arg-copy and logging.
Retry decorator for all functions which do not accept a context as an
argument. If the function accepts a context, use
'retry_if_session_inactive' below.
If retriable errors are retried and exceed the count, they will be tagged
with a flag so is_retriable will no longer recognize them as retriable.
This prevents multiple applications of this decorator (and/or the one
below) from retrying the same exception.
"""
@_tag_retriables_as_unretriable
@_retry_db_errors
@six.wraps(f)
def wrapped(*args, **kwargs):
try:
# copy mutable args and kwargs to make retries safe. this doesn't
# prevent mutations of complex objects like the context or 'self'
dup_args = [_copy_if_lds(a) for a in args]
dup_kwargs = {k: _copy_if_lds(v) for k, v in kwargs.items()}
return f(*dup_args, **dup_kwargs)
except Exception as e: except Exception as e:
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():
if is_retriable(e): if is_retriable(e):
@ -83,6 +124,39 @@ def retry_db_errors(f):
return wrapped return wrapped
def retry_if_session_inactive(context_var_name='context'):
"""Retries only if the session in the context is inactive.
Calls a retry_db_errors wrapped version of the function if the context's
session passed in is inactive, otherwise it just calls the function
directly. This is useful to avoid retrying things inside of a transaction
which is ineffective for DB races/errors.
This should be used in all cases where retries are desired and the method
accepts a context.
"""
def decorator(f):
try:
ctx_arg_index = f.__code__.co_varnames.index(context_var_name)
except ValueError:
raise RuntimeError(_LE("Could not find position of var %s")
% context_var_name)
f_with_retry = retry_db_errors(f)
@six.wraps(f)
def wrapped(*args, **kwargs):
# only use retry wrapper if we aren't nested in an active
# transaction
if context_var_name in kwargs:
context = kwargs[context_var_name]
else:
context = args[ctx_arg_index]
method = f if context.session.is_active else f_with_retry
return method(*args, **kwargs)
return wrapped
return decorator
def reraise_as_retryrequest(f): def reraise_as_retryrequest(f):
"""Packs retriable exceptions into a RetryRequest.""" """Packs retriable exceptions into a RetryRequest."""

View File

@ -101,6 +101,65 @@ class TestDeadLockDecorator(base.BaseTestCase):
"sa_savepoint_1 does not exist')") "sa_savepoint_1 does not exist')")
self.assertIsNone(self._decorated_function(1, e)) self.assertIsNone(self._decorated_function(1, e))
@db_api.retry_if_session_inactive('alt_context')
def _alt_context_function(self, alt_context, *args, **kwargs):
return self._decorated_function(*args, **kwargs)
@db_api.retry_if_session_inactive()
def _context_function(self, context, list_arg, dict_arg,
fail_count, exc_to_raise):
list_arg.append(1)
dict_arg[max(dict_arg.keys()) + 1] = True
self.fail_count = getattr(self, 'fail_count', fail_count + 1) - 1
if self.fail_count:
raise exc_to_raise
return list_arg, dict_arg
def test_stacked_retries_dont_explode_retry_count(self):
context = mock.Mock()
context.session.is_active = False
e = db_exc.DBConnectionError()
mock.patch('time.sleep').start()
with testtools.ExpectedException(db_exc.DBConnectionError):
# after 10 failures, the inner retry should give up and
# the exception should be tagged to prevent the outer retry
self._alt_context_function(context, 11, e)
def test_retry_if_session_inactive_args_not_mutated_after_retries(self):
context = mock.Mock()
context.session.is_active = False
list_arg = [1, 2, 3, 4]
dict_arg = {1: 'a', 2: 'b'}
l, d = self._context_function(context, list_arg, dict_arg,
5, db_exc.DBDeadlock())
# even though we had 5 failures the list and dict should only
# be mutated once
self.assertEqual(5, len(l))
self.assertEqual(3, len(d))
def test_retry_if_session_inactive_kwargs_not_mutated_after_retries(self):
context = mock.Mock()
context.session.is_active = False
list_arg = [1, 2, 3, 4]
dict_arg = {1: 'a', 2: 'b'}
l, d = self._context_function(context, list_arg=list_arg,
dict_arg=dict_arg,
fail_count=5,
exc_to_raise=db_exc.DBDeadlock())
# even though we had 5 failures the list and dict should only
# be mutated once
self.assertEqual(5, len(l))
self.assertEqual(3, len(d))
def test_retry_if_session_inactive_no_retry_in_active_session(self):
context = mock.Mock()
context.session.is_active = True
with testtools.ExpectedException(db_exc.DBDeadlock):
# retry decorator should have no effect in an active session
self._context_function(context, [], {1: 2},
fail_count=1,
exc_to_raise=db_exc.DBDeadlock())
class TestCommonDBfunctions(base.BaseTestCase): class TestCommonDBfunctions(base.BaseTestCase):