Refactor retry mechanism used in some DB operations

Use oslo_db helper that will allow to restart the whole
transaction in case it needs a certain operation to be repeated.
This is a workaround for the REPEATABLE READ problem where
retrying logic will not work because queries inside a transation
will not see updates made by other transactions.
So, run every attempt in a separate transaction.

Change-Id: I68f9ae8019879725df58f5da2c83bb699a548255
Closes-Bug: #1382064
This commit is contained in:
Eugene Nikanorov 2015-01-22 15:54:29 +03:00 committed by Assaf Muller
parent 48637c8933
commit 5dbb34b56f
5 changed files with 56 additions and 46 deletions

View File

@ -18,6 +18,8 @@ from oslo_db.sqlalchemy import session
_FACADE = None
MAX_RETRIES = 10
def _create_facade_lazily():
global _FACADE

View File

@ -17,14 +17,9 @@ from oslo_db import exception as db_exc
from oslo_log import log
from neutron.common import exceptions as exc
from neutron.i18n import _LW
from neutron.plugins.ml2 import driver_api as api
# Number of attempts to find a valid segment candidate and allocate it
DB_MAX_ATTEMPTS = 10
LOG = log.getLogger(__name__)
@ -108,37 +103,32 @@ class TypeDriverHelper(api.TypeDriver):
filter_by(allocated=False, **filters))
# Selected segment can be allocated before update by someone else,
# We retry until update success or DB_MAX_ATTEMPTS attempts
for attempt in range(1, DB_MAX_ATTEMPTS + 1):
alloc = select.first()
alloc = select.first()
if not alloc:
# No resource available
return
if not alloc:
# No resource available
return
raw_segment = dict((k, alloc[k]) for k in self.primary_keys)
LOG.debug("%(type)s segment allocate from pool, attempt "
"%(attempt)s started with %(segment)s ",
{"type": network_type, "attempt": attempt,
raw_segment = dict((k, alloc[k]) for k in self.primary_keys)
LOG.debug("%(type)s segment allocate from pool "
"started with %(segment)s ",
{"type": network_type,
"segment": raw_segment})
count = (session.query(self.model).
filter_by(allocated=False, **raw_segment).
update({"allocated": True}))
if count:
LOG.debug("%(type)s segment allocate from pool "
"success with %(segment)s ",
{"type": network_type,
"segment": raw_segment})
count = (session.query(self.model).
filter_by(allocated=False, **raw_segment).
update({"allocated": True}))
if count:
LOG.debug("%(type)s segment allocate from pool, attempt "
"%(attempt)s success with %(segment)s ",
{"type": network_type, "attempt": attempt,
"segment": raw_segment})
return alloc
return alloc
# Segment allocated since select
LOG.debug("Allocate %(type)s segment from pool, "
"attempt %(attempt)s failed with segment "
"%(segment)s",
{"type": network_type, "attempt": attempt,
"segment": raw_segment})
LOG.warning(_LW("Allocate %(type)s segment from pool failed "
"after %(number)s failed attempts"),
{"type": network_type, "number": DB_MAX_ATTEMPTS})
raise exc.NoNetworkFoundInMaximumAllowedAttempts()
# Segment allocated since select
LOG.debug("Allocate %(type)s segment from pool "
"failed with segment %(segment)s",
{"type": network_type,
"segment": raw_segment})
# saving real exception in case we exceeded amount of attempts
raise db_exc.RetryRequest(
exc.NoNetworkFoundInMaximumAllowedAttempts())

View File

@ -18,6 +18,7 @@ import contextlib
from eventlet import greenthread
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_db import api as oslo_db_api
from oslo_db import exception as os_db_exception
from oslo_log import log
from oslo_serialization import jsonutils
@ -597,8 +598,14 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self.mechanism_manager.create_network_precommit(mech_context)
return result, mech_context
@oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES,
retry_on_request=True)
def _create_network_with_retries(self, context, network):
return self._create_network_db(context, network)
def create_network(self, context, network):
result, mech_context = self._create_network_db(context, network)
result, mech_context = self._create_network_with_retries(context,
network)
try:
self.mechanism_manager.create_network_postcommit(mech_context)
except ml2_exc.MechanismDriverError:

View File

@ -16,9 +16,9 @@
import fixtures
import logging as std_logging
import mock
from oslo_db import exception as exc
from sqlalchemy.orm import query
from neutron.common import exceptions as exc
import neutron.db.api as db
from neutron.plugins.ml2.drivers import helpers
from neutron.plugins.ml2.drivers import type_vlan
@ -134,15 +134,10 @@ class HelpersTest(testlib_api.SqlTestCase):
def test_allocate_partial_segment_first_attempt_fails(self):
expected = dict(physical_network=TENANT_NET)
with mock.patch.object(query.Query, 'update', side_effect=[0, 1]):
self.assertRaises(
exc.RetryRequest,
self.driver.allocate_partially_specified_segment,
self.session, **expected)
observed = self.driver.allocate_partially_specified_segment(
self.session, **expected)
self.check_raw_segment(expected, observed)
def test_allocate_partial_segment_all_attempts_fail(self):
with mock.patch.object(query.Query, 'update', return_value=0):
with mock.patch.object(helpers.LOG, 'warning') as log_warning:
self.assertRaises(
exc.NoNetworkFoundInMaximumAllowedAttempts,
self.driver.allocate_partially_specified_segment,
self.session)
log_warning.assert_called_once_with(mock.ANY, mock.ANY)

View File

@ -21,10 +21,13 @@ import testtools
import uuid
import webob
from oslo_db import exception as db_exc
from neutron.common import constants
from neutron.common import exceptions as exc
from neutron.common import utils
from neutron import context
from neutron.db import api as db_api
from neutron.db import db_base_plugin_v2 as base_plugin
from neutron.db import l3_db
from neutron.extensions import external_net as external_net
@ -241,6 +244,19 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2,
for expected, actual in zip(expected_segments, segments):
self.assertEqual(expected, actual)
def test_create_network_segment_allocation_fails(self):
plugin = manager.NeutronManager.get_plugin()
with mock.patch.object(plugin.type_manager, 'create_network_segments',
side_effect=db_exc.RetryRequest(ValueError())) as f:
self.assertRaises(ValueError,
plugin.create_network,
context.get_admin_context(),
{'network': {'tenant_id': 'sometenant',
'name': 'dummy',
'admin_state_up': True,
'shared': False}})
self.assertEqual(db_api.MAX_RETRIES + 1, f.call_count)
class TestMl2SubnetsV2(test_plugin.TestSubnetsV2,
Ml2PluginV2TestCase):