Avoid endless loop on StorageFailure

If an error occures with writing atom detail in database(
persistence backend) flow execution enters an endless loop
throwing errors and retrying to save details.

Avoid this situation and log exception message.

Change-Id: Ic6b0a78d20124cc027468ecc6aeff189c25d1a8a
Closes-bug: 1889773
This commit is contained in:
Ann Taraday 2020-07-31 16:58:23 +04:00
parent cf327a2e2d
commit c32454213e
2 changed files with 18 additions and 0 deletions

View File

@ -0,0 +1,7 @@
---
fixes:
- |
Limit retries for storage failures on saving flow/task state in the storage.
Previously on StorageFailure exception may cause an endless loop during
execution of flows throwing errors and retrying to save details.

View File

@ -21,6 +21,7 @@ import fasteners
from oslo_utils import reflection from oslo_utils import reflection
from oslo_utils import uuidutils from oslo_utils import uuidutils
import six import six
import tenacity
from taskflow import exceptions from taskflow import exceptions
from taskflow import logging from taskflow import logging
@ -33,6 +34,8 @@ from taskflow.utils import misc
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
RETRY_ATTEMPTS = 3
RETRY_WAIT_TIMEOUT = 5
_EXECUTE_STATES_WITH_RESULTS = ( _EXECUTE_STATES_WITH_RESULTS = (
# The atom ``execute`` worked out :) # The atom ``execute`` worked out :)
@ -449,6 +452,10 @@ class Storage(object):
# This never changes (so no read locking needed). # This never changes (so no read locking needed).
return self._backend return self._backend
@tenacity.retry(retry=tenacity.retry_if_exception_type(
exception_types=exceptions.StorageFailure),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS),
wait=tenacity.wait_fixed(RETRY_WAIT_TIMEOUT))
def _save_flow_detail(self, conn, original_flow_detail, flow_detail): def _save_flow_detail(self, conn, original_flow_detail, flow_detail):
# NOTE(harlowja): we need to update our contained flow detail if # NOTE(harlowja): we need to update our contained flow detail if
# the result of the update actually added more (aka another process # the result of the update actually added more (aka another process
@ -482,6 +489,10 @@ class Storage(object):
else: else:
return (ad, ad) return (ad, ad)
@tenacity.retry(retry=tenacity.retry_if_exception_type(
exception_types=exceptions.StorageFailure),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS),
wait=tenacity.wait_fixed(RETRY_WAIT_TIMEOUT))
def _save_atom_detail(self, conn, original_atom_detail, atom_detail): def _save_atom_detail(self, conn, original_atom_detail, atom_detail):
# NOTE(harlowja): we need to update our contained atom detail if # NOTE(harlowja): we need to update our contained atom detail if
# the result of the update actually added more (aka another process # the result of the update actually added more (aka another process